"""Download audio from SharePoint Stream and upload to S3.""" import json import re import os import time import boto3 from urllib.parse import unquote, urlparse, parse_qs from xml.etree import ElementTree as ET from shared import S3_BUCKET, update_job MAX_RETRIES = 5 RETRY_BACKOFF = [1, 2, 4, 8, 16] # seconds s3 = boto3.client('s3') NS = { 'mpd': 'urn:mpeg:DASH:schema:MPD:2011', 'sea': 'urn:mpeg:dash:schema:sea:2012', } def handler(event, context): job_id = event['job_id'] update_job(job_id, status='DOWNLOADING') # Load input obj = s3.get_object(Bucket=S3_BUCKET, Key=f"jobs/{job_id}/input.json") inp = json.loads(obj['Body'].read()) curl_cmd = inp['curl'] cookies_str = inp.get('cookies', '') language = inp.get('language', 'auto') # Parse cURL manifest_url, headers = parse_curl(curl_cmd) # Setup session import requests session = requests.Session() session.headers.update({ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' }) # Set cookies if cookies_str: sp_domain = extract_sp_domain(manifest_url) cookies_str = cookies_str.strip() # 支持 Cookie-Editor JSON 数组格式 if cookies_str.startswith('['): try: cookie_list = json.loads(cookies_str) for c in cookie_list: name = c.get('name', '') value = c.get('value', '') domain = c.get('domain', sp_domain) if domain.startswith('.'): domain = domain[1:] if name and value: session.cookies.set(name, value, domain=domain) except json.JSONDecodeError: pass else: # name=value; name=value 格式 for pair in cookies_str.split('; '): if '=' in pair: name, value = pair.split('=', 1) session.cookies.set(name.strip(), value.strip(), domain=sp_domain) # Fetch manifest svc_headers = {} for k, v in headers.items(): if k.lower() in ('x-spopactoken', 'origin', 'referer'): svc_headers[k] = v resp = fetch_with_retry(session, manifest_url, svc_headers) manifest_xml = resp.text # Parse manifest - find audio track root = ET.fromstring(manifest_xml) base_url_el = root.find('mpd:BaseURL', NS) base_url = base_url_el.text.strip() if base_url_el is not None else '' period = root.find('mpd:Period', NS) audio_track = None for adapt in period.findall('mpd:AdaptationSet', NS): if adapt.get('contentType') == 'audio': label_el = adapt.find('mpd:Label', NS) label = label_el.text if label_el is not None else '' if label == 'OriginalAudio' or audio_track is None: audio_track = adapt if label == 'OriginalAudio': break if audio_track is None: raise Exception("No audio track found in manifest") # Get encryption key cp = audio_track.find('mpd:ContentProtection', NS) enc_key = None enc_iv = None if cp is not None: crypto = cp.find('sea:CryptoPeriod', NS) if crypto is not None: key_url = crypto.get('keyUriTemplate', '').replace('&', '&') iv_str = crypto.get('IV', '') if key_url: kr = fetch_with_retry(session, key_url, svc_headers) enc_key = kr.content if iv_str.startswith('0x'): iv_str = iv_str[2:] enc_iv = bytes.fromhex(iv_str) if iv_str else None # Parse segments seg_tpl = audio_track.find('mpd:SegmentTemplate', NS) init_tpl = seg_tpl.get('initialization', '').replace('&', '&') media_tpl = seg_tpl.get('media', '').replace('&', '&') rep = audio_track.find('mpd:Representation', NS) rep_id = rep.get('id', '') segments = [] timeline = seg_tpl.find('mpd:SegmentTimeline', NS) t = 0 for s_el in timeline.findall('mpd:S', NS): d = int(s_el.get('d', 0)) r = int(s_el.get('r', 0)) for _ in range(r + 1): segments.append(t) t += d # Download init + segments sp_headers = {} for k, v in headers.items(): if k.lower() in ('origin', 'referer'): sp_headers[k] = v audio_data = bytearray() # Init segment init_url = resolve_url(base_url, init_tpl, rep_id) r = fetch_with_retry(session, init_url, sp_headers) audio_data.extend(decrypt(r.content, enc_key, enc_iv)) # Media segments total = len(segments) for idx, seg_time in enumerate(segments): url = resolve_url(base_url, media_tpl, rep_id, seg_time) r = fetch_with_retry(session, url, sp_headers) audio_data.extend(decrypt(r.content, enc_key, enc_iv)) if (idx + 1) % 50 == 0: print(f"[download] progress: {idx+1}/{total} segments") # Upload to S3 s3_key = f"jobs/{job_id}/audio.mp4" s3.put_object(Bucket=S3_BUCKET, Key=s3_key, Body=bytes(audio_data), ContentType='audio/mp4') update_job(job_id, status='DOWNLOADED', audio_s3_key=s3_key) return {**event, 'audio_s3_key': s3_key, 'language': language} def fetch_with_retry(session, url, headers): """GET with retry on 5xx / connection errors.""" import requests for attempt in range(MAX_RETRIES): try: r = session.get(url, headers=headers, timeout=60) if r.status_code < 500: r.raise_for_status() return r # 5xx: retry print(f"[download] HTTP {r.status_code}, retry {attempt+1}/{MAX_RETRIES}") except (requests.ConnectionError, requests.Timeout) as e: print(f"[download] {type(e).__name__}, retry {attempt+1}/{MAX_RETRIES}") if attempt < MAX_RETRIES - 1: time.sleep(RETRY_BACKOFF[attempt]) # Last attempt — let it raise r = session.get(url, headers=headers, timeout=60) r.raise_for_status() return r def parse_curl(curl_str): import shlex curl_str = curl_str.replace('\\\n', ' ').replace('\\\r\n', ' ') tokens = shlex.split(curl_str, posix=True) url = None headers = {} i = 0 while i < len(tokens): t = tokens[i] if t in ('-H', '--header') and i + 1 < len(tokens): i += 1 hdr = tokens[i] if ':' in hdr: k, v = hdr.split(':', 1) headers[k.strip()] = v.strip() elif t.startswith('http'): url = t elif t in ('-X', '--request') and i + 1 < len(tokens): i += 1 i += 1 if not url: m = re.search(r"(https?://[^\s'\"]+)", curl_str) if m: url = m.group(1) return url, headers def extract_sp_domain(manifest_url): qs = parse_qs(urlparse(manifest_url).query) docid = unquote(qs.get('docid', [''])[0]) if docid: return urlparse(docid).hostname return '' def resolve_url(base_url, template, rep_id, seg_time=None): url = template url = re.sub(r'\$RepresentationID[^&]*amp;', rep_id, url) url = re.sub(r'\$RepresentationID\$', rep_id, url) if seg_time is not None: url = re.sub(r'\$Time[^&]*amp;', str(seg_time), url) url = re.sub(r'\$Time\$', str(seg_time), url) return base_url + url def decrypt(data, key, iv): if not key: return data from Crypto.Cipher import AES cipher = AES.new(key, AES.MODE_CBC, iv) dec = cipher.decrypt(data) if len(dec) > 0: pad = dec[-1] if 0 < pad <= 16 and dec[-pad:] == bytes([pad]) * pad: dec = dec[:-pad] return dec