#!/usr/bin/env python3
"""
SharePoint Stream video downloader.

Usage:
  1. Browser DevTools -> Network -> find the videomanifest request
  2. Right-click -> Copy as cURL (bash)
  3. Paste it into a file named curl_command.txt
  4. Run: python sp_video_dl.py curl_command.txt

Alternatively, run python sp_video_dl.py with no arguments to enter
interactive mode.

Dependencies: pip install requests pycryptodome
"""
import argparse
import json
import os
import re
import shlex
import struct
import subprocess
import sys
import time
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
from io import BytesIO
from urllib.parse import parse_qs, unquote, urljoin, urlparse
from xml.etree import ElementTree as ET

try:
    import requests
except ImportError:
    print("需要安装 requests: pip install requests")
    sys.exit(1)

try:
    from Crypto.Cipher import AES
except ImportError:
    # Decryption is optional: without pycryptodome segments are passed through.
    AES = None
    print("[警告] 未安装 pycryptodome,将跳过解密步骤")
    print(" 安装: pip install pycryptodome")
# ============================================================
# cURL parsing
# ============================================================
def parse_curl(curl_str: str) -> dict:
    """Parse a ``curl`` command line into its URL, headers and HTTP method.

    Multi-line commands joined with a trailing backslash (bash) or caret
    (Windows cmd) are flattened first.  ``-H/--header`` and ``-X/--request``
    are interpreted; ``-A/--user-agent`` and ``-e/--referer`` are folded into
    the matching header unless that exact header key was already set; the
    values of other value-taking options (data, cookie, proxy, user, output)
    are consumed so that they can never be mistaken for the request URL.

    Args:
        curl_str: Raw command text, e.g. from DevTools "Copy as cURL".

    Returns:
        ``{"url": str, "headers": dict, "method": str}``.  ``method`` is
        upper-cased and defaults to ``"GET"``.  If several bare ``http...``
        tokens are present, the last one wins.

    Raises:
        ValueError: If no URL can be found, or if the text cannot be
            tokenised even after normalising single quotes to double quotes.
    """
    # Options whose value maps directly onto a request header.
    header_options = {
        "-A": "User-Agent",
        "--user-agent": "User-Agent",
        "-e": "Referer",
        "--referer": "Referer",
    }
    # Options that take a value we do not need; the value must still be
    # skipped, otherwise e.g. ``-x http://proxy`` would replace the URL.
    value_options = frozenset((
        "-d", "--data", "--data-raw", "--data-binary", "--data-urlencode",
        "--data-ascii", "-b", "--cookie", "-u", "--user", "-x", "--proxy",
        "-o", "--output",
    ))

    curl_str = curl_str.strip()
    # Join continued lines (caret or backslash, LF or CRLF).
    for continuation in ("^\n", "\\\n", "^\r\n", "\\\r\n"):
        curl_str = curl_str.replace(continuation, " ")

    try:
        tokens = shlex.split(curl_str, posix=True)
    except ValueError:
        # Commands copied for Windows shells may use quoting that POSIX
        # rules reject; retry with single quotes turned into double quotes.
        curl_str = curl_str.replace("'", '"')
        tokens = shlex.split(curl_str, posix=True)

    url = None
    headers = {}
    method = "GET"

    pending = iter(tokens)
    for token in pending:
        if token in ("-H", "--header"):
            raw_header = next(pending, None)
            if raw_header is not None and ":" in raw_header:
                key, value = raw_header.split(":", 1)
                headers[key.strip()] = value.strip()
        elif token in ("-X", "--request"):
            verb = next(pending, None)
            if verb is not None:
                method = verb.upper()
        elif token in header_options:
            value = next(pending, None)
            if value is not None:
                headers.setdefault(header_options[token], value.strip())
        elif token in value_options:
            next(pending, None)
        elif token.startswith("http"):
            url = token
        # Anything else ("curl" itself, flags such as --compressed / -k)
        # carries no information we need.

    if not url:
        # Last resort: pick the first thing that looks like a URL.
        found = re.search(r"(https?://[^\s'\"]+)", curl_str)
        if found:
            url = found.group(1).rstrip("'\"")

    if not url:
        raise ValueError("无法从 cURL 命令中提取 URL")

    return {"url": url, "headers": headers, "method": method}
def parse_raw_headers(text: str) -> dict:
    """Parse a pasted block of raw HTTP request headers.

    Each non-empty line is one of:

    * a URL (starts with ``http://`` or ``https://``) - stored as ``url``;
      if several are present the last one wins;
    * an HTTP/2 pseudo-header (``:authority: host``) - only ``:authority``
      is kept, mapped to ``Host``; the others are dropped;
    * a regular ``Name: value`` header.

    Args:
        text: The pasted text.

    Returns:
        ``{"url": str | None, "headers": dict}``.
    """
    headers = {}
    url = None

    for raw_line in text.strip().splitlines():
        line = raw_line.strip()
        if not line:
            continue

        if line.startswith(("http://", "https://")):
            url = line
            continue

        if line.startswith(":"):
            # Pseudo-header: the name itself begins with a colon, so the
            # separator is the *second* colon on the line.
            name, sep, value = line[1:].partition(":")
            if sep and name.strip() == "authority":
                headers["Host"] = value.strip()
            continue

        if ":" in line:
            name, value = line.split(":", 1)
            headers[name.strip()] = value.strip()

    return {"url": url, "headers": headers}
# ============================================================
# DASH manifest parsing
# ============================================================
# XML namespace prefixes used in the ElementTree queries below.
NS = {
    "mpd": "urn:mpeg:DASH:schema:MPD:2011",
    "sea": "urn:mpeg:dash:schema:sea:2012",
}


def parse_manifest(xml_text: str) -> dict:
    """Parse a DASH MPD manifest into a plain dictionary.

    Only the first ``Period`` is read.  Each ``AdaptationSet`` becomes one
    track; segment start times are expanded from its ``SegmentTimeline``,
    honouring an explicit ``S@t`` start time and the ``S@r`` repeat count.

    Args:
        xml_text: The manifest XML.

    Returns:
        A dict with ``duration_str``, ``duration_sec``, ``base_url`` and
        ``tracks``.  Each track holds ``id``, ``type``, ``mime``, ``codecs``,
        ``label``, ``width``, ``height``, ``key_url``, ``iv``, ``init_tpl``,
        ``media_tpl``, ``timescale``, ``segments`` (list of start times in
        timescale units) and ``representations``.

    Raises:
        xml.etree.ElementTree.ParseError: If ``xml_text`` is not valid XML.
        ValueError: If a numeric attribute does not parse as an integer.
    """
    root = ET.fromstring(xml_text)

    info = {
        "duration_str": root.get("mediaPresentationDuration", ""),
        "duration_sec": 0,
        "base_url": "",
        "tracks": [],
    }

    # Duration looks like PT0H0M2555.584S; every component is optional.
    dur_m = re.match(r"PT(?:(\d+)H)?(?:(\d+)M)?(?:([\d.]+)S)?", info["duration_str"])
    if dur_m:
        hours = int(dur_m.group(1) or 0)
        minutes = int(dur_m.group(2) or 0)
        seconds = float(dur_m.group(3) or 0)
        info["duration_sec"] = hours * 3600 + minutes * 60 + seconds

    base_el = root.find("mpd:BaseURL", NS)
    if base_el is not None and base_el.text:
        info["base_url"] = base_el.text.strip()

    period = root.find("mpd:Period", NS)
    if period is None:
        return info

    for adapt in period.findall("mpd:AdaptationSet", NS):
        track = {
            "id": adapt.get("id"),
            "type": adapt.get("contentType"),  # audio / video
            "mime": adapt.get("mimeType"),
            "codecs": adapt.get("codecs"),
            "label": "",
            "width": adapt.get("maxWidth"),
            "height": adapt.get("maxHeight"),
            "key_url": "",
            "iv": "",
            "init_tpl": "",
            "media_tpl": "",
            "timescale": 0,
            "segments": [],  # list of segment start times
            "representations": [],
        }

        label_el = adapt.find("mpd:Label", NS)
        if label_el is not None and label_el.text:
            track["label"] = label_el.text

        # NOTE(review): the three .replace("&", "&") calls below are no-ops
        # as written (ElementTree has already decoded XML entities).  They
        # presumably read "&amp;" -> "&" upstream - confirm before changing.

        # Encryption info
        protection = adapt.find("mpd:ContentProtection", NS)
        if protection is not None:
            crypto = protection.find("sea:CryptoPeriod", NS)
            if crypto is not None:
                track["key_url"] = crypto.get("keyUriTemplate", "").replace("&", "&")
                track["iv"] = crypto.get("IV", "")

        seg_tpl = adapt.find("mpd:SegmentTemplate", NS)
        if seg_tpl is not None:
            track["init_tpl"] = seg_tpl.get("initialization", "").replace("&", "&")
            track["media_tpl"] = seg_tpl.get("media", "").replace("&", "&")
            track["timescale"] = int(seg_tpl.get("timescale", "1"))

            timeline = seg_tpl.find("mpd:SegmentTimeline", NS)
            if timeline is not None:
                cursor = 0
                for s_el in timeline.findall("mpd:S", NS):
                    explicit_start = s_el.get("t")
                    if explicit_start is not None:
                        # An S element may restart the clock at an explicit
                        # time instead of continuing from the previous one.
                        cursor = int(explicit_start)
                    duration = int(s_el.get("d", 0))
                    repeat = int(s_el.get("r", 0))
                    # A negative repeat ("until the end") yields no entries
                    # here, matching the previous behaviour.
                    for _ in range(repeat + 1):
                        track["segments"].append(cursor)
                        cursor += duration

        for rep in adapt.findall("mpd:Representation", NS):
            track["representations"].append({
                "id": rep.get("id"),
                "bandwidth": int(rep.get("bandwidth", 0)),
                "width": rep.get("width"),
                "height": rep.get("height"),
                "codecs": rep.get("codecs"),
            })

        info["tracks"].append(track)

    return info
def resolve_url(base_url: str, template: str, rep_id: str, seg_time: int = None) -> str:
    """Fill in the placeholders of a DASH URL template and prefix the base URL.

    Two placeholder spellings are recognised for each of ``RepresentationID``
    and ``Time``: the standard ``$Name$`` form, and a variant in which the
    name is followed by arbitrary non-``&`` characters and a literal ``amp;``
    (the original author notes that SharePoint templates can carry a GUID
    suffix, e.g. ``$RepresentationIDe963038e-...amp;``).

    Args:
        base_url: Prefix concatenated (not URL-joined) with the result.
        template: The ``initialization`` or ``media`` template.
        rep_id: Representation id to substitute.
        seg_time: Segment start time; ``None`` leaves ``Time`` placeholders
            untouched (used for the init segment).

    Returns:
        ``base_url`` followed by the expanded template.
    """
    # Replacement callables insert the text verbatim; passing the value as a
    # plain replacement string would make ``re.sub`` interpret backslashes
    # and group references inside it.
    def insert_rep_id(_match):
        return rep_id

    expanded = re.sub(r'\$RepresentationID[^&]*amp;', insert_rep_id, template)
    expanded = re.sub(r'\$RepresentationID\$', insert_rep_id, expanded)

    if seg_time is not None:
        time_text = str(seg_time)

        def insert_time(_match):
            return time_text

        expanded = re.sub(r'\$Time[^&]*amp;', insert_time, expanded)
        expanded = re.sub(r'\$Time\$', insert_time, expanded)

    return base_url + expanded
# ============================================================
# Downloader
# ============================================================
class SPVideoDownloader:
    """Download a SharePoint Stream video described by a DASH manifest.

    The workflow implemented by :meth:`run` is: fetch and parse the manifest,
    fetch the AES key for the video track, download the video and audio
    tracks segment by segment, mux them with ffmpeg, then try to download the
    transcript.

    Progress and errors are reported with ``print``; several fatal conditions
    (401 responses, too many failed segments) terminate the process with
    ``sys.exit(1)``.
    """

    def __init__(self, manifest_url: str, headers: dict, output_dir: str = "output",
                 cookies: str = None):
        """Create a downloader.

        Args:
            manifest_url: URL of the ``videomanifest`` request.
            headers: Headers captured from the browser request; only
                ``User-Agent``, ``x-spopactoken``, ``Origin`` and ``Referer``
                are used.
            output_dir: Directory that receives all output files.
            cookies: Optional cookie header string or path to a JSON cookie
                export; see :meth:`_set_cookies`.
        """
        self.manifest_url = manifest_url
        self.headers = headers
        self.output_dir = output_dir
        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": headers.get("user-agent", headers.get(
                "User-Agent",
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36")),
        })

        # The PAC token header is matched case-insensitively.
        self.pac_token = next(
            (value for key, value in headers.items() if key.lower() == "x-spopactoken"),
            None,
        )

        self.origin = headers.get("origin", headers.get("Origin", ""))
        self.referer = headers.get("referer", headers.get("Referer", ""))

        self.manifest_info = None
        self.encryption_key = None
        self.encryption_iv = None
        # Assigned by main() from --token; nothing in this class reads it.
        self.sp_access_token = None

        if cookies:
            self._set_cookies(cookies)

    def _set_cookies(self, cookie_str: str):
        """Load cookies into the session.

        Accepted inputs:

        1. Path to a JSON file - either a list of ``{name, value, domain}``
           objects (the original comments name Cookie-Editor /
           EditThisCookie exports) or a flat ``{name: value}`` object.
        2. A ``Cookie`` header string: ``name1=value1; name2=value2``.

        Cookies without their own domain are bound to the host found in the
        manifest URL's ``docid`` query parameter.
        """
        cookie_str = cookie_str.strip()

        # Use the host inside the docid parameter rather than the manifest
        # host (the original comments say the manifest is served from
        # mediap.svc.ms while segments come from sharepoint.com).
        docid = parse_qs(urlparse(self.manifest_url).query).get('docid', [''])[0]
        sp_domain = ""
        if docid:
            sp_domain = urlparse(unquote(docid)).hostname or ""

        if not sp_domain:
            # NOTE(review): tenant-specific fallback inherited from the
            # original; it is only correct for that tenant.
            sp_domain = "ecvcorp-my.sharepoint.com"  # fallback

        print(f" Cookie domain: {sp_domain}")

        if os.path.isfile(cookie_str):
            with open(cookie_str, 'r', encoding='utf-8') as f:
                data = json.load(f)

            if isinstance(data, list):
                for c in data:
                    name = c.get('name', '')
                    value = c.get('value', '')
                    domain = c.get('domain', sp_domain)
                    if domain.startswith('.'):
                        domain = domain[1:]
                    if name and value:
                        self.session.cookies.set(name, value, domain=domain)
                print(f" 从 JSON 加载了 {len(data)} 个 cookies")
            elif isinstance(data, dict):
                for name, value in data.items():
                    self.session.cookies.set(name, str(value), domain=sp_domain)
                print(f" 从 JSON 加载了 {len(data)} 个 cookies")
        else:
            # Split only on "; " and then on the first "=", because cookie
            # values (e.g. FedAuth) may themselves contain "=" and "+".
            for part in cookie_str.split('; '):
                part = part.strip()
                if '=' in part:
                    name, value = part.split('=', 1)
                    self.session.cookies.set(name.strip(), value.strip(), domain=sp_domain)

        cookie_names = [c.name for c in self.session.cookies]
        print(f" 已设置 {len(cookie_names)} 个 cookies: {', '.join(cookie_names)}")

        if 'FedAuth' not in cookie_names:
            print(" [警告] 缺少 FedAuth cookie - 这是 SharePoint 的主要认证 cookie")
        if 'rtFa' not in cookie_names:
            print(" [警告] 缺少 rtFa cookie")

    def _svc_headers(self) -> dict:
        """Return the headers sent with manifest and key requests."""
        h = {}
        if self.pac_token:
            h["x-spopactoken"] = self.pac_token
        if self.origin:
            h["Origin"] = self.origin
        if self.referer:
            h["Referer"] = self.referer
        return h

    def _sp_headers(self) -> dict:
        """Return the headers sent with segment and transcript requests.

        Only ``Origin`` and ``Referer`` are added.  (Original author's note:
        segment URLs are self-authenticating through their P1/P4 signature
        parameters and the browser sends no extra token.)
        """
        h = {}
        if self.origin:
            h["Origin"] = self.origin
        if self.referer:
            h["Referer"] = self.referer
        return h

    def fetch_manifest(self, quiet: bool = False) -> dict:
        """Fetch and parse the DASH manifest and save it as ``manifest.mpd``.

        Args:
            quiet: Suppress the progress and summary output.

        Returns:
            The parsed manifest (also stored in ``self.manifest_info``).

        Raises:
            requests.HTTPError: For non-401 HTTP error responses.  A 401
                prints troubleshooting hints and exits the process.
        """
        if not quiet:
            print("[1/5] 获取 DASH manifest...")
        resp = self.session.get(self.manifest_url, headers=self._svc_headers())
        if resp.status_code == 401:
            print("\n[错误] 401 Unauthorized - 认证失败")
            print(" 可能原因:")
            print(" 1. 复制了 OPTIONS preflight 请求 (没有 x-spopactoken)")
            print(" 2. Token 已过期 (需要重新从浏览器复制)")
            print(" 3. 没有访问权限")
            print(f"\n 当前 PAC Token: {'有' if self.pac_token else '无'}")
            if self.pac_token:
                print(f" Token 前缀: {self.pac_token[:40]}...")
            sys.exit(1)
        resp.raise_for_status()

        xml_text = resp.text
        self.manifest_info = parse_manifest(xml_text)

        dur = self.manifest_info["duration_sec"]
        h, m, s = int(dur // 3600), int((dur % 3600) // 60), dur % 60
        if not quiet:
            print(f" 时长: {h:02d}:{m:02d}:{s:05.2f}")
            print(f" BaseURL: {self.manifest_info['base_url'][:80]}...")

            for track in self.manifest_info["tracks"]:
                label = f" ({track['label']})" if track["label"] else ""
                res = f" {track['width']}x{track['height']}" if track["width"] else ""
                reps = ", ".join(r["id"] for r in track["representations"])
                print(f" Track {track['id']}: {track['type']}{label}{res} | "
                      f"{len(track['segments'])} segments | reps: {reps}")

        # Keep a copy of the raw manifest next to the downloads.
        os.makedirs(self.output_dir, exist_ok=True)
        with open(os.path.join(self.output_dir, "manifest.mpd"), "w", encoding="utf-8") as f:
            f.write(xml_text)

        return self.manifest_info

    def _renew_manifest(self, track_type: str) -> dict:
        """Re-fetch the manifest and return the track of ``track_type``.

        For audio the track labelled ``OriginalAudio`` is preferred; otherwise
        the first track of the requested type is returned, or ``None``.
        """
        print(" [续签] 重新获取 manifest...")
        self.fetch_manifest(quiet=True)
        candidates = [t for t in self.manifest_info["tracks"] if t["type"] == track_type]
        if track_type == "audio":
            for track in candidates:
                if track["label"] == "OriginalAudio":
                    return track
        return candidates[0] if candidates else None

    def fetch_encryption_key(self, track: dict) -> bytes:
        """Fetch the AES key for ``track`` and remember key and IV.

        Returns:
            The key bytes, or ``None`` when the track has no key URL.

        Raises:
            requests.HTTPError: If the key request fails.
            ValueError: If the track's IV is not valid hex.
        """
        if not track["key_url"]:
            print(" [跳过] 无加密")
            return None

        print("[2/5] 获取解密密钥...")
        resp = self.session.get(track["key_url"], headers=self._svc_headers())
        resp.raise_for_status()
        key = resp.content
        print(f" Key: {key.hex()} ({len(key)} bytes)")

        iv_str = track["iv"]
        if iv_str.startswith("0x"):
            iv_str = iv_str[2:]
        self.encryption_iv = bytes.fromhex(iv_str)
        self.encryption_key = key
        print(f" IV: {self.encryption_iv.hex()}")

        return key

    def decrypt_segment(self, data: bytes) -> bytes:
        """AES-CBC-decrypt ``data`` with the stored key and IV.

        The input is returned unchanged when no key has been fetched or
        pycryptodome is unavailable.  Trailing PKCS#7 padding is removed only
        when the final bytes form a valid padding run.
        """
        if not self.encryption_key or AES is None:
            return data
        cipher = AES.new(self.encryption_key, AES.MODE_CBC, self.encryption_iv)
        decrypted = cipher.decrypt(data)
        if len(decrypted) > 0:
            pad = decrypted[-1]
            if 0 < pad <= 16 and decrypted[-pad:] == bytes([pad]) * pad:
                decrypted = decrypted[:-pad]
        return decrypted

    @staticmethod
    def _retry_delay(response, default: int = 5) -> int:
        """Return the ``Retry-After`` delay in seconds, or ``default``.

        ``Retry-After`` may also be an HTTP date; anything that is not a
        plain integer falls back to ``default`` instead of raising.
        """
        try:
            return max(0, int(response.headers.get("Retry-After", default)))
        except (TypeError, ValueError):
            return default

    def download_track(self, track: dict, track_name: str, max_workers: int = 4) -> str:
        """Download every segment of ``track`` and write ``<track_name>.mp4``.

        The first representation of the track is used.  With cookies in the
        session segments are fetched by ``max_workers`` threads, otherwise
        one at a time.  (Original author's note: without the FedAuth/rtFa
        cookies the signed URLs only last for roughly 50-70 requests.)

        All segments are held in memory until the file is written.  Fewer
        than five failed segments are tolerated; they are left out of the
        output and a warning is printed.

        Returns:
            Path of the written file.

        Raises:
            requests.HTTPError: If the init segment request fails (non-401).
        """
        base_url = self.manifest_info["base_url"]
        rep_id = track["representations"][0]["id"]
        segments = track["segments"]
        total = len(segments)
        has_cookies = len(self.session.cookies) > 0

        out_path = os.path.join(self.output_dir, f"{track_name}.mp4")

        init_url = resolve_url(base_url, track["init_tpl"], rep_id)
        print(" 下载 init segment...")

        # Debug aid: show whether the cookie jar has entries for this host.
        if has_cookies:
            init_host = urlparse(init_url).hostname
            matched = self.session.cookies.get_dict(domain=init_host)
            print(f" [调试] 匹配到 {len(matched)} 个 cookies for {init_host}")
            if not matched:
                print(" [调试] Cookie jar 内容:")
                for c in self.session.cookies:
                    print(f" {c.name} domain={c.domain} path={c.path}")

        resp = self.session.get(init_url, headers=self._sp_headers())

        # Debug aid: show what was actually sent.
        if has_cookies:
            sent_cookie = resp.request.headers.get('Cookie', '')
            if sent_cookie:
                print(f" [调试] 实际发送 Cookie: {sent_cookie[:80]}...")
            else:
                print(" [调试] 未发送任何 Cookie!")

        if resp.status_code == 401:
            print(" init segment 401")
            if not has_cookies:
                print(" [提示] 缺少 FedAuth cookie,请用 -c 参数提供")
                print(" 在浏览器 Console 执行: copy(document.cookie)")
                print(" 然后: python sp_video_dl.py curl_command.txt -c \"粘贴的cookie\"")
            sys.exit(1)
        resp.raise_for_status()
        init_data = self.decrypt_segment(resp.content)

        print(f" 下载 {total} 个 media segments...")

        if has_cookies:
            workers = max_workers
            print(f" 模式: 并发 (workers={workers})")
        else:
            workers = 1
            print(" 模式: 串行 (无 cookie,可能在 ~50 个后 401)")

        seg_data_map = {}

        def dl_one(idx, seg_time):
            """Fetch one segment: three attempts with back-off, then a final try."""
            url = resolve_url(base_url, track["media_tpl"], rep_id, seg_time)
            for _attempt in range(3):
                r = self.session.get(url, headers=self._sp_headers())
                if r.status_code == 200:
                    return idx, self.decrypt_segment(r.content)
                elif r.status_code in (429, 503):
                    time.sleep(self._retry_delay(r))
                elif r.status_code == 401:
                    time.sleep(2)
                else:
                    r.raise_for_status()
            # Final attempt: any HTTP error status now becomes an exception.
            r = self.session.get(url, headers=self._sp_headers())
            r.raise_for_status()
            return idx, self.decrypt_segment(r.content)

        downloaded = 0
        failed = 0
        with ThreadPoolExecutor(max_workers=workers) as pool:
            futures = {pool.submit(dl_one, i, t): i for i, t in enumerate(segments)}
            for future in as_completed(futures):
                try:
                    idx, data = future.result()
                    seg_data_map[idx] = data
                    downloaded += 1
                    if downloaded % 50 == 0 or downloaded == total:
                        pct = downloaded / total * 100
                        print(f" 进度: {downloaded}/{total} ({pct:.1f}%)")
                except Exception as e:
                    failed += 1
                    err = str(e)
                    if "401" in err and not has_cookies:
                        # Stop everything that has not started yet.
                        for pending in futures:
                            pending.cancel()
                        print(f"\n 已下载 {downloaded}/{total} 后遇到 401")
                        print(" 根本原因: 缺少 SharePoint 认证 cookie (FedAuth/rtFa)")
                        print(" HAR 导出不包含 HttpOnly cookie,需要手动获取")
                        print("\n 解决方案:")
                        print(" 1. 在浏览器 Console 执行: copy(document.cookie)")
                        print(" 2. 运行: python sp_video_dl.py curl_command.txt -c \"粘贴\"")
                        print("\n 如果 document.cookie 为空 (HttpOnly),安装浏览器扩展:")
                        print(" - EditThisCookie 或 Cookie-Editor")
                        print(" - 导出 ecvcorp-my.sharepoint.com 的所有 cookie 为 JSON")
                        print(" - 运行: python sp_video_dl.py curl_command.txt -c cookies.json")
                        sys.exit(1)
                    if failed >= 5:
                        for pending in futures:
                            pending.cancel()
                        print(f"\n[错误] 失败 {failed} 次: {err[:200]}")
                        sys.exit(1)

        missing = total - len(seg_data_map)
        if missing:
            # Previously these gaps were skipped without any message.
            print(f" [警告] {missing} 个 segment 下载失败,输出文件可能不完整")

        print(f" 写入 {out_path}...")
        with open(out_path, "wb") as f:
            f.write(init_data)
            for i in range(total):
                if i in seg_data_map:
                    f.write(seg_data_map[i])

        size_mb = os.path.getsize(out_path) / 1024 / 1024
        print(f" 完成: {size_mb:.1f} MB")
        return out_path

    def select_tracks(self) -> tuple:
        """Pick the tracks to download.

        Returns:
            ``(video_track, audio_track)``: the first video track, and the
            audio track labelled ``OriginalAudio`` if any (the last one wins
            if several carry that label), else the first audio track.
            Either may be ``None``.
        """
        video_track = None
        audio_track = None

        for track in self.manifest_info["tracks"]:
            if track["type"] == "video" and video_track is None:
                video_track = track
            elif track["type"] == "audio":
                if track["label"] == "OriginalAudio":
                    audio_track = track
                elif audio_track is None:
                    audio_track = track

        return video_track, audio_track

    def fetch_transcript(self) -> str:
        """Download the transcript and convert it to SRT and plain text.

        Steps:
          1. Build the item API URL from the manifest URL's ``docid``.
          2. Request the transcript metadata and pick the default transcript
             (or the first one).
          3. Download the content; WebVTT is saved as ``transcript.vtt``,
             anything else as ``transcript.json``.
          4. Convert to ``transcript.srt`` and ``transcript.txt`` when cues
             can be extracted.

        Returns:
            Path of the SRT file if one was written, otherwise the raw file
            path, or ``None`` if no transcript could be downloaded.
        """
        print("\n[额外] 下载转录...")

        query = parse_qs(urlparse(self.manifest_url).query)
        docid = unquote(query.get('docid', [''])[0])

        if not docid:
            print(" [跳过] 无法从 manifest URL 提取 docid")
            return None

        docid_parsed = urlparse(docid)
        sp_host = f"{docid_parsed.scheme}://{docid_parsed.hostname}"
        path = docid_parsed.path.replace('/_api/v2.0/', '/_api/v2.1/')
        item_url = sp_host + path

        # Step 1: transcript metadata
        meta_url = item_url + "?select=media%2Ftranscripts%2CaudioTracks&%24expand=media%2Ftranscripts%2Cmedia%2FaudioTracks"
        print(" 获取转录元数据...")

        resp = self.session.get(meta_url, headers=self._sp_headers())
        if resp.status_code != 200:
            print(f" [跳过] 转录元数据请求返回 {resp.status_code}")
            return None

        try:
            meta = resp.json()
        except Exception:
            print(" [跳过] 转录元数据解析失败")
            return None

        transcripts = meta.get('media', {}).get('transcripts', [])
        if not transcripts:
            print(" [跳过] 该视频没有转录")
            return None

        print(f" 找到 {len(transcripts)} 个转录:")
        for t in transcripts:
            print(f" - {t.get('displayName', '?')} ({t.get('languageTag', '?')}, {t.get('size', 0)} bytes)")

        transcript = next((t for t in transcripts if t.get('isDefault')), transcripts[0])
        transcript_id = transcript.get('id', '')

        # Step 2: transcript content
        download_url = transcript.get('temporaryDownloadUrl', '')
        if not download_url:
            download_url = f"{item_url}/versions/current/media/transcripts/{transcript_id}/content"

        print(" 下载转录内容...")
        resp = self.session.get(download_url, headers=self._sp_headers())
        if resp.status_code != 200:
            print(f" [跳过] 转录下载返回 {resp.status_code}")
            return None

        resp.encoding = 'utf-8-sig'  # decodes UTF-8 and drops a leading BOM
        content = resp.text

        os.makedirs(self.output_dir, exist_ok=True)
        srt_path = os.path.join(self.output_dir, "transcript.srt")
        txt_path = os.path.join(self.output_dir, "transcript.txt")

        if content.lstrip('\ufeff').startswith('WEBVTT'):
            vtt_path = os.path.join(self.output_dir, "transcript.vtt")
            with open(vtt_path, 'w', encoding='utf-8') as f:
                f.write(content)
            print(f" 已保存 VTT: {vtt_path}")

            entries = self._parse_vtt(content)
            if entries:
                # VTT cue times are already milliseconds; do not let the
                # seconds-vs-milliseconds guess rescale early cues.
                self._to_srt(entries, srt_path, times_in_ms=True)
                self._to_txt(entries, txt_path, times_in_ms=True)
                print(f" 已保存 SRT: {srt_path} ({len(entries)} 条)")
                print(f" 已保存纯文本: {txt_path}")
                return srt_path
            return vtt_path

        # Not WebVTT: keep the raw payload and try to read it as JSON.
        raw_path = os.path.join(self.output_dir, "transcript.json")
        with open(raw_path, 'w', encoding='utf-8') as f:
            f.write(content)
        print(f" 已保存原始转录: {raw_path}")

        try:
            data = json.loads(content)
            entries = data if isinstance(data, list) else data.get('entries', data.get('cues', []))
            if not entries:
                # Fall back to the first non-empty list value in the object.
                for key in data:
                    if isinstance(data[key], list) and len(data[key]) > 0:
                        entries = data[key]
                        break
            if entries:
                self._to_srt(entries, srt_path)
                self._to_txt(entries, txt_path)
                print(f" 已保存 SRT: {srt_path}")
                print(f" 已保存纯文本: {txt_path}")
                return srt_path
        except Exception as e:
            print(f" JSON 解析失败: {e}")

        return raw_path

    @staticmethod
    def _vtt_time_to_ms(stamp: str) -> int:
        """Convert ``[HH:]MM:SS.mmm`` to milliseconds."""
        *hours, minutes, seconds = stamp.split(':')
        whole, _, millis = seconds.partition('.')
        hour_value = int(hours[0]) if hours else 0
        return hour_value * 3600000 + int(minutes) * 60000 + int(whole) * 1000 + int(millis)

    @staticmethod
    def _parse_vtt(content: str) -> list:
        """Parse WebVTT text into a list of cue dictionaries.

        Handles LF and CRLF line endings and timestamps with or without the
        hours field.  Blocks without a parsable timing line (header, notes)
        are skipped.

        Returns:
            A list of ``{"startTime", "endTime", "text", "speakerName"}``
            dicts; times are integer milliseconds and ``speakerName`` comes
            from a leading ``<v Name>`` voice tag (empty if absent).
        """
        stamp = r'(?:\d{2,}:)?\d{2}:\d{2}\.\d{3}'
        timing_re = re.compile(r'(' + stamp + r')\s*-->\s*(' + stamp + r')')
        voice_re = re.compile(r'<v\s+([^>]+)>(.*?)(?:</v>)?$')

        entries = []
        content = content.lstrip('\ufeff')  # strip BOM
        # Cue blocks are separated by blank lines; normalise line endings
        # first so that CRLF files split correctly.
        content = content.replace('\r\n', '\n').replace('\r', '\n')

        for block in re.split(r'\n\n+', content):
            lines = block.strip().split('\n')
            if len(lines) < 2:
                continue

            # The timing line may be preceded by a cue identifier.
            timing_index = next((j for j, line in enumerate(lines) if '-->' in line), None)
            if timing_index is None:
                continue

            timing = timing_re.match(lines[timing_index])
            if not timing:
                continue

            text = ' '.join(l.strip() for l in lines[timing_index + 1:] if l.strip())

            speaker = ''
            voice = voice_re.match(text)
            if voice:
                speaker = voice.group(1).strip()
                text = voice.group(2).strip()

            entries.append({
                'startTime': SPVideoDownloader._vtt_time_to_ms(timing.group(1)),
                'endTime': SPVideoDownloader._vtt_time_to_ms(timing.group(2)),
                'text': text,
                'speakerName': speaker,
            })

        return entries

    @staticmethod
    def _ms_to_srt_time(ms: float) -> str:
        """Format milliseconds as an SRT timestamp ``HH:MM:SS,mmm``."""
        ms = int(ms)
        h = ms // 3600000
        m = (ms % 3600000) // 60000
        s = (ms % 60000) // 1000
        ms_rem = ms % 1000
        return f"{h:02d}:{m:02d}:{s:02d},{ms_rem:03d}"

    @staticmethod
    def _is_seconds(start, times_in_ms) -> bool:
        """Decide whether an entry's times must be scaled from s to ms.

        ``times_in_ms=None`` keeps the original guess (numeric start below
        100000 means seconds); ``True``/``False`` state the unit explicitly.
        """
        if not isinstance(start, (int, float)):
            return False
        if times_in_ms is None:
            return start < 100000
        return not times_in_ms

    def _to_srt(self, entries: list, path: str, times_in_ms: bool = None):
        """Write ``entries`` to ``path`` as SRT subtitles.

        Several field spellings are accepted: ``startTime``/``start``/
        ``offset``, ``endTime``/``end`` (or start + ``duration``),
        ``text``/``value``/``content`` and ``speakerName``/``speaker``/
        ``displayName``.

        Args:
            entries: Cue dictionaries.
            path: Output file path.
            times_in_ms: ``True`` if times are milliseconds, ``False`` if
                seconds, ``None`` to guess per entry (starts below 100000
                are treated as seconds).
        """
        with open(path, 'w', encoding='utf-8') as f:
            for i, entry in enumerate(entries, 1):
                start = entry.get('startTime', entry.get('start', entry.get('offset', 0)))
                end = entry.get('endTime', entry.get('end', start + entry.get('duration', 0)))
                text = entry.get('text', entry.get('value', entry.get('content', '')))
                speaker = entry.get('speakerName', entry.get('speaker', entry.get('displayName', '')))

                if self._is_seconds(start, times_in_ms):
                    start *= 1000
                    end *= 1000

                start_str = self._ms_to_srt_time(start)
                end_str = self._ms_to_srt_time(end)

                line = f"[{speaker}] {text}" if speaker else text
                f.write(f"{i}\n{start_str} --> {end_str}\n{line}\n\n")

    def _to_txt(self, entries: list, path: str, times_in_ms: bool = None):
        """Write ``entries`` to ``path`` as plain text.

        A ``[speaker] (MM:SS)`` heading is written whenever the speaker
        changes.  ``times_in_ms`` has the same meaning as in :meth:`_to_srt`.
        """
        with open(path, 'w', encoding='utf-8') as f:
            last_speaker = ""
            for entry in entries:
                start = entry.get('startTime', entry.get('start', entry.get('offset', 0)))
                text = entry.get('text', entry.get('value', entry.get('content', '')))
                speaker = entry.get('speakerName', entry.get('speaker', entry.get('displayName', '')))

                if self._is_seconds(start, times_in_ms):
                    start *= 1000

                mins = int(start / 1000 / 60)
                secs = int(start / 1000) % 60

                if speaker and speaker != last_speaker:
                    f.write(f"\n[{speaker}] ({mins:02d}:{secs:02d})\n")
                    last_speaker = speaker

                f.write(f"{text}\n")

    def run(self, max_workers: int = 4):
        """Run the complete download workflow.

        Fetches the manifest, the video track's key, both tracks, muxes them
        into ``final.mp4`` with ffmpeg (stream copy) and finally tries to
        download the transcript.  Returns early if no video track exists.
        """
        print("=" * 60)
        print("SharePoint Stream 视频下载器")
        print("=" * 60)

        # Step 1: manifest
        self.fetch_manifest()

        video_track, audio_track = self.select_tracks()

        if not video_track:
            print("[错误] 未找到视频 track")
            return

        # Step 2: decryption key (taken from the video track only)
        self.fetch_encryption_key(video_track)

        # Step 3: video
        print("\n[3/5] 下载视频 track...")
        video_path = self.download_track(video_track, "video", max_workers)

        # Step 4: audio
        audio_path = None
        if audio_track:
            print("\n[4/5] 下载音频 track...")
            audio_path = self.download_track(audio_track, "audio", max_workers)

        # Step 5: mux
        print("\n[5/5] 合并音视频...")
        final_path = os.path.join(self.output_dir, "final.mp4")
        if audio_path:
            ffmpeg_cmd = f'ffmpeg -y -i "{video_path}" -i "{audio_path}" -c copy "{final_path}"'
            print(f" 执行: {ffmpeg_cmd}")
            # Run with an argument list (no shell), so paths containing
            # quotes or shell metacharacters are passed through safely.
            argv = ["ffmpeg", "-y", "-i", video_path, "-i", audio_path,
                    "-c", "copy", final_path]
            try:
                ret = subprocess.run(argv).returncode
            except OSError:
                # ffmpeg missing or not executable: report as a failure.
                ret = -1
            if ret == 0:
                print(f" 完成: {final_path}")
                size_mb = os.path.getsize(final_path) / 1024 / 1024
                print(f" 大小: {size_mb:.1f} MB")
            else:
                print(f" ffmpeg 失败 (返回码 {ret})")
                print(" 视频和音频已分别保存,请手动合并:")
                print(f" {video_path}")
                print(f" {audio_path}")
        else:
            # os.replace also overwrites an existing final.mp4 on Windows.
            os.replace(video_path, final_path)
            print(f" 完成: {final_path}")

        # Step 6: transcript
        self.fetch_transcript()

        print("\n" + "=" * 60)
        print("下载完成!")
        print("=" * 60)
# ============================================================
# Entry point
# ============================================================
def _read_request_text(input_path):
    """Return the request text from ``input_path`` or from interactive input.

    Interactive input ends at two consecutive blank lines or EOF.  An
    unreadable file prints an error and exits with status 1.
    """
    if input_path:
        try:
            with open(input_path, "r", encoding="utf-8") as f:
                return f.read()
        except OSError as exc:
            print(f"[错误] 无法读取输入文件: {exc}")
            sys.exit(1)

    print("请粘贴 videomanifest 请求的 cURL 命令 (粘贴完成后按两次回车):")
    print("-" * 40)
    lines = []
    empty_count = 0
    while True:
        try:
            line = input()
        except EOFError:
            break
        if line.strip() == "":
            empty_count += 1
            if empty_count >= 2:
                break
        else:
            empty_count = 0
        lines.append(line)
    return "\n".join(lines)


def _parse_request_text(text: str) -> dict:
    """Parse ``text`` as a cURL command or as raw headers.

    Text starting with ``curl`` is parsed as a command, text starting with
    ``http`` as raw headers; anything else is tried as a command first and
    as raw headers if that fails.

    Raises:
        ValueError: Propagated from ``parse_curl`` for text that starts with
            ``curl`` but cannot be parsed.
    """
    if text.lower().startswith("curl"):
        return parse_curl(text)
    if text.startswith("http"):
        return parse_raw_headers(text)
    try:
        return parse_curl(text)
    except Exception:
        return parse_raw_headers(text)


def main():
    """Command-line entry point.

    Reads the captured ``videomanifest`` request (file argument or
    interactive paste), rejects CORS preflight captures, then runs a dry
    run, a transcript-only download or the full download depending on the
    flags.  Exits with status 1 on unusable input.
    """
    parser = argparse.ArgumentParser(
        description="SharePoint Stream 视频下载器",
        epilog="用法: 从浏览器 DevTools 复制 videomanifest 请求的 cURL 命令"
    )
    parser.add_argument("input", nargs="?", help="包含 cURL 命令的文件路径")
    parser.add_argument("-o", "--output", default="output", help="输出目录 (默认: output)")
    parser.add_argument("-w", "--workers", type=int, default=4, help="并发下载线程数 (默认: 4)")
    parser.add_argument("-t", "--token", help="driveAccessToken (如果 segment 下载 401,从浏览器 Console 获取)")
    parser.add_argument("-c", "--cookie", help="Cookie 字符串或 JSON 文件路径 (从浏览器获取 FedAuth/rtFa)")
    parser.add_argument("--dry-run", action="store_true", help="仅解析 manifest,不下载")
    parser.add_argument("--transcript-only", action="store_true", help="仅下载转录文件")
    args = parser.parse_args()

    curl_text = _read_request_text(args.input)

    if not curl_text or not curl_text.strip():
        print("[错误] 未提供输入")
        sys.exit(1)

    try:
        parsed = _parse_request_text(curl_text.strip())
    except ValueError as exc:
        # e.g. a "curl ..." command without any URL, or unbalanced quotes.
        print(f"[错误] {exc}")
        sys.exit(1)

    if not parsed.get("url"):
        print("[错误] 无法解析 URL")
        sys.exit(1)

    # ------------------------------------------------------------
    # Detect a copied OPTIONS preflight request (the most common mistake)
    # ------------------------------------------------------------
    method = parsed.get("method", "GET")
    header_names = {k.lower() for k in parsed["headers"]}
    has_acr_headers = "access-control-request-headers" in header_names
    has_pac_token = "x-spopactoken" in header_names

    if method == "OPTIONS" or has_acr_headers:
        print("\n" + "=" * 60)
        print("[错误] 你复制的是 OPTIONS preflight 请求,不是实际的 GET 请求!")
        print("=" * 60)
        print("""
这是浏览器 CORS 预检请求,里面没有认证 token,无法下载。
请按以下步骤重新复制:
1. 打开 DevTools -> Network 标签
2. 在 Filter 框输入 "videomanifest"
3. 你会看到两个同名请求:
- 一个 Method 是 OPTIONS (预检) <- 不要复制这个
- 一个 Method 是 GET (实际请求) <- 复制这个!
4. 右键点击 GET 那个请求 -> Copy -> Copy as cURL (bash)

如何区分:
- OPTIONS 请求的 headers 里有 "access-control-request-headers"
- GET 请求的 headers 里有 "x-spopactoken: v1.eyJ..." (一个很长的 token)
""")
        sys.exit(1)

    if not has_pac_token:
        print("\n[警告] 未找到 x-spopactoken header,请求可能会 401")
        print(" 确保复制的是 GET 请求 (不是 OPTIONS preflight)")

    print(f"\n解析到 URL: {parsed['url'][:100]}...")
    print(f"Headers: {len(parsed['headers'])} 个")
    for k in sorted(parsed["headers"]):
        v = parsed["headers"][k]
        if len(v) > 60:
            v = v[:60] + "..."
        print(f" {k}: {v}")

    if "videomanifest" not in parsed["url"]:
        print("\n[警告] URL 中不包含 'videomanifest',可能不是正确的请求")
        resp = input("是否继续? (y/N): ")
        if resp.lower() != "y":
            sys.exit(0)

    downloader = SPVideoDownloader(
        manifest_url=parsed["url"],
        headers=parsed["headers"],
        output_dir=args.output,
        cookies=args.cookie,
    )

    # The token is only stored on the downloader here.
    if args.token:
        token = args.token.strip()
        if token.startswith("access_token="):
            token = token[len("access_token="):]
        downloader.sp_access_token = token
        print(f"\n已设置 driveAccessToken: {token[:40]}...")

    if args.dry_run:
        downloader.fetch_manifest()
        print("\n[DRY RUN] 仅解析 manifest,不下载")
    elif args.transcript_only:
        downloader.fetch_transcript()
    else:
        # A thread pool needs at least one worker.
        downloader.run(max_workers=max(1, args.workers))
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()