HLS视频下载神器:Python实现加密流媒体一键下载与合并
基于Python的HLS视频下载工具开发全解析,支持加密流和并行下载
引言:为什么需要HLS视频下载工具?
在日常工作和学习中,我们经常会遇到需要保存HLS流媒体视频的情况——可能是在线教育课程、重要会议录像,或是珍贵的媒体资料。虽然浏览器可以播放这些视频,但往往不提供下载功能。特别是当视频使用AES加密时,手动下载变得更加困难。
本文介绍的工具正是为了解决这一痛点:一个功能完整的HLS视频下载器,支持加密流、并行下载和自动合并。
一、HLS技术背景简介
1.1 什么是HLS?
HLS(HTTP Live Streaming)是苹果公司开发的流媒体传输协议,它将视频分割成小的TS文件片段,通过M3U8播放列表索引。这种技术允许自适应码率切换,非常适合网络视频传输。
1.2 HLS的加密机制
HLS通常使用AES-128加密保护视频内容,加密信息存储在M3U8文件的#EXT-X-KEY
标签中,包括密钥URL和初始化向量(IV)。
二、工具设计与架构
2.1 整体架构
输入M3U8 URL → 解析播放列表 → 下载密钥(如有加密) → 并行下载片段 → 解密处理 → 合并输出
2.2 核心功能模块
- M3U8解析器:分析播放列表,提取片段和加密信息
- 多线程下载器:并发下载视频片段,提高下载速度
- AES解密器:处理加密视频片段的解密
- FFmpeg集成:合并TS片段为MP4文件
三、代码实现详解
3.1 环境准备与依赖安装
首先需要安装必要的Python库:
pip install requests tqdm pycryptodome
同时需要安装FFmpeg(用于视频合并):
# Ubuntu/Debian
sudo apt install ffmpeg
# macOS
brew install ffmpeg
# Windows:从https://ffmpeg.org/download.html下载并添加到PATH
3.2 核心类HLSDownloader
class HLSDownloader:
def __init__(self, m3u8_url, output_path="output.mp4", max_workers=8):
self.m3u8_url = m3u8_url
self.output_path = output_path
self.max_workers = max_workers
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
})
3.3 M3U8文件解析
def parse_m3u8(self, content):
"""解析m3u8文件内容"""
lines = content.strip().split('\n')
segments = []
encryption_key = None
encryption_iv = None
base_url = '/'.join(self.m3u8_url.split('/')[:-1]) + '/'
i = 0
while i < len(lines):
line = lines[i].strip()
# 解析加密信息
if line.startswith('#EXT-X-KEY:'):
key_match = re.search(r'URI="([^"]+)"', line)
iv_match = re.search(r'IV=([^,\s]+)', line)
if key_match:
key_url = key_match.group(1)
if not key_url.startswith('http'):
key_url = urljoin(base_url, key_url)
encryption_key = key_url
if iv_match:
encryption_iv = iv_match.group(1)
if encryption_iv.startswith('0x'):
encryption_iv = bytes.fromhex(encryption_iv[2:])
# 解析视频片段
elif line.startswith('#EXTINF:'):
if i + 1 < len(lines):
segment_url = lines[i + 1].strip()
if not segment_url.startswith('http'):
segment_url = urljoin(base_url, segment_url)
segments.append(segment_url)
i += 1
i += 1
return segments, encryption_key, encryption_iv
3.4 多线程下载实现
def download_segment(self, segment_info):
"""下载单个视频片段"""
segment_url, temp_dir, segment_index, key, iv = segment_info
try:
response = self.session.get(segment_url, timeout=60)
response.raise_for_status()
segment_data = response.content
# 如果有加密,进行解密
if key and iv:
# 对于每个片段,IV可能需要递增
if isinstance(iv, bytes):
segment_iv = bytearray(iv)
# 将segment_index写入IV的最后4个字节
segment_iv[-4:] = segment_index.to_bytes(4, 'big')
iv = bytes(segment_iv)
cipher = AES.new(key, AES.MODE_CBC, iv)
segment_data = cipher.decrypt(segment_data)
# 移除PKCS7填充
pad_len = segment_data[-1]
if pad_len <= 16:
segment_data = segment_data[:-pad_len]
# 保存片段文件
segment_path = os.path.join(temp_dir, f"segment_{segment_index:06d}.ts")
with open(segment_path, 'wb') as f:
f.write(segment_data)
return segment_path, None
except Exception as e:
return None, str(e)
3.5 使用FFmpeg合并视频
def merge_segments(self, segment_files, output_path):
"""使用ffmpeg合并视频片段"""
# 创建文件列表
list_file = "segments_list.txt"
try:
with open(list_file, 'w', encoding='utf-8') as f:
for segment_file in segment_files:
# 使用绝对路径避免问题
abs_path = os.path.abspath(segment_file)
f.write(f"file '{abs_path}'\n")
# 使用ffmpeg合并
cmd = [
'ffmpeg', '-f', 'concat', '-safe', '0',
'-i', list_file, '-c', 'copy', output_path, '-y'
]
print("正在合并视频片段...")
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
print(f"✅ 视频已保存至: {output_path}")
return True
else:
print(f"❌ FFmpeg错误: {result.stderr}")
return False
except Exception as e:
print(f"❌ 合并失败: {e}")
return False
finally:
# 清理文件列表
if os.path.exists(list_file):
os.remove(list_file)
四、使用教程
4.1 基本使用方法
# 最简单用法
python hls_downloader.py https://example.com/video.m3u8
# 指定输出文件名
python hls_downloader.py https://example.com/video.m3u8 -o my_video.mp4
# 指定并发线程数
python hls_downloader.py https://example.com/video.m3u8 -w 16
4.2 获取M3U8链接的方法
浏览器开发者工具:
- 打开开发者工具(F12)
- 切换到Network(网络)标签
- 刷新页面并开始播放视频
- 过滤.m3u8文件,复制URL
使用浏览器扩展:
- 安装"HLS Stream Detector"等扩展
- 自动检测页面中的HLS流
4.3 处理常见问题
问题1:下载速度慢
- 增加并发线程数:
-w 16
- 检查网络连接
- 尝试使用代理
问题2:解密失败
- 确认工具支持该加密方式
- 检查密钥URL是否可访问
问题3:合并失败
- 确认已安装FFmpeg并添加到PATH
- 检查磁盘空间是否充足
五、高级功能与自定义
5.1 自定义请求头
有些网站需要特定的请求头才能访问,可以修改代码中的session配置:
self.session.headers.update({
'User-Agent': '自定义User-Agent',
'Referer': 'https://example.com',
'Cookie': '你的cookie信息'
})
5.2 代理支持
添加代理支持可以绕过某些地域限制:
def __init__(self, m3u8_url, output_path="output.mp4", max_workers=8, proxy=None):
# ...其他初始化代码
if proxy:
self.session.proxies = {
'http': proxy,
'https': proxy
}
5.3 断点续传
通过记录已下载的片段,实现断点续传功能:
def resume_download(self, temp_dir):
"""检查并恢复下载"""
downloaded_segments = []
for file in os.listdir(temp_dir):
if file.startswith("segment_") and file.endswith(".ts"):
index = int(file.split('_')[1].split('.')[0])
downloaded_segments.append(index)
return downloaded_segments
六、法律与道德考量
6.1 合法使用
- 仅下载你有权访问的内容
- 尊重版权和知识产权
- 不要绕过付费墙下载付费内容
6.2 教育目的
本工具主要用于教育和技术研究目的,帮助开发者理解:
- HLS协议的工作原理
- AES加密解密流程
- 多线程网络编程
- FFmpeg视频处理
七、完整代码获取
本文涉及的完整代码已经开源,可以通过以下方式获取:
#!/usr/bin/env python3
"""
HLS视频下载工具
支持加密HLS流下载并合并为MP4
"""
import os
import sys
import re
import subprocess
import requests
import tempfile
import shutil
from urllib.parse import urljoin, urlparse
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
from Crypto.Cipher import AES
import argparse
class HLSDownloader:
def __init__(self, m3u8_url, output_path="output.mp4", max_workers=8):
self.m3u8_url = m3u8_url
self.output_path = output_path
self.max_workers = max_workers
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
})
def download_m3u8(self):
"""下载并解析m3u8文件"""
try:
response = self.session.get(self.m3u8_url, timeout=30)
response.raise_for_status()
return response.text
except Exception as e:
print(f"下载m3u8文件失败: {e}")
return None
def parse_m3u8(self, content):
"""解析m3u8文件内容"""
lines = content.strip().split('\n')
segments = []
encryption_key = None
encryption_iv = None
base_url = '/'.join(self.m3u8_url.split('/')[:-1]) + '/'
i = 0
while i < len(lines):
line = lines[i].strip()
# 解析加密信息
if line.startswith('#EXT-X-KEY:'):
key_match = re.search(r'URI="([^"]+)"', line)
iv_match = re.search(r'IV=([^,\s]+)', line)
if key_match:
key_url = key_match.group(1)
if not key_url.startswith('http'):
key_url = urljoin(base_url, key_url)
encryption_key = key_url
if iv_match:
encryption_iv = iv_match.group(1)
if encryption_iv.startswith('0x'):
encryption_iv = bytes.fromhex(encryption_iv[2:])
# 解析视频片段
elif line.startswith('#EXTINF:'):
if i + 1 < len(lines):
segment_url = lines[i + 1].strip()
if not segment_url.startswith('http'):
segment_url = urljoin(base_url, segment_url)
segments.append(segment_url)
i += 1
i += 1
return segments, encryption_key, encryption_iv
def download_key(self, key_url):
"""下载加密密钥"""
try:
response = self.session.get(key_url, timeout=30)
response.raise_for_status()
return response.content
except Exception as e:
print(f"下载密钥失败: {e}")
return None
def download_segment(self, segment_info):
"""下载单个视频片段"""
segment_url, temp_dir, segment_index, key, iv = segment_info
try:
response = self.session.get(segment_url, timeout=60)
response.raise_for_status()
segment_data = response.content
# 如果有加密,进行解密
if key and iv:
# 对于每个片段,IV可能需要递增
if isinstance(iv, bytes):
segment_iv = bytearray(iv)
# 将segment_index写入IV的最后4个字节
segment_iv[-4:] = segment_index.to_bytes(4, 'big')
iv = bytes(segment_iv)
cipher = AES.new(key, AES.MODE_CBC, iv)
segment_data = cipher.decrypt(segment_data)
# 移除PKCS7填充
pad_len = segment_data[-1]
if pad_len <= 16:
segment_data = segment_data[:-pad_len]
# 保存片段文件
segment_path = os.path.join(temp_dir, f"segment_{segment_index:06d}.ts")
with open(segment_path, 'wb') as f:
f.write(segment_data)
return segment_path, None
except Exception as e:
return None, str(e)
def merge_segments(self, segment_files, output_path):
"""使用ffmpeg合并视频片段"""
# 创建文件列表
list_file = "segments_list.txt"
try:
with open(list_file, 'w', encoding='utf-8') as f:
for segment_file in segment_files:
# 使用绝对路径避免问题
abs_path = os.path.abspath(segment_file)
f.write(f"file '{abs_path}'\n")
# 使用ffmpeg合并
cmd = [
'ffmpeg', '-f', 'concat', '-safe', '0',
'-i', list_file, '-c', 'copy', output_path, '-y'
]
print("正在合并视频片段...")
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
print(f"✅ 视频已保存至: {output_path}")
return True
else:
print(f"❌ FFmpeg错误: {result.stderr}")
return False
except Exception as e:
print(f"❌ 合并失败: {e}")
return False
finally:
# 清理文件列表
if os.path.exists(list_file):
os.remove(list_file)
def download(self):
"""主下载流程"""
print(f"🚀 开始下载HLS视频: {self.m3u8_url}")
# 1. 下载m3u8文件
print("📥 下载播放列表...")
m3u8_content = self.download_m3u8()
if not m3u8_content:
print("❌ 无法获取播放列表")
return False
# 2. 解析m3u8文件
segments, key_url, iv = self.parse_m3u8(m3u8_content)
print(f"📄 发现 {len(segments)} 个视频片段")
if not segments:
print("❌ 未找到视频片段")
return False
# 3. 下载加密密钥
key = None
if key_url:
print("🔐 下载加密密钥...")
key = self.download_key(key_url)
if not key:
print("❌ 无法获取加密密钥")
return False
print(f"✅ 密钥获取成功 (长度: {len(key)} 字节)")
# 4. 创建临时目录
temp_dir = tempfile.mkdtemp()
print(f"📁 临时目录: {temp_dir}")
try:
# 5. 并行下载所有片段
print(f"⬇️ 开始下载片段 (并发数: {self.max_workers})...")
segment_tasks = [
(url, temp_dir, idx, key, iv)
for idx, url in enumerate(segments)
]
downloaded_segments = []
failed_segments = []
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# 提交所有下载任务
future_to_index = {
executor.submit(self.download_segment, task): task[2]
for task in segment_tasks
}
# 使用进度条显示下载进度
with tqdm(total=len(segments), desc="下载进度") as pbar:
for future in as_completed(future_to_index):
segment_path, error = future.result()
index = future_to_index[future]
if segment_path:
downloaded_segments.append((index, segment_path))
else:
failed_segments.append((index, error))
pbar.update(1)
# 检查下载结果
if failed_segments:
print(f"⚠️ {len(failed_segments)} 个片段下载失败:")
for idx, error in failed_segments:
print(f" 片段 {idx}: {error}")
if not downloaded_segments:
print("❌ 没有成功下载的片段")
return False
# 按索引排序片段
downloaded_segments.sort(key=lambda x: x[0])
segment_files = [path for _, path in downloaded_segments]
print(f"✅ 成功下载 {len(segment_files)} 个片段")
# 6. 合并视频
success = self.merge_segments(segment_files, self.output_path)
return success
finally:
# 清理临时目录
try:
shutil.rmtree(temp_dir)
print("🧹 已清理临时文件")
except:
pass
def main():
parser = argparse.ArgumentParser(description="HLS视频下载工具")
parser.add_argument("url", help="HLS播放列表URL (.m3u8)")
parser.add_argument("-o", "--output", default="video.mp4", help="输出文件路径")
parser.add_argument("-w", "--workers", type=int, default=8, help="并发下载数")
args = parser.parse_args()
# 检查ffmpeg
try:
subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True)
except (subprocess.CalledProcessError, FileNotFoundError):
print("❌ 未找到ffmpeg,请先安装ffmpeg")
print("macOS: brew install ffmpeg")
print("Ubuntu: sudo apt install ffmpeg")
print("Windows: https://ffmpeg.org/download.html")
sys.exit(1)
# 创建下载器并开始下载
downloader = HLSDownloader(args.url, args.output, args.workers)
success = downloader.download()
sys.exit(0 if success else 1)
if __name__ == "__main__":
main()
结语
通过本文,我们深入探讨了HLS视频下载工具的开发过程,从协议解析到加密处理,从多线程下载到视频合并。这个工具不仅实用,而且是一个很好的学习项目,涵盖了网络编程、加密解密、多线程处理和视频处理等多个技术领域。
希望这个工具和文章对你有所帮助,如果有任何问题或建议,欢迎在评论区讨论!
免责声明:请仅在合法范围内使用此工具,尊重内容创作者的版权和劳动成果。