代码 HLS视频下载神器:Python实现加密流媒体一键下载与合并

2025-08-30 19:21:22 +0800 CST views 9

HLS视频下载神器:Python实现加密流媒体一键下载与合并

基于Python的HLS视频下载工具开发全解析,支持加密流和并行下载

引言:为什么需要HLS视频下载工具?

在日常工作和学习中,我们经常会遇到需要保存HLS流媒体视频的情况——可能是在线教育课程、重要会议录像,或是珍贵的媒体资料。虽然浏览器可以播放这些视频,但往往不提供下载功能。特别是当视频使用AES加密时,手动下载变得更加困难。

本文介绍的工具正是为了解决这一痛点:一个功能完整的HLS视频下载器,支持加密流、并行下载和自动合并。

一、HLS技术背景简介

1.1 什么是HLS?

HLS(HTTP Live Streaming)是苹果公司开发的流媒体传输协议,它将视频分割成小的TS文件片段,通过M3U8播放列表索引。这种技术允许自适应码率切换,非常适合网络视频传输。

1.2 HLS的加密机制

HLS通常使用AES-128加密保护视频内容,加密信息存储在M3U8文件的#EXT-X-KEY标签中,包括密钥URL和初始化向量(IV)。

二、工具设计与架构

2.1 整体架构

输入M3U8 URL → 解析播放列表 → 下载密钥(如有加密) → 并行下载片段 → 解密处理 → 合并输出

2.2 核心功能模块

  • M3U8解析器:分析播放列表,提取片段和加密信息
  • 多线程下载器:并发下载视频片段,提高下载速度
  • AES解密器:处理加密视频片段的解密
  • FFmpeg集成:合并TS片段为MP4文件

三、代码实现详解

3.1 环境准备与依赖安装

首先需要安装必要的Python库:

pip install requests tqdm pycryptodome

同时需要安装FFmpeg(用于视频合并):

# Ubuntu/Debian
sudo apt install ffmpeg

# macOS
brew install ffmpeg

# Windows:从https://ffmpeg.org/download.html下载并添加到PATH

3.2 核心类HLSDownloader

class HLSDownloader:
    def __init__(self, m3u8_url, output_path="output.mp4", max_workers=8):
        self.m3u8_url = m3u8_url
        self.output_path = output_path
        self.max_workers = max_workers
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })

3.3 M3U8文件解析

def parse_m3u8(self, content):
    """解析m3u8文件内容"""
    lines = content.strip().split('\n')
    segments = []
    encryption_key = None
    encryption_iv = None
    
    base_url = '/'.join(self.m3u8_url.split('/')[:-1]) + '/'
    
    i = 0
    while i < len(lines):
        line = lines[i].strip()
        
        # 解析加密信息
        if line.startswith('#EXT-X-KEY:'):
            key_match = re.search(r'URI="([^"]+)"', line)
            iv_match = re.search(r'IV=([^,\s]+)', line)
            
            if key_match:
                key_url = key_match.group(1)
                if not key_url.startswith('http'):
                    key_url = urljoin(base_url, key_url)
                encryption_key = key_url
            
            if iv_match:
                encryption_iv = iv_match.group(1)
                if encryption_iv.startswith('0x'):
                    encryption_iv = bytes.fromhex(encryption_iv[2:])
        
        # 解析视频片段
        elif line.startswith('#EXTINF:'):
            if i + 1 < len(lines):
                segment_url = lines[i + 1].strip()
                if not segment_url.startswith('http'):
                    segment_url = urljoin(base_url, segment_url)
                segments.append(segment_url)
                i += 1
        
        i += 1
    
    return segments, encryption_key, encryption_iv

3.4 多线程下载实现

def download_segment(self, segment_info):
    """下载单个视频片段"""
    segment_url, temp_dir, segment_index, key, iv = segment_info
    
    try:
        response = self.session.get(segment_url, timeout=60)
        response.raise_for_status()
        
        segment_data = response.content
        
        # 如果有加密,进行解密
        if key and iv:
            # 对于每个片段,IV可能需要递增
            if isinstance(iv, bytes):
                segment_iv = bytearray(iv)
                # 将segment_index写入IV的最后4个字节
                segment_iv[-4:] = segment_index.to_bytes(4, 'big')
                iv = bytes(segment_iv)
            
            cipher = AES.new(key, AES.MODE_CBC, iv)
            segment_data = cipher.decrypt(segment_data)
            
            # 移除PKCS7填充
            pad_len = segment_data[-1]
            if pad_len <= 16:
                segment_data = segment_data[:-pad_len]
        
        # 保存片段文件
        segment_path = os.path.join(temp_dir, f"segment_{segment_index:06d}.ts")
        with open(segment_path, 'wb') as f:
            f.write(segment_data)
        
        return segment_path, None
        
    except Exception as e:
        return None, str(e)

3.5 使用FFmpeg合并视频

def merge_segments(self, segment_files, output_path):
    """使用ffmpeg合并视频片段"""
    # 创建文件列表
    list_file = "segments_list.txt"
    try:
        with open(list_file, 'w', encoding='utf-8') as f:
            for segment_file in segment_files:
                # 使用绝对路径避免问题
                abs_path = os.path.abspath(segment_file)
                f.write(f"file '{abs_path}'\n")
        
        # 使用ffmpeg合并
        cmd = [
            'ffmpeg', '-f', 'concat', '-safe', '0', 
            '-i', list_file, '-c', 'copy', output_path, '-y'
        ]
        
        print("正在合并视频片段...")
        result = subprocess.run(cmd, capture_output=True, text=True)
        
        if result.returncode == 0:
            print(f"✅ 视频已保存至: {output_path}")
            return True
        else:
            print(f"❌ FFmpeg错误: {result.stderr}")
            return False
            
    except Exception as e:
        print(f"❌ 合并失败: {e}")
        return False
    finally:
        # 清理文件列表
        if os.path.exists(list_file):
            os.remove(list_file)

四、使用教程

4.1 基本使用方法

# 最简单用法
python hls_downloader.py https://example.com/video.m3u8

# 指定输出文件名
python hls_downloader.py https://example.com/video.m3u8 -o my_video.mp4

# 指定并发线程数
python hls_downloader.py https://example.com/video.m3u8 -w 16

4.2 获取M3U8链接的方法

  1. 浏览器开发者工具

    • 打开开发者工具(F12)
    • 切换到Network(网络)标签
    • 刷新页面并开始播放视频
    • 过滤.m3u8文件,复制URL
  2. 使用浏览器扩展

    • 安装"HLS Stream Detector"等扩展
    • 自动检测页面中的HLS流

4.3 处理常见问题

问题1:下载速度慢

  • 增加并发线程数:-w 16
  • 检查网络连接
  • 尝试使用代理

问题2:解密失败

  • 确认工具支持该加密方式
  • 检查密钥URL是否可访问

问题3:合并失败

  • 确认已安装FFmpeg并添加到PATH
  • 检查磁盘空间是否充足

五、高级功能与自定义

5.1 自定义请求头

有些网站需要特定的请求头才能访问,可以修改代码中的session配置:

self.session.headers.update({
    'User-Agent': '自定义User-Agent',
    'Referer': 'https://example.com',
    'Cookie': '你的cookie信息'
})

5.2 代理支持

添加代理支持可以绕过某些地域限制:

def __init__(self, m3u8_url, output_path="output.mp4", max_workers=8, proxy=None):
    # ...其他初始化代码
    if proxy:
        self.session.proxies = {
            'http': proxy,
            'https': proxy
        }

5.3 断点续传

通过记录已下载的片段,实现断点续传功能:

def resume_download(self, temp_dir):
    """检查并恢复下载"""
    downloaded_segments = []
    for file in os.listdir(temp_dir):
        if file.startswith("segment_") and file.endswith(".ts"):
            index = int(file.split('_')[1].split('.')[0])
            downloaded_segments.append(index)
    
    return downloaded_segments

六、法律与道德考量

6.1 合法使用

  • 仅下载你有权访问的内容
  • 尊重版权和知识产权
  • 不要绕过付费墙下载付费内容

6.2 教育目的

本工具主要用于教育和技术研究目的,帮助开发者理解:

  • HLS协议的工作原理
  • AES加密解密流程
  • 多线程网络编程
  • FFmpeg视频处理

七、完整代码获取

本文涉及的完整代码已经开源,可以通过以下方式获取:

#!/usr/bin/env python3
"""
HLS视频下载工具
支持加密HLS流下载并合并为MP4
"""

import os
import sys
import re
import subprocess
import requests
import tempfile
import shutil
from urllib.parse import urljoin, urlparse
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
from Crypto.Cipher import AES
import argparse


class HLSDownloader:
    def __init__(self, m3u8_url, output_path="output.mp4", max_workers=8):
        self.m3u8_url = m3u8_url
        self.output_path = output_path
        self.max_workers = max_workers
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        
    def download_m3u8(self):
        """下载并解析m3u8文件"""
        try:
            response = self.session.get(self.m3u8_url, timeout=30)
            response.raise_for_status()
            return response.text
        except Exception as e:
            print(f"下载m3u8文件失败: {e}")
            return None

    def parse_m3u8(self, content):
        """解析m3u8文件内容"""
        lines = content.strip().split('\n')
        segments = []
        encryption_key = None
        encryption_iv = None
        
        base_url = '/'.join(self.m3u8_url.split('/')[:-1]) + '/'
        
        i = 0
        while i < len(lines):
            line = lines[i].strip()
            
            # 解析加密信息
            if line.startswith('#EXT-X-KEY:'):
                key_match = re.search(r'URI="([^"]+)"', line)
                iv_match = re.search(r'IV=([^,\s]+)', line)
                
                if key_match:
                    key_url = key_match.group(1)
                    if not key_url.startswith('http'):
                        key_url = urljoin(base_url, key_url)
                    encryption_key = key_url
                
                if iv_match:
                    encryption_iv = iv_match.group(1)
                    if encryption_iv.startswith('0x'):
                        encryption_iv = bytes.fromhex(encryption_iv[2:])
            
            # 解析视频片段
            elif line.startswith('#EXTINF:'):
                if i + 1 < len(lines):
                    segment_url = lines[i + 1].strip()
                    if not segment_url.startswith('http'):
                        segment_url = urljoin(base_url, segment_url)
                    segments.append(segment_url)
                    i += 1
            
            i += 1
        
        return segments, encryption_key, encryption_iv

    def download_key(self, key_url):
        """下载加密密钥"""
        try:
            response = self.session.get(key_url, timeout=30)
            response.raise_for_status()
            return response.content
        except Exception as e:
            print(f"下载密钥失败: {e}")
            return None

    def download_segment(self, segment_info):
        """下载单个视频片段"""
        segment_url, temp_dir, segment_index, key, iv = segment_info
        
        try:
            response = self.session.get(segment_url, timeout=60)
            response.raise_for_status()
            
            segment_data = response.content
            
            # 如果有加密,进行解密
            if key and iv:
                # 对于每个片段,IV可能需要递增
                if isinstance(iv, bytes):
                    segment_iv = bytearray(iv)
                    # 将segment_index写入IV的最后4个字节
                    segment_iv[-4:] = segment_index.to_bytes(4, 'big')
                    iv = bytes(segment_iv)
                
                cipher = AES.new(key, AES.MODE_CBC, iv)
                segment_data = cipher.decrypt(segment_data)
                
                # 移除PKCS7填充
                pad_len = segment_data[-1]
                if pad_len <= 16:
                    segment_data = segment_data[:-pad_len]
            
            # 保存片段文件
            segment_path = os.path.join(temp_dir, f"segment_{segment_index:06d}.ts")
            with open(segment_path, 'wb') as f:
                f.write(segment_data)
            
            return segment_path, None
            
        except Exception as e:
            return None, str(e)

    def merge_segments(self, segment_files, output_path):
        """使用ffmpeg合并视频片段"""
        # 创建文件列表
        list_file = "segments_list.txt"
        try:
            with open(list_file, 'w', encoding='utf-8') as f:
                for segment_file in segment_files:
                    # 使用绝对路径避免问题
                    abs_path = os.path.abspath(segment_file)
                    f.write(f"file '{abs_path}'\n")
            
            # 使用ffmpeg合并
            cmd = [
                'ffmpeg', '-f', 'concat', '-safe', '0', 
                '-i', list_file, '-c', 'copy', output_path, '-y'
            ]
            
            print("正在合并视频片段...")
            result = subprocess.run(cmd, capture_output=True, text=True)
            
            if result.returncode == 0:
                print(f"✅ 视频已保存至: {output_path}")
                return True
            else:
                print(f"❌ FFmpeg错误: {result.stderr}")
                return False
                
        except Exception as e:
            print(f"❌ 合并失败: {e}")
            return False
        finally:
            # 清理文件列表
            if os.path.exists(list_file):
                os.remove(list_file)

    def download(self):
        """主下载流程"""
        print(f"🚀 开始下载HLS视频: {self.m3u8_url}")
        
        # 1. 下载m3u8文件
        print("📥 下载播放列表...")
        m3u8_content = self.download_m3u8()
        if not m3u8_content:
            print("❌ 无法获取播放列表")
            return False
        
        # 2. 解析m3u8文件
        segments, key_url, iv = self.parse_m3u8(m3u8_content)
        print(f"📄 发现 {len(segments)} 个视频片段")
        
        if not segments:
            print("❌ 未找到视频片段")
            return False
        
        # 3. 下载加密密钥
        key = None
        if key_url:
            print("🔐 下载加密密钥...")
            key = self.download_key(key_url)
            if not key:
                print("❌ 无法获取加密密钥")
                return False
            print(f"✅ 密钥获取成功 (长度: {len(key)} 字节)")
        
        # 4. 创建临时目录
        temp_dir = tempfile.mkdtemp()
        print(f"📁 临时目录: {temp_dir}")
        
        try:
            # 5. 并行下载所有片段
            print(f"⬇️  开始下载片段 (并发数: {self.max_workers})...")
            
            segment_tasks = [
                (url, temp_dir, idx, key, iv) 
                for idx, url in enumerate(segments)
            ]
            
            downloaded_segments = []
            failed_segments = []
            
            with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                # 提交所有下载任务
                future_to_index = {
                    executor.submit(self.download_segment, task): task[2]
                    for task in segment_tasks
                }
                
                # 使用进度条显示下载进度
                with tqdm(total=len(segments), desc="下载进度") as pbar:
                    for future in as_completed(future_to_index):
                        segment_path, error = future.result()
                        index = future_to_index[future]
                        
                        if segment_path:
                            downloaded_segments.append((index, segment_path))
                        else:
                            failed_segments.append((index, error))
                        
                        pbar.update(1)
            
            # 检查下载结果
            if failed_segments:
                print(f"⚠️  {len(failed_segments)} 个片段下载失败:")
                for idx, error in failed_segments:
                    print(f"  片段 {idx}: {error}")
            
            if not downloaded_segments:
                print("❌ 没有成功下载的片段")
                return False
            
            # 按索引排序片段
            downloaded_segments.sort(key=lambda x: x[0])
            segment_files = [path for _, path in downloaded_segments]
            
            print(f"✅ 成功下载 {len(segment_files)} 个片段")
            
            # 6. 合并视频
            success = self.merge_segments(segment_files, self.output_path)
            
            return success
            
        finally:
            # 清理临时目录
            try:
                shutil.rmtree(temp_dir)
                print("🧹 已清理临时文件")
            except:
                pass


def main():
    parser = argparse.ArgumentParser(description="HLS视频下载工具")
    parser.add_argument("url", help="HLS播放列表URL (.m3u8)")
    parser.add_argument("-o", "--output", default="video.mp4", help="输出文件路径")
    parser.add_argument("-w", "--workers", type=int, default=8, help="并发下载数")
    
    args = parser.parse_args()
    
    # 检查ffmpeg
    try:
        subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True)
    except (subprocess.CalledProcessError, FileNotFoundError):
        print("❌ 未找到ffmpeg,请先安装ffmpeg")
        print("macOS: brew install ffmpeg")
        print("Ubuntu: sudo apt install ffmpeg")
        print("Windows: https://ffmpeg.org/download.html")
        sys.exit(1)
    
    # 创建下载器并开始下载
    downloader = HLSDownloader(args.url, args.output, args.workers)
    success = downloader.download()
    
    sys.exit(0 if success else 1)


if __name__ == "__main__":
    main()

结语

通过本文,我们深入探讨了HLS视频下载工具的开发过程,从协议解析到加密处理,从多线程下载到视频合并。这个工具不仅实用,而且是一个很好的学习项目,涵盖了网络编程、加密解密、多线程处理和视频处理等多个技术领域。

希望这个工具和文章对你有所帮助,如果有任何问题或建议,欢迎在评论区讨论!

免责声明:请仅在合法范围内使用此工具,尊重内容创作者的版权和劳动成果。

复制全文 生成海报 视频下载 编程 流媒体 技术教程 Python

推荐文章

Vue3中的虚拟滚动有哪些改进?
2024-11-18 23:58:18 +0800 CST
纯CSS绘制iPhoneX的外观
2024-11-19 06:39:43 +0800 CST
Vue3中的JSX有什么不同?
2024-11-18 16:18:49 +0800 CST
Vue3 vue-office 插件实现 Word 预览
2024-11-19 02:19:34 +0800 CST
paint-board:趣味性艺术画板
2024-11-19 07:43:41 +0800 CST
纯CSS实现3D云动画效果
2024-11-18 18:48:05 +0800 CST
FcDesigner:低代码表单设计平台
2024-11-19 03:50:18 +0800 CST
Go语言SQL操作实战
2024-11-18 19:30:51 +0800 CST
软件定制开发流程
2024-11-19 05:52:28 +0800 CST
向满屏的 Import 语句说再见!
2024-11-18 12:20:51 +0800 CST
Elasticsearch 文档操作
2024-11-18 12:36:01 +0800 CST
Nginx 状态监控与日志分析
2024-11-19 09:36:18 +0800 CST
Go 如何做好缓存
2024-11-18 13:33:37 +0800 CST
Go 并发利器 WaitGroup
2024-11-19 02:51:18 +0800 CST
跟着 IP 地址,我能找到你家不?
2024-11-18 12:12:54 +0800 CST
js常用通用函数
2024-11-17 05:57:52 +0800 CST
程序员茄子在线接单