Polars + Apache Arrow + DuckDB:现代列式数据处理三驾马车——从 Rust 引擎到嵌入式 OLAP 的生产级完全指南(2026)
一、背景:数据分析的「中间地带」困局
如果你在过去五年里做过任何像样的数据分析工作,大概率经历过这样的场景:
import pandas as pd
df = pd.read_csv("sales_2025.csv")
# 优雅地写了 30 行代码
# 然后……
# MemoryError: Unable to allocate 4.32 GiB for an array with shape (12345678,)
或者更常见的:
# 数据只有 500 万行,GROUP BY 耗时 47 秒
# 你开始怀疑人生,跑去泡了杯咖啡
让我直说:pandas 在 100 万行以下确实好用,一旦数据量级上了千万行,它的内存模型就是灾难。
这个问题不是你的错,是 pandas 的架构决定的:
- 行式存储:DataFrame 的每一列在底层是 NumPy 数组或 Python 对象数组,但 pandas 的操作模式本质上是围绕行索引设计的
- 即时求值(Eager Execution):每写一行代码,pandas 立刻执行,无法做全局优化
- 单线程:pandas 默认单核执行(除非你用
pandas 2.0+的pyarrow后端,但那是另一个故事了) - 内存占用爆炸:
df.copy()就是两倍内存,merge更是内存黑洞
与此同时,传统 OLAP 数据库(ClickHouse、StarRocks)虽然快,但部署成本高——你需要服务器、集群、运维。对于「我就跑个分析出个报告」的场景,太重了。
这就形成了一道中间地带:数据量超出 Excel 上限(100 万行)和 pandas 舒适区(500 万行),但又不值得为一次分析任务部署一个 ClickHouse 集群。
2024-2026 年,一个由 Apache Arrow → Polars → DuckDB 组成的现代列式数据处理栈,精准地填上了这个缺口。它们共享同一套底层数据格式、互补的使用场景,而且每个工具都是零配置、嵌入式的——装个包就能用,不需要服务器。
二、底层基石:Apache Arrow 的列式内存格式
在讨论 Polars 和 DuckDB 之前,必须先理解 Apache Arrow。它们是 Arrow 生态上长出来的两棵大树。
2.1 为什么列式格式是性能关键?
假设你有一个 100 列、1000 万行的表,你要计算所有行的 salary 平均值。
行式存储(pandas 默认):数据在内存中按行排列,CPU 需要跳过大量的无关列数据才能读到 salary。每一行的 name、address、phone 等字段都夹在 salary 之间,导致缓存行(cache line)利用率极低——可能只用到 1% 的数据。
列式存储(Arrow):salary 列的所有值连续存放在内存中。CPU 从 L1/L2 缓存里连续读取 1000 个浮点数,SIMD 指令一次性处理多个值,cache miss 大幅减少。
# 行式存储(示意)
Row1: [name(40B) | salary(8B) | address(100B) | phone(15B)]
Row2: [name(40B) | salary(8B) | address(100B) | phone(15B)]
# 读取 salary 时要跳过 155 字节的无关数据
# 列式存储(Arrow)
salary: [1.0, 2.0, 3.0, ...] # 连续 8 字节 * N 行
name: ["Alice", "Bob", ...] # 连续
# 读取 salary 时 CPU 完美预取
2.2 Arrow 的核心设计
Apache Arrow 不仅仅是一个列式格式,它还定义了一套完整的规范:
1. 分层内存布局
Arrow Array 结构
┌─────────────────────┐
│ validity bitmap │ ← 处理 NULL 值,每位表示一个元素是否有效
├─────────────────────┤
│ data buffer │ ← 实际的列数据(连续排列)
├─────────────────────┤
│ offset buffer │ ← 仅用于可变长度类型(string、binary)
└─────────────────────┘
每个 Arrow Array 由零个或多个 Buffer 组成。Buffer 是一段连续的内存。这种设计使得 Arrow 可以实现零拷贝共享——多个进程读取同一块内存映射文件,不需要序列化/反序列化。
2. 丰富的数据类型
Arrow 原生支持 40+ 种数据类型,比 JSON 和传统数据库更丰富:
| 类别 | 类型 |
|---|---|
| 基础数值 | Int8 Int16 Int32 Int64 UInt8 UInt16 UInt32 UInt64 Float16 Float32 Float64 |
| 时间 | Date32 Date64 Time32 Time64 Timestamp Duration Interval |
| 复合 | List FixedSizeList Struct Map Union Dictionary |
| 特殊 | Decimal128 Decimal256 Binary LargeBinary String LargeString |
3. 零拷贝反序列化
Arrow 的神奇之处在于:数据从磁盘(Parquet)读到内存时,内存中的二进制布局和 Arrow 的内存格式一模一样。这意味着:
# CSV → pandas:需要解析字符串、分配对象、逐行填充(开销极大)
# Parquet → Arrow:直接内存映射,0 行 Python 代码用于解析
这也是为什么 pd.read_parquet() 比 pd.read_csv() 快 10-50 倍的原因。
2.3 Arrow IPC Flight:列式数据的 gRPC
Arrow 还定义了 Flight RPC 协议——一个基于 gRPC 的高吞吐数据传输协议:
service FlightService {
rpc GetSchema(Command) returns (SchemaResult);
rpc DoGet(Ticket) returns (stream FlightData);
rpc DoPut(PutResult) returns (stream FlightData);
rpc DoExchange(stream FlightData) returns (stream FlightData);
}
Flight 的核心价值在于:不同系统之间传输大规模列式数据时,不需要先序列化成 JSON 或 Protobuf,而是直接传输 Arrow 的内存块。在跨进程、跨节点的场景下,Flight 的吞吐量比 gRPC+Protobuf 高 10-20 倍。
三、Polars:Rust 实现的、程序员友好的 DataFrame 库
3.1 Polars 的架构哲学
Polars 的诞生源于一个简单但有力的想法:DataFrame 库不应该用 Python 写。
pandas 的核心逻辑用 Cython/C 写了,但调度层、API 层、类型检查层都在 Python 里。这意味着:
- 每调用一次
df.groupby("col").sum(),Python 解释器要执行几十个字节码指令才能进入 C 层 - 链式调用时中间结果反复在 Python 对象和 C 数组之间转换
- GIL 限制了多线程并行
Polars 的架构完全绕开了这些问题:
┌─────────────────────────────────────┐
│ Python API Layer │ ← PyO3 绑定,薄薄一层
├─────────────────────────────────────┤
│ Rust 表达式系统 │ ← 类型擦除 trait 对象
├─────────────────────────────────────┤
│ 查询优化器 (优化规则) │ ← 谓词下推、投影消除、Join 重排
├─────────────────────────────────────┤
│ 列式执行引擎 (向量化 + Morsel) │ ← 多线程并行 + SIMD
├─────────────────────────────────────┤
│ Apache Arrow 内存格式 │ ← 底层数据表示
└─────────────────────────────────────┘
关键点:整个数据流的瓶颈在 Rust 层,Python 只负责传参数和收结果。
3.2 表达式系统 vs 方法链
Polars 最被低估的设计是它的表达式系统(Expression System)。
pandas 的链式调用长这样:
result = (
df
.groupby("department")
.agg({"salary": "mean", "age": "max"})
.reset_index()
.rename(columns={"salary": "avg_salary"})
)
Polars 用表达式表达同样的逻辑:
import polars as pl
result = (
df
.group_by("department")
.agg([
pl.col("salary").mean().alias("avg_salary"),
pl.col("age").max().alias("max_age"),
pl.col("score").quantile(0.9).alias("p90_score"),
])
)
看似差异不大,但底层区别很大:
- Polars 的
pl.col()返回的不是数据,而是一个表达式对象 - 把这个表达式传给
agg()后,Polars 的优化器会重写整个表达式树 - 例如
pl.col("salary").mean().alias("avg_salary")→ 优化器会识别出只需要salary列,忽略其他列
再看一个更复杂的例子——Polars 的条件表达式:
# Pandas 写法
df["category"] = df["value"].apply(
lambda x: "high" if x > 100 else ("medium" if x > 50 else "low")
)
# Polars 写法
df.with_columns(
pl.when(pl.col("value") > 100)
.then(pl.lit("high"))
.when(pl.col("value") > 50)
.then(pl.lit("medium"))
.otherwise(pl.lit("low"))
.alias("category")
)
Polars 的 when/then/otherwise 在底层编译成列式向量化操作,而不是逐行应用。在 1000 万行数据上,Polars 比 pandas 的 apply 快 50-100 倍。
3.3 惰性求值(Lazy API):查询优化的核心
Polars 的 Lazy API 才是真正的杀手锏。它把 DataFrame 操作变成查询计划(Query Plan),然后让优化器重写这个计划:
# 惰性模式
q = (
pl.scan_csv("sales_2025.csv") # 不加载数据,只创建逻辑计划
.filter(pl.col("amount") > 1000)
.group_by("region")
.agg([
pl.col("amount").sum().alias("total"),
pl.col("count").mean().alias("avg_count"),
])
.sort("total", descending=True)
)
# 查看优化后的计划
print(q.explain(optimized=True))
# 真正执行
result = q.collect()
q.explain(optimized=True) 的输出会揭示优化器做了什么:
--- 优化后的计划 ---
SORT BY [total DESC]
AGGREGATE
GROUP BY [region]
AGG: [sum(amount) AS total, mean(count) AS avg_count]
CSV SCAN [sales_2025.csv] PROJECT: [3/10, 5/10]
│
↓ 优化器干的事:
1. 谓词下推:filter→ 下推到 scan 阶段
2. 投影消除:原表 10 列只用 3 列(region, amount, count)
3. 行筛选:只读 amount > 1000 的行
对比 pandas 的即时求值模式:
# pandas- 每行代码都耗内存
df = pd.read_csv("sales_2025.csv") # 全部加载到内存
df = df[df["amount"] > 1000] # 过滤,创建新 DataFrame
result = df.groupby("region").agg(...) # 聚合,又一个新 DataFrame
result = result.sort_values("total") # 排序
pandas 产生 3-4 个中间 DataFrame,每个都完整分配内存;Polars Lazy 只产生一个最终结果。
3.4 流式处理:超越内存限制
Polars 还有一个被低估的功能:流式模式(Streaming)。
当数据集大于内存时,普通 DataFrame 库会直接崩溃。Polars 的流式模式可以将数据分块处理,每批只处理一小部分:
q = (
pl.scan_csv("extremely_large_file.csv")
.group_by("category")
.agg([
pl.col("value").sum(),
pl.col("count").mean(),
])
)
# 流式模式:数据分批处理,每批 50000 行
result = q.collect(streaming=True)
流式模式下,Polars 将聚合拆分为局部聚合 + 全局合并两个阶段:
┌──────────┐ ┌───────────┐ ┌───────────┐
│ 批次 1 │──→│ 局部聚合 │──→┤ │
├──────────┤ ├───────────┤ │ 全局合并 │──→ 最终结果
│ 批次 2 │──→│ 局部聚合 │──→│ │
├──────────┤ ├───────────┤ └───────────┘
│ 批次 N │──→│ 局部聚合 │──→│
└──────────┘ └───────────┘
对 10 亿行的日志文件做 GROUP BY,Polars Streaming 只需要几百 MB 内存就能完成,而 pandas 连打开文件都做不到。
3.5 性能基准
不是我说的,社区实测数据(ClickBench, H2O.ai benchmark):
| 操作 | pandas 2.2 | Polars 1.0 | 加速比 |
|---|---|---|---|
| GROUP BY 1 列(1 亿行) | 38.2s | 2.1s | 18x |
| 多列 GROUP BY(1 亿行) | 崩溃 | 4.8s | ∞ |
| 10 列 JOIN(5000 万行 × 2) | 127s | 6.3s | 20x |
| CSV 读取 + 过滤(2 亿行) | 89s | 5.7s | 15.6x |
| 窗口函数(1 亿行) | 崩溃 | 3.9s | ∞ |
注:测试环境为 AMD Ryzen 9 7950X(16 核),128GB RAM,NVMe SSD。
3.6 实战:从 pandas 迁移到 Polars
迁移过程不必一次性全部重写。推荐逐步迁移策略:
第一步:替换 I/O 操作
# 原来
df = pd.read_csv("data.csv", usecols=["a", "b", "c"])
# 现在(读取更快,内存更少)
df = pl.read_csv("data.csv", columns=["a", "b", "c"])
第二步:核心逻辑重写
# pandas 版本
def process_sales(df: pd.DataFrame) -> pd.DataFrame:
df = df[df["status"] == "completed"]
df["total"] = df["price"] * df["quantity"]
monthly = df.groupby(pd.Grouper(key="date", freq="M")).agg({
"total": "sum",
"order_id": "nunique"
}).reset_index()
monthly["yoy_growth"] = (monthly["total"] - monthly["total"].shift(12)) / monthly["total"].shift(12)
return monthly
# Polars 版本
def process_sales_polars(df: pl.DataFrame) -> pl.DataFrame:
return (
df
.filter(pl.col("status") == "completed")
.with_columns((pl.col("price") * pl.col("quantity")).alias("total"))
.group_by_dynamic("date", every="1mo")
.agg([
pl.col("total").sum(),
pl.col("order_id").n_unique(),
])
.with_columns(
((pl.col("total") - pl.col("total").shift(12)) / pl.col("total").shift(12))
.alias("yoy_growth")
)
)
第三步:混合使用(过渡期最佳实践)
Polars 可以零拷贝转换为 pandas:
# Polars → pandas(零拷贝)
pandas_df = polars_df.to_pandas(use_pyarrow_extension=True)
# pandas → Polars(也是零拷贝)
polars_df = pl.from_pandas(pandas_df)
这在逐步迁移时非常有价值——旧代码继续用 pandas,新代码用 Polars,交接时零成本。
四、DuckDB:嵌入式 OLAP 数据库
如果说 Polars 是更快的 DataFrame,那 DuckDB 就是可以在内存里跑的 ClickHouse。
4.1 DuckDB 的定位
DuckDB 的创造者 Hannes Mühleisen 说得很清楚:
"SQLite 是 OLTP 的嵌入式数据库,DuckDB 是 OLAP 的嵌入式数据库。"
核心设计决策:
| 维度 | SQLite | DuckDB |
|---|---|---|
| 存储 | 行式(Row-Oriented) | 列式(Column-Oriented) |
| 执行 | 逐行迭代模型(Volcano) | 向量化执行(Vectorized) |
| 并发 | 写锁串行化 | 多版本并发控制(MVCC) |
| 优化器 | Heuristic | Cost-based + Statistics |
| 适用场景 | 事务、CRUD | 分析、聚合、报表 |
| 部署 | 嵌入式,单文件 | 嵌入式,单文件 |
DuckDB 没有守护进程、没有端口、没有配置——pip install duckdb 之后就能用,跟 SQLite 一样简单,但跑分析查询比 SQLite 快 100-10000 倍。
4.2 向量化执行引擎
DuckDB 之所以快,核心在于它的向量化执行模型。
传统数据库(包括 SQLite 和 PostgreSQL 的火山模型)逐行处理数据:
逐行执行模型(Volcano Model)
┌──────────┐ ┌──────────┐ ┌──────────┐
│ 全表扫描 │──→│ 过滤 │──→│ 聚合 │
└──────────┘ └──────────┘ └──────────┘
每行:一行进 → 判断 → 一行出(重复 1 亿次)
DuckDB 的向量化执行:
向量化执行模型
┌───────────┐ ┌───────────┐ ┌───────────┐
│ 全表扫描 │──→│ Vector Filter │──→│ Vector Aggregate │
└───────────┘ └───────────┘ └───────────┘
每个批次 2048 行 → 向量化判断 → SIMD 批量处理
批处理(Vector at a time):DuckDB 一次处理一个固定大小的向量(默认 2048 行),而不是一行。这带来几个好处:
- 函数调用开销降低 2048 倍:每次
Next()返回 2048 行,而不是 1 行 - CPU 缓存友好:2048 个浮点数可以完整装入 L1 缓存
- SIMD 向量化:对连续的内存块做 AVX2/AVX-512 指令,一次处理 8/16 个值
4.3 Morsel-Driven Parallelism
DuckDB 的并行执行采用 Morsel-Driven 模型:
┌─────────────────────────────────────┐
│ 查询优化器 │
│ (生成逻辑计划 → 物理计划) │
└──────────┬──────────────────────────┘
↓
┌─────────────────────────────────────┐
│ 并行调度器 (Pipeline) │
│ ┌────────┐ ┌────────┐ ┌────────┐ │
│ │ Pipeline 0 │ │ Pipeline 1 │ │ Pipeline 2 │ │
│ └────────┘ └────────┘ └────────┘ │
└──────────┬──────────────────────────┘
↓
┌─────────────────────────────────────┐
│ Morsel 分片 (2048 行/片) │
│ ┌────────┐ ┌────────┐ ┌────────┐ │
│ │Worker 1│ │Worker 2│ │Worker 3│ │ ← 线程池
│ └────────┘ └────────┘ └────────┘ │
│ 每个 Worker 从全局队列中取一个 Morsel │
└─────────────────────────────────────┘
关键点:
- 调度器将表扫描操作拆分为多个 Morsel(数据切片)
- Morsel 队列是线程安全的,空闲 Worker 自动取下一个 Morsel
- Pipeline Breaker(如 Hash Join Probe 端)会自动同步
- 自动适配 CPU 核心数,不需要配置
这与 Spark 的任务调度类似,但 DuckDB 是进程内的,没有网络和序列化开销。
4.4 零配置 SQL 查询
DuckDB 最实用的功能之一是直接对文件执行 SQL 查询——不需要 CREATE TABLE、不需要 INSERT、不需要 ETL:
-- 直接查询 CSV 文件
SELECT department, AVG(salary) as avg_salary
FROM 'employees.csv'
WHERE join_date > '2024-01-01'
GROUP BY department
ORDER BY avg_salary DESC;
-- 多文件 Glob 模式
SELECT year, COUNT(*) as crimes
FROM 'nyc_crime_data_*.parquet'
WHERE borough = 'Manhattan'
GROUP BY year
ORDER BY year;
-- 跨格式 JOIN
SELECT c.name, o.total
FROM read_csv('customers.csv') c
JOIN read_parquet('orders.parquet') o
ON c.id = o.customer_id;
在 Python 中使用:
import duckdb
# 直接对 CSV 执行 SQL,返回 pandas DataFrame
result = duckdb.sql("""
SELECT
department,
AVG(salary) as avg_salary,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY salary) as median_salary,
COUNT(*) as employee_count
FROM 'employees.csv'
WHERE salary > 0
GROUP BY department
HAVING COUNT(*) > 10
ORDER BY avg_salary DESC
""").df()
print(result)
这段代码发生了什么:
- DuckDB 的 CSV 读取器自动推断所有列的类型(不需要 schema 定义)
- SQL 解析器将查询编译为逻辑计划
- 优化器进行谓词下推、投影消除、Join 重排
- 物理计划生成向量化执行代码
- 多线程并行扫描和聚合数据
- 结果以 Arrow 格式零拷贝转换为 pandas DataFrame
整个过程 5 行 Python 代码,不需要数据库服务器,不需要配置。
4.5 特色 SQL 功能
DuckDB 有一些非常实用的 SQL 扩展:
QUALIFY — 窗口函数过滤(不用嵌套子查询):
-- 找到每个部门薪资前三的员工
SELECT name, department, salary
FROM employees
QUALIFY ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) <= 3;
ASOF JOIN — 时序不等值最近匹配:
-- 找到每个交易发生时最近的汇率
SELECT t.transaction_id, t.amount * r.rate as converted_amount
FROM transactions t
ASOF JOIN exchange_rates r
ON t.currency = r.currency
AND t.timestamp >= r.timestamp;
PIVOT / UNPIVOT:
-- 行列转换,一行搞定
PIVOT sales_data
ON product_category
USING SUM(revenue)
GROUP BY year;
-- 传统的 CASE WHEN 方式要写十几个条件
直接查询 S3/HTTP 上的 Parquet 文件:
-- 不需要下载,直接分析云上数据
SELECT region, SUM(revenue)
FROM read_parquet('s3://my-bucket/sales/*.parquet')
WHERE date >= '2025-01-01'
GROUP BY region;
4.6 DuckDB + Polars 的双引擎模式
Polars 和 DuckDB 不是竞争关系,而是互补的。最佳实践是让它们各司其职:
flowchart LR
A[数据源<br/>CSV/Parquet/JSON] --> B{DuckDB}
B --> C[SQL 批处理<br/>聚合、过滤、Join]
C --> D[Arrow 格式结果]
D --> E[Polars]
E --> F[Python 逻辑<br/>特征工程、清洗]
F --> G[结果输出]
Python 中的双引擎协作:
import duckdb
import polars as pl
# 1. DuckDB 负责 SQL 批处理(擅长复杂 JOIN 和大规模聚合)
result_arrow = duckdb.sql("""
SELECT
u.user_id,
u.region,
SUM(o.amount) as lifetime_value,
COUNT(DISTINCT o.order_id) as order_count,
AVG(o.amount) as avg_order_value
FROM read_parquet('users/*.parquet') u
JOIN read_parquet('orders/*.parquet') o
ON u.user_id = o.user_id
WHERE o.order_date >= '2024-01-01'
GROUP BY u.user_id, u.region
""").arrow() # 返回 Arrow Table!零拷贝
# 2. Polars 负责 Python 层的复杂逻辑
df = pl.from_arrow(result_arrow)
result = (
df
.with_columns(
(pl.col("lifetime_value") / pl.col("order_count")).alias("avg_order_value"),
pl.when(pl.col("lifetime_value") > pl.col("lifetime_value").quantile(0.9))
.then(pl.lit("VIP"))
.otherwise(pl.lit("Normal"))
.alias("tier")
)
.sort("lifetime_value", descending=True)
)
为什么这不是冗余?
| 操作 | 最佳引擎 | 原因 |
|---|---|---|
| 大规模 JOIN(10 亿 × 10 亿) | DuckDB | Hash Join 用 C++ 实现,比 Polars 的 Rust 实现更成熟 |
| 复杂特征工程 | Polars | Python 表达能力强,Polars 表达式系统比 SQL 更灵活 |
| 流式数据管道 | Polars | 原生 streaming 模式支持无限流 |
| 即席 SQL 分析 | DuckDB | 分析师写 SQL 比写 Python 快 |
| 时间序列窗口函数 | DuckDB | QUALIFY + ASOF JOIN 是杀手级功能 |
| 机器学习前处理 | Polars | 直接输出 numpy/pytorch 兼容的数组 |
五、性能优化实战
5.1 Parquet:列式存储的终极形态
Parquet 是 Arrow 格式的"存档版"——同一套列式设计,但做了磁盘压缩和编码优化。
Parquet 文件结构:
Parquet File
┌─────────────────────────────────────────┐
│ Magic Number │ ← "PAR1" 标识
├─────────────────────────────────────────┤
│ Row Group 1 │
│ ┌──────────┬──────────┬──────────────┐ │
│ │ Col 1 │ Col 2 │ Col 3 │ │
│ │ (Page) │ (Page) │ (Page) │ │
│ │ ┌─────┐ │ ┌─────┐ │ ┌─────┐ │ │
│ │ │ RLE │ │ │Dict │ │ │ Delta│ │ │
│ │ └─────┘ │ └─────┘ │ └─────┘ │ │
│ └──────────┴──────────┴──────────────┘ │
├─────────────────────────────────────────┤
│ Row Group 2 │
│ ┌──────────┬──────────┬──────────────┐ │
│ │ ... │ ... │ ... │ │
│ └──────────┴──────────┴──────────────┘ │
├─────────────────────────────────────────┤
│ Footer │
│ (Schema + 各列的位置 + Row Group 元数据) │
│ ┌─────────────────────────────────────┐│
│ │ 索引层 ││
│ │ - Column Chunk 1: offset=4096 ││
│ │ - Column Chunk 2: offset=1048576 ││
│ │ - 各列的统计信息:min/max/null_count ││
│ └─────────────────────────────────────┘│
├─────────────────────────────────────────┤
│ Magic Number │ ← "PAR1" 标识
└─────────────────────────────────────────┘
Parquet 的核心优势:
- 谓词下推(Predicate Pushdown):读取时直接跳过不需要的 Row Group
- 列裁剪(Column Projection):只读取查询中涉及的列
- 数据编码压缩:RLE(游程编码)、Dictionary(字典编码)、Delta(增量编码)
- 统计信息过滤:每个 Column Chunk 的 min/max 值用于跳过
在 DuckDB 中,谓词下推的效果非常明显:
-- DuckDB 对 Parquet 文件的智能读取
SELECT * FROM 'sales.parquet'
WHERE region = 'APAC' AND date >= '2025-01-01';
DuckDB 会:
- 读取 Footer,获取每个 Row Group 的
region列的 min/max - 跳过不含
APAC的 Row Group - 对于命中的 Row Group,只读取
region和date列做精确过滤 - 最终只传输匹配行的所有列
实验数据:一个包含 100 个 Row Group(每个 100 万行)、50 列的 Parquet 文件,查询单列聚合——经过谓词下推后,实际读取的数据量从 5GB 降到 80MB。
5.2 内存模型对比与优化
pandas 的内存黑洞:
# 看起来人畜无害
df = pd.read_csv("data.csv") # 分配 2GB
df_filtered = df[df["status"] == "ok"] # 又分配 2GB
df_grouped = df_filtered.groupby("x").sum() # 再分配 1GB
# 峰值内存 = 2 + 2 + 1 = 5GB(虽然最终结果只需要 500MB)
Polars Lazy 的内存优雅:
# Polars:查询优化器只在最后一步分配内存
q = (
pl.scan_csv("data.csv")
.filter(pl.col("status") == "ok")
.group_by("x")
.agg(pl.col("y").sum())
)
result = q.collect()
# 峰值内存 ≈ 最终结果大小(约 500MB)+ 单个批处理缓冲区(约 200MB)
DuckDB 的内存管控:
DuckDB 内置物化内存限制:
import duckdb
# 限制 DuckDB 最大内存使用 2GB
duckdb.sql("SET memory_limit = '2GB'")
# 当查询需要更多内存时,DuckDB 会自动将中间结果 spill 到磁盘
# 而不是 OOM
5.3 索引:列式数据库的不同思路
关系数据库依赖 B-Tree 索引来加速查询,但列式系统走了一条不同的路:
DuckDB 使用统计信息(Zone Maps)和自适应分区:
传统 B-Tree 索引:精确导航到某一行
→ 适合 OLTP:WHERE id = 42
Zone Map(区域索引):跳过完全不符合的分区
→ 适合 OLAP:WHERE salary BETWEEN 50000 AND 100000
DuckDB 的 Zone Map 在文件层面存储每个 Row Group 的列统计信息,完全不需要显式创建索引。
Polars 的策略更偏向计算时的自适应优化:
# Polars 自动决定最佳算法
df.sort("column")
# 小数据 → 快速排序
# 大数据 → 基数排序/并行归并排序
# 已排序数据 → 检测并跳过排序
5.4 实战性能调优 checklist
对于 Polars:
- 总是用 Lazy API:
scan_csv/scan_parquet而非read_csv/read_parquet - 早过滤、晚聚合:
filter()尽可能靠前 - 只选需要的列:
select()或group_by时明确列名 - 利用流式模式:超大数据集用
collect(streaming=True) - 数据分区:大量小文件用
scan_csv的include_file_paths参数 - 使用
explain():经常检查优化后的查询计划
# 优化反例
pl.read_csv("*.csv").select(["a", "b"]).filter(pl.col("a") > 10)
# 优化正例
pl.scan_csv("*.csv", include_file_paths=True)\
.filter(pl.col("a") > 10)\
.select(["b"])\
.collect()
对于 DuckDB:
- 减少数据类型宽度:能用
INT32不用INT64,能用FLOAT32不用FLOAT64 - Parquet 分区:按高频过滤字段分区存储
- 设置
memory_limit:为每个查询设置合理上限 - 使用
EXPLAIN ANALYZE:查看瓶颈 - 表分区列作为过滤条件:利用分区剪枝
-- DuckDB 查询分析
EXPLAIN ANALYZE
SELECT region, SUM(amount)
FROM 'sales.parquet'
WHERE date >= '2025-01-01'
GROUP BY region;
六、生产级最佳实践
6.1 数据管道架构
以下是一个生产级的数据管道设计,组合了 DuckDB、Polars 和 Arrow:
# ─── 第一层:数据摄取 ───
# DuckDB 作为 ETL 引擎处理原始数据
import duckdb
import polars as pl
def ingest_and_transform():
"""从多个数据源拉取并预处理"""
# 历史数据(每日增量)
duckdb.sql("""
CREATE OR REPLACE TABLE daily_sales AS
SELECT * FROM read_parquet(
's3://data-lake/raw/sales/*/date={date}/*.parquet',
hive_partitioning=true
)
WHERE date >= current_date - INTERVAL '1 day';
""")
# 清洗和合并
duckdb.sql("""
CREATE OR REPLACE TABLE clean_sales AS
SELECT
transaction_id,
product_id,
customer_id,
COALESCE(amount, 0) as amount,
COALESCE(quantity, 1) as quantity,
date,
CASE
WHEN region IS NULL THEN 'unknown'
ELSE UPPER(region)
END as region
FROM daily_sales
WHERE transaction_id IS NOT NULL
""")
return duckdb.table("clean_sales").arrow()
# ─── 第二层:特征工程 ───
# Polars 处理复杂的 Python 层逻辑
def feature_engineering(arrow_table):
df = pl.from_arrow(arrow_table)
return (
df
.with_columns([
(pl.col("amount") / pl.col("quantity")).alias("unit_price"),
pl.col("date").dt.month().alias("month"),
pl.col("date").dt.weekday().alias("day_of_week"),
])
.with_columns(
pl.col("unit_price").rolling_mean(
window_size=7, min_periods=1
).over("product_id").alias("price_ma_7d")
)
)
# ─── 第三层:结果存储 ───
# DuckDB 将结果写回 Parquet
def store_results(df: pl.DataFrame):
arrow_table = df.to_arrow()
duckdb.sql("""
COPY (
SELECT * FROM arrow_table
) TO 's3://data-lake/curated/sales/enriched/'
(FORMAT PARQUET, PARTITION_BY (region, month))
""")
6.2 大规模数据场景
场景:十亿级日志分析
# 10 亿行 Nginx 日志 → 多维度聚合
import duckdb
duckdb.sql("""
SELECT
status_code,
COUNT(*) as request_count,
AVG(response_time) as avg_time,
PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY response_time) as p99_time,
COUNT(DISTINCT ip_address) as unique_visitors
FROM read_parquet('s3://logs/nginx/*.parquet')
WHERE timestamp BETWEEN '2025-06-01' AND '2025-06-30'
GROUP BY status_code
ORDER BY request_count DESC
""")
DuckDB 的执行策略:
- 分区剪枝:跳过 6 月之前的所有 Parquet 分区
- 列裁剪:只读取
status_code、response_time、ip_address、timestamp四列 - 哈希聚合:
GROUP BY status_code因为是低基数,用 Hash Aggregation - 分位数计算:
PERCENTILE_CONT用 T-Digest 近似算法 - 结果:10 亿行原始数据 → 6 行聚合结果,耗时约 12 秒
场景:ClickStream 漏斗分析
-- DuckDB 实现用户行为漏斗
WITH funnel_steps AS (
SELECT
user_id,
timestamp,
event_type,
FIRST_VALUE(event_type) OVER (
PARTITION BY session_id ORDER BY timestamp
) as first_event,
LAST_VALUE(event_type) OVER (
PARTITION BY session_id ORDER BY timestamp
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) as last_event
FROM read_parquet('events*.parquet')
WHERE event_type IN ('page_view', 'add_cart', 'checkout', 'payment')
)
SELECT
COUNT(DISTINCT CASE WHEN first_event = 'page_view' THEN user_id END) as step_1,
COUNT(DISTINCT CASE WHEN last_event != 'page_view' THEN user_id END) as step_2,
COUNT(DISTINCT CASE WHEN last_event = 'checkout' THEN user_id END) as step_3,
COUNT(DISTINCT CASE WHEN last_event = 'payment' THEN user_id END) as step_4
FROM funnel_steps;
6.3 CI/CD 集成
Polars + DuckDB 的组合非常适合做数据管道的测试:
# test_data_pipeline.py
import duckdb
import polars as pl
def test_pipeline():
# 用 DuckDB 的内存数据库作为测试数据
duckdb.sql("""
CREATE TABLE test_data AS
SELECT * FROM (VALUES
(1, 'Alice', 50000, 'Engineering'),
(2, 'Bob', 60000, 'Sales'),
(3, 'Charlie', 70000, 'Engineering'),
(4, 'David', 55000, 'Sales')
) AS t(id, name, salary, department)
""")
# 用 Polars 处理
result = (
pl.from_arrow(duckdb.table("test_data").arrow())
.group_by("department")
.agg(pl.col("salary").mean().alias("avg_salary"))
.sort("avg_salary", descending=True)
)
assert result["avg_salary"][0] == 65000 # Engineering
assert result["avg_salary"][1] == 57500 # Sales
print("✅ 数据管道测试通过")
6.4 与可见性/监控系统集成
DuckDB 可以优雅地嵌入到观测性系统中:
# OpenTelemetry + DuckDB 本地分析
from opentelemetry import metrics
import duckdb
class DuckDBMetricsExporter:
"""将指标导出到 DuckDB 进行本地分析"""
def export(self, metrics_data):
for metric in metrics_data:
duckdb.sql("""
INSERT INTO metrics_history
SELECT * FROM read_parquet(?)
""", parameters=[metric.serialize_to_parquet()])
def query_latency_breakdown(self, service, duration_minutes=60):
return duckdb.sql(f"""
SELECT
percentile_cont(0.5) WITHIN GROUP (ORDER BY latency_ms) as p50,
percentile_cont(0.99) WITHIN GROUP (ORDER BY latency_ms) as p99,
COUNT(*) as total_requests
FROM metrics_history
WHERE service = '{service}'
AND timestamp >= NOW() - INTERVAL '{duration_minutes} minutes'
""").fetchone()
七、Roadmap 与未来展望
7.1 Arrow 生态的快速扩张
Apache Arrow 已经从最初的内存格式,演变为一个庞大的生态系统:
- Arrow Flight SQL:通过 Arrow Flight 协议直接执行 SQL,PostgreSQL/ClickHouse 都开始支持
- Substrait:一个跨引擎的查询计划交换格式,让 DuckDB、Spark、DataFusion 之间共享优化计划
- ADBC (Arrow Database Connectivity):下一代数据库连接标准,比 JDBC/ODBC 快 10-100 倍
# ADBC 连接 PostgreSQL,数据全程 Arrow 格式(零转换)
import adbc_driver_postgresql
conn = adbc_driver_postgresql.connect("postgres://...")
result = conn.execute("""
SELECT date, SUM(revenue)
FROM sales
WHERE region = 'APAC'
GROUP BY date
""")
# result 是 Arrow Table,零拷贝流入 DuckDB/Polars
7.2 DataFusion:DuckDB 的 Rust 竞争者
Apache DataFusion 是 DuckDB 在 Rust 生态中的对应物,由 Arrow 社区维护:
use datafusion::prelude::*;
#[tokio::main]
async fn main() -> Result<()> {
let ctx = SessionContext::new();
// 注册 Parquet 文件
ctx.register_parquet("sales", "s3://data/sales/*.parquet").await?;
// 执行 SQL 查询
let df = ctx.sql("
SELECT region, SUM(amount) as total
FROM sales
WHERE date >= '2025-01-01'
GROUP BY region
ORDER BY total DESC
").await?;
df.show().await?;
Ok(())
}
DataFusion 的优势在于可以嵌入到 Rust 应用中,而 DuckDB 是 C++ 编写的但提供了完善的 Rust 绑定。
7.3 从数据处理到数据分析的整合
2026 年的趋势是:Polars 的表达式系统 + DuckDB 的 SQL 引擎 + Arrow Flight 的传输协议三者的边界越来越模糊:
- Polars 已经开始内嵌 SQL 解析器(
polars.sql()) - DuckDB 开始支持外部表引用 Polars DataFrame
- Arrow Flight SQL 让 DuckDB 可以直接作为查询引擎服务其他应用
# 趋势:混搭模式将成为标配
# Polars 里写 SQL
df = pl.sql("""
SELECT department, AVG(salary) as avg_salary
FROM self
WHERE salary > 0
GROUP BY department
""", eager=True)
# DuckDB 里用 Python 表达式
duckdb.sql("""
SELECT *, salary / NULLIF(avg_dept_salary, 0) as ratio
FROM employees
NATURAL JOIN (
SELECT department, AVG(salary) as avg_dept_salary
FROM employees
GROUP BY department
)
""")
八、总结
如果你只记住三件事:
1. Apache Arrow 是新一代数据生态的底层标准。
它不是另一个 DataFrame 库,而是内存数据格式的 TCP/IP。不管上层用 Polars、DuckDB 还是 DataFusion,底层的列式数据格式都是 Arrow。这意味着不同工具之间可以零拷贝共享数据。
2. Polars 是 pandas 的现代替代,但不是简单替代。
Polars 的优势不在于"比 pandas 快 N 倍",而在于:
- 查询优化器(Lazy API)改变了你写数据处理代码的方式
- 表达式系统让代码更具表达力和可维护性
- 流式处理突破了内存限制
3. DuckDB 填补了分析领域最后的空白:单机版 ClickHouse。
当你不需要分布式、不需要实时写入、只需要对几 GB 到几 TB 的数据做快速分析时,DuckDB 的性价比无敌。配合 Parquet 文件 + S3/对象存储,你甚至不需要"数据库"这个概念——数据在文件里,查询即分析。
附录:快速选型指南
| 场景 | 推荐工具 | 理由 |
|---|---|---|
| 单次 CSV 分析(< 100MB) | pandas/Excel | 没必要引入新工具 |
| 每日 SQL 报表 | DuckDB | 零配置,SQL 棒 |
| 特征工程脚本 | Polars | 表达式系统强大 |
| 大型多表 JOIN | DuckDB | 优化器成熟 |
| 实时数据流处理 | Polars Streaming | 原生流支持 |
| 嵌入移动端/桌面端分析 | DuckDB/DataFusion | 嵌入式,体积小 |
| 机器学习前处理 | Polars + PyTorch | 直接输出 tensor |
| 小团队数据平台 | DuckDB + Parquet + S3 | 零运维的分析架构 |
如果你现在是 pandas 用户,推荐迁移路径:先换 Polars 的 Lazy API(上手成本极低),然后在需要 SQL 分析的时候引入 DuckDB。你会发现,处理数据这件事,从「怎么让数据不崩」变成了「数据怎么表达最优雅」。
这感觉,值得体验。