| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235 |
- """Download audio from SharePoint Stream and upload to S3."""
- import json
- import re
- import os
- import time
- import boto3
- from urllib.parse import unquote, urlparse, parse_qs
- from xml.etree import ElementTree as ET
- from shared import S3_BUCKET, update_job
# Retry policy used by fetch_with_retry: how many retries to allow, and the
# per-attempt delay in seconds, indexed by attempt number (1, 2, 4, 8, 16).
MAX_RETRIES = 5
RETRY_BACKOFF = [2 ** n for n in range(MAX_RETRIES)]  # seconds
# Module-wide S3 client, created once at import time and reused by handler.
s3 = boto3.client(service_name='s3')
# Prefix -> namespace URI map for ElementTree find/findall queries on the
# manifest: "mpd" for the MPD elements, "sea" for the CryptoPeriod element
# that carries the key URI and IV.
NS = dict(
    mpd='urn:mpeg:DASH:schema:MPD:2011',
    sea='urn:mpeg:dash:schema:sea:2012',
)
def handler(event, context):
    """Download the audio track of a SharePoint Stream video and upload it to S3.

    Reads the job input from ``jobs/{job_id}/input.json`` in ``S3_BUCKET``
    (keys: ``curl`` required; ``cookies`` and ``language`` optional), fetches
    the DASH manifest named by the cURL command, downloads the init segment
    and every media segment of the selected audio track (decrypting them when
    the manifest declares a key), and stores the concatenation at
    ``jobs/{job_id}/audio.mp4``. Job status moves DOWNLOADING -> DOWNLOADED.

    Args:
        event: Invocation payload; must contain ``job_id``.
        context: Runtime context object (unused).

    Returns:
        A copy of ``event`` with ``audio_s3_key`` and ``language`` added.

    Raises:
        ValueError: The cURL command contains no URL, or the manifest lacks a
            Period, Representation, SegmentTemplate or SegmentTimeline.
        Exception: The manifest contains no audio AdaptationSet.
    """
    import requests

    job_id = event['job_id']
    update_job(job_id, status='DOWNLOADING')

    obj = s3.get_object(Bucket=S3_BUCKET, Key=f"jobs/{job_id}/input.json")
    inp = json.loads(obj['Body'].read())
    language = inp.get('language', 'auto')

    manifest_url, headers = parse_curl(inp['curl'])
    if not manifest_url:
        raise ValueError("No URL found in the cURL command")

    session = requests.Session()
    session.headers.update({
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    })
    cookies_str = (inp.get('cookies') or '').strip()
    if cookies_str:
        _apply_cookies(session, cookies_str, extract_sp_domain(manifest_url))

    # Manifest and key requests forward the PAC token; segment requests
    # forward only Origin/Referer.
    svc_headers = _pick_headers(headers, ('x-spopactoken', 'origin', 'referer'))
    sp_headers = _pick_headers(headers, ('origin', 'referer'))

    root = ET.fromstring(fetch_with_retry(session, manifest_url, svc_headers).text)
    base_url_el = root.find('mpd:BaseURL', NS)
    base_url = (base_url_el.text or '').strip() if base_url_el is not None else ''

    period = root.find('mpd:Period', NS)
    if period is None:
        raise ValueError("No Period found in manifest")
    audio_track = _pick_audio_track(period)
    if audio_track is None:
        raise Exception("No audio track found in manifest")

    enc_key, enc_iv = _load_encryption(session, audio_track, svc_headers)

    rep = audio_track.find('mpd:Representation', NS)
    if rep is None:
        raise ValueError("Audio track has no Representation")
    # SegmentTemplate may sit on the AdaptationSet or on the Representation.
    # (Compare with None explicitly: childless Elements are falsy.)
    seg_tpl = audio_track.find('mpd:SegmentTemplate', NS)
    if seg_tpl is None:
        seg_tpl = rep.find('mpd:SegmentTemplate', NS)
    if seg_tpl is None:
        raise ValueError("Audio track has no SegmentTemplate")
    init_tpl = _unescape_amp(seg_tpl.get('initialization', ''))
    media_tpl = _unescape_amp(seg_tpl.get('media', ''))
    rep_id = rep.get('id', '')
    segments = _segment_times(seg_tpl)

    audio_data = bytearray()
    init_url = resolve_url(base_url, init_tpl, rep_id)
    init_resp = fetch_with_retry(session, init_url, sp_headers)
    audio_data.extend(decrypt(init_resp.content, enc_key, enc_iv))

    total = len(segments)
    for done, seg_time in enumerate(segments, start=1):
        url = resolve_url(base_url, media_tpl, rep_id, seg_time)
        seg_resp = fetch_with_retry(session, url, sp_headers)
        audio_data.extend(decrypt(seg_resp.content, enc_key, enc_iv))
        if done % 50 == 0:
            print(f"[download] progress: {done}/{total} segments")

    s3_key = f"jobs/{job_id}/audio.mp4"
    s3.put_object(Bucket=S3_BUCKET, Key=s3_key, Body=bytes(audio_data), ContentType='audio/mp4')
    update_job(job_id, status='DOWNLOADED', audio_s3_key=s3_key)
    return {**event, 'audio_s3_key': s3_key, 'language': language}


def _apply_cookies(session, cookies_str, sp_domain):
    """Add user-supplied cookies to ``session``.

    ``cookies_str`` is either a Cookie-Editor style JSON array of objects with
    ``name``/``value``/``domain`` keys, or a ``name=value; name=value`` string.
    Cookies without a domain are scoped to ``sp_domain`` ('' if falsy).
    """
    default_domain = sp_domain or ''
    if cookies_str.startswith('['):
        try:
            cookie_list = json.loads(cookies_str)
        except json.JSONDecodeError as e:
            # Best effort: continue without cookies, but make it visible.
            print(f"[download] ignoring unparseable cookies JSON: {e}")
            return
        for c in cookie_list:
            if not isinstance(c, dict):
                continue
            name = c.get('name', '')
            value = c.get('value', '')
            domain = c.get('domain') or default_domain
            if domain.startswith('.'):
                # Drop a leading '.' from the domain.
                domain = domain[1:]
            if name and value:
                session.cookies.set(name, value, domain=domain)
        return
    # Header form: split on ';' so "a=1;b=2" works as well as "a=1; b=2".
    for pair in cookies_str.split(';'):
        name, sep, value = pair.partition('=')
        name = name.strip()
        if sep and name:
            session.cookies.set(name, value.strip(), domain=default_domain)


def _pick_headers(headers, names):
    """Return the entries of ``headers`` whose name (case-insensitive) is in ``names``."""
    wanted = {n.lower() for n in names}
    return {k: v for k, v in headers.items() if k.lower() in wanted}


def _unescape_amp(value):
    """Decode a leftover ``&amp;`` in a URL taken from a manifest attribute.

    ElementTree already decodes one level of XML entities, so this only
    changes manifests that double-encode ``&`` inside URL attributes.
    """
    return value.replace('&amp;', '&')


def _pick_audio_track(period):
    """Return the audio AdaptationSet labelled 'OriginalAudio', else the first
    audio AdaptationSet, else None.

    An AdaptationSet counts as audio when ``contentType="audio"`` or, if
    ``contentType`` is absent, when its ``mimeType`` starts with ``audio/``.
    """
    first_audio = None
    for adapt in period.findall('mpd:AdaptationSet', NS):
        content_type = adapt.get('contentType') or adapt.get('mimeType', '').split('/')[0]
        if content_type != 'audio':
            continue
        label_el = adapt.find('mpd:Label', NS)
        if label_el is not None and (label_el.text or '').strip() == 'OriginalAudio':
            return adapt
        if first_audio is None:
            first_audio = adapt
    return first_audio


def _load_encryption(session, audio_track, headers):
    """Return ``(key, iv)`` from the track's ``sea:CryptoPeriod``.

    Each element is ``None`` when the manifest does not provide it; both are
    ``None`` when the track has no ContentProtection/CryptoPeriod. The key is
    downloaded from ``keyUriTemplate`` using ``headers``.
    """
    cp = audio_track.find('mpd:ContentProtection', NS)
    crypto = cp.find('sea:CryptoPeriod', NS) if cp is not None else None
    if crypto is None:
        return None, None
    key_url = _unescape_amp(crypto.get('keyUriTemplate', ''))
    enc_key = fetch_with_retry(session, key_url, headers).content if key_url else None
    iv_str = crypto.get('IV', '')
    if iv_str[:2].lower() == '0x':
        iv_str = iv_str[2:]
    enc_iv = bytes.fromhex(iv_str) if iv_str else None
    return enc_key, enc_iv


def _segment_times(seg_tpl):
    """Return the start time of every segment listed in a SegmentTimeline.

    Each ``<S>`` contributes ``r + 1`` segments of duration ``d``; an explicit
    ``t`` attribute resets the running start time (previously ignored, which
    shifted every ``$Time$`` when the timeline did not start at 0).
    NOTE: ``r < 0`` ("repeat until the end of the Period") yields no segments
    for that entry, since the Period duration is not consulted here.
    """
    timeline = seg_tpl.find('mpd:SegmentTimeline', NS)
    if timeline is None:
        raise ValueError("SegmentTemplate has no SegmentTimeline")
    times = []
    t = 0
    for s_el in timeline.findall('mpd:S', NS):
        start = s_el.get('t')
        if start is not None:
            t = int(start)
        d = int(s_el.get('d', 0))
        r = int(s_el.get('r', 0))
        for _ in range(r + 1):
            times.append(t)
            t += d
    return times
def fetch_with_retry(session, url, headers):
    """GET ``url`` through ``session``, retrying transient failures.

    Retries on HTTP 5xx, HTTP 429, connection errors, timeouts and truncated
    chunked bodies, allowing up to ``MAX_RETRIES`` retries after the first
    attempt. Before retry ``i`` (1-based) it sleeps ``RETRY_BACKOFF[i - 1]``
    seconds; the last entry is reused if the list is shorter than
    ``MAX_RETRIES``. Any other 4xx response fails immediately.

    Args:
        session: ``requests.Session`` used for the request.
        url: URL to fetch.
        headers: Extra request headers.

    Returns:
        The successful ``requests.Response``.

    Raises:
        requests.HTTPError: On a non-retryable status, or a retryable one
            that persists through the final attempt.
        requests.ConnectionError, requests.Timeout,
        requests.exceptions.ChunkedEncodingError: If the final attempt fails
            at the transport level.
    """
    import requests

    transient_errors = (
        requests.ConnectionError,
        requests.Timeout,
        requests.exceptions.ChunkedEncodingError,
    )
    for attempt in range(MAX_RETRIES + 1):
        is_last = attempt == MAX_RETRIES
        try:
            r = session.get(url, headers=headers, timeout=60)
        except transient_errors as e:
            if is_last:
                raise
            print(f"[download] {type(e).__name__}, retry {attempt+1}/{MAX_RETRIES}")
        else:
            retryable = r.status_code >= 500 or r.status_code == 429
            if is_last or not retryable:
                r.raise_for_status()
                return r
            print(f"[download] HTTP {r.status_code}, retry {attempt+1}/{MAX_RETRIES}")
        # Back off before *every* retry, including the final one (previously
        # the last retry fired immediately and RETRY_BACKOFF[-1] was unused).
        if RETRY_BACKOFF:
            time.sleep(RETRY_BACKOFF[min(attempt, len(RETRY_BACKOFF) - 1)])
def parse_curl(curl_str):
    """Extract the request URL and ``-H`` headers from a "Copy as cURL" string.

    Handles POSIX-shell quoting and backslash line continuations. The URL is
    the first positional ``http://``/``https://`` argument or the ``--url``
    value, whichever comes first. The arguments of value-taking options
    (``-X``, ``-d``, ``--data-raw``, ``-b``, ``-e``, ...) are skipped so they
    cannot be mistaken for the URL. If tokenising finds no URL, a regex search
    of the raw string is used as a fallback.

    Args:
        curl_str: The cURL command line.

    Returns:
        ``(url, headers)``. ``url`` may be ``None``. ``headers`` maps header
        names to values, and later duplicates win.

    Raises:
        ValueError: If the string has unbalanced quotes (raised by ``shlex``).
    """
    import shlex

    # Options whose next token is an argument, never the request URL.
    value_opts = {
        '-X', '--request', '-d', '--data', '--data-raw', '--data-binary',
        '--data-ascii', '--data-urlencode', '-b', '--cookie', '-A',
        '--user-agent', '-e', '--referer', '-u', '--user', '-o', '--output',
        '-x', '--proxy', '-F', '--form', '-m', '--max-time',
        '--connect-timeout',
    }
    curl_str = curl_str.replace('\\\n', ' ').replace('\\\r\n', ' ')
    tokens = iter(shlex.split(curl_str, posix=True))
    url = None
    headers = {}
    for tok in tokens:
        if tok in ('-H', '--header'):
            hdr = next(tokens, None)
            if hdr is not None and ':' in hdr:
                k, v = hdr.split(':', 1)
                headers[k.strip()] = v.strip()
        elif tok == '--url':
            value = next(tokens, None)
            if url is None and value:
                url = value
        elif tok in value_opts:
            next(tokens, None)  # skip the option's argument
        elif url is None and tok.startswith(('http://', 'https://')):
            url = tok
    if not url:
        m = re.search(r"(https?://[^\s'\"]+)", curl_str)
        if m:
            url = m.group(1)
    return url, headers
def extract_sp_domain(manifest_url):
    """Return the host of the URL carried in the manifest URL's ``docid`` parameter.

    ``docid`` is treated as a (possibly percent-encoded) document URL, e.g.
    ``...?docid=https%3A%2F%2Fcontoso.sharepoint.com%2F...``.

    Returns:
        The lower-cased hostname. Returns ``''`` when ``docid`` is missing or
        has no host (previously ``None``, which broke cookie creation), so
        callers always get a string usable as a cookie domain.
    """
    qs = parse_qs(urlparse(manifest_url).query)
    # parse_qs decodes once; unquote again in case docid was double-encoded.
    docid = unquote(qs.get('docid', [''])[0])
    if not docid:
        return ''
    return urlparse(docid).hostname or ''
def resolve_url(base_url, template, rep_id, seg_time=None):
    """Expand a DASH SegmentTemplate URL and prepend ``base_url``.

    Replaces ``$RepresentationID$`` with ``rep_id``. When ``seg_time`` is
    given, also replaces ``$Time$`` with the segment start time; the
    ``$Time%0<w>d$`` form is supported and zero-pads the time to ``w``
    digits. With ``seg_time=None`` (as used for the init segment) the
    ``$Time...$`` placeholders are left untouched.

    Returns:
        ``base_url + expanded_template`` (plain concatenation, no URL joining).
    """
    rep_id = str(rep_id)
    # Replacements are callables so values are inserted literally; a plain
    # repl string would have backslashes parsed as group references.
    # NOTE(review): the "...amp;" patterns only match when "amp;" follows
    # non-'&' characters; they look like a mangled entity pattern -- confirm.
    url = re.sub(r'\$RepresentationID[^&]*amp;', lambda _m: rep_id, template)
    url = re.sub(r'\$RepresentationID\$', lambda _m: rep_id, url)
    if seg_time is not None:
        time_str = str(seg_time)
        url = re.sub(r'\$Time[^&]*amp;', lambda _m: time_str, url)
        url = re.sub(
            r'\$Time(?:%0(\d+)d)?\$',
            lambda m: time_str.zfill(int(m.group(1) or 0)),
            url,
        )
    return base_url + url
def decrypt(data, key, iv):
    """AES-CBC-decrypt one segment and strip PKCS#7 padding if present.

    Args:
        data: Ciphertext. When ``key`` is set, its length must be a multiple
            of the AES block size.
        key: Raw AES key bytes. A falsy value means the stream is unencrypted.
        iv: Initialisation vector bytes. Required when ``key`` is set.

    Returns:
        ``data`` unchanged when ``key`` is falsy. Otherwise the plaintext,
        with trailing bytes removed only if they form valid PKCS#7 padding.

    Raises:
        ValueError: If ``key`` is set but ``iv`` is ``None``. Without an IV,
            PyCryptodome generates a random one and silently corrupts the
            first block.
    """
    if not key:
        return data
    if iv is None:
        raise ValueError("Segment is encrypted but the manifest provided no IV")
    from Crypto.Cipher import AES
    dec = AES.new(key, AES.MODE_CBC, iv).decrypt(data)
    if dec:
        pad = dec[-1]
        if 0 < pad <= AES.block_size and dec[-pad:] == bytes([pad]) * pad:
            dec = dec[:-pad]
    return dec
|