编程 Polars + Apache Arrow + DuckDB:现代列式数据处理三驾马车——从 Rust 引擎到嵌入式 OLAP 的生产级完全指南(2026)

2026-06-25 18:50:28 +0800 CST views 12

Polars + Apache Arrow + DuckDB:现代列式数据处理三驾马车——从 Rust 引擎到嵌入式 OLAP 的生产级完全指南(2026)

一、背景:数据分析的「中间地带」困局

如果你在过去五年里做过任何像样的数据分析工作,大概率经历过这样的场景:

import pandas as pd

df = pd.read_csv("sales_2025.csv")
# 优雅地写了 30 行代码
# 然后……
# MemoryError: Unable to allocate 4.32 GiB for an array with shape (12345678,)

或者更常见的:

# 数据只有 500 万行,GROUP BY 耗时 47 秒
# 你开始怀疑人生,跑去泡了杯咖啡

让我直说:pandas 在 100 万行以下确实好用,一旦数据量级上了千万行,它的内存模型就是灾难。

这个问题不是你的错,是 pandas 的架构决定的:

  • 行式存储:DataFrame 的每一列在底层是 NumPy 数组或 Python 对象数组,但 pandas 的操作模式本质上是围绕行索引设计的
  • 即时求值(Eager Execution):每写一行代码,pandas 立刻执行,无法做全局优化
  • 单线程:pandas 默认单核执行(除非你用 pandas 2.0+pyarrow 后端,但那是另一个故事了)
  • 内存占用爆炸df.copy() 就是两倍内存,merge 更是内存黑洞

与此同时,传统 OLAP 数据库(ClickHouse、StarRocks)虽然快,但部署成本高——你需要服务器、集群、运维。对于「我就跑个分析出个报告」的场景,太重了。

这就形成了一道中间地带:数据量超出 Excel 上限(100 万行)和 pandas 舒适区(500 万行),但又不值得为一次分析任务部署一个 ClickHouse 集群。

2024-2026 年,一个由 Apache Arrow → Polars → DuckDB 组成的现代列式数据处理栈,精准地填上了这个缺口。它们共享同一套底层数据格式、互补的使用场景,而且每个工具都是零配置、嵌入式的——装个包就能用,不需要服务器。

二、底层基石:Apache Arrow 的列式内存格式

在讨论 Polars 和 DuckDB 之前,必须先理解 Apache Arrow。它们是 Arrow 生态上长出来的两棵大树。

2.1 为什么列式格式是性能关键?

假设你有一个 100 列、1000 万行的表,你要计算所有行的 salary 平均值。

行式存储(pandas 默认):数据在内存中按行排列,CPU 需要跳过大量的无关列数据才能读到 salary。每一行的 nameaddressphone 等字段都夹在 salary 之间,导致缓存行(cache line)利用率极低——可能只用到 1% 的数据。

列式存储(Arrow)salary 列的所有值连续存放在内存中。CPU 从 L1/L2 缓存里连续读取 1000 个浮点数,SIMD 指令一次性处理多个值,cache miss 大幅减少。

# 行式存储(示意)
Row1: [name(40B) | salary(8B) | address(100B) | phone(15B)]
Row2: [name(40B) | salary(8B) | address(100B) | phone(15B)]
# 读取 salary 时要跳过 155 字节的无关数据

# 列式存储(Arrow)
salary: [1.0, 2.0, 3.0, ...]  # 连续 8 字节 * N 行
name:   ["Alice", "Bob", ...]  # 连续
# 读取 salary 时 CPU 完美预取

2.2 Arrow 的核心设计

Apache Arrow 不仅仅是一个列式格式,它还定义了一套完整的规范:

1. 分层内存布局

Arrow Array 结构
┌─────────────────────┐
│   validity bitmap    │ ← 处理 NULL 值,每位表示一个元素是否有效
├─────────────────────┤
│   data buffer        │ ← 实际的列数据(连续排列)
├─────────────────────┤
│   offset buffer      │ ← 仅用于可变长度类型(string、binary)
└─────────────────────┘

每个 Arrow Array 由零个或多个 Buffer 组成。Buffer 是一段连续的内存。这种设计使得 Arrow 可以实现零拷贝共享——多个进程读取同一块内存映射文件,不需要序列化/反序列化。

2. 丰富的数据类型

Arrow 原生支持 40+ 种数据类型,比 JSON 和传统数据库更丰富:

类别类型
基础数值Int8 Int16 Int32 Int64 UInt8 UInt16 UInt32 UInt64 Float16 Float32 Float64
时间Date32 Date64 Time32 Time64 Timestamp Duration Interval
复合List FixedSizeList Struct Map Union Dictionary
特殊Decimal128 Decimal256 Binary LargeBinary String LargeString

3. 零拷贝反序列化

Arrow 的神奇之处在于:数据从磁盘(Parquet)读到内存时,内存中的二进制布局和 Arrow 的内存格式一模一样。这意味着:

# CSV → pandas:需要解析字符串、分配对象、逐行填充(开销极大)
# Parquet → Arrow:直接内存映射,0 行 Python 代码用于解析

这也是为什么 pd.read_parquet()pd.read_csv() 快 10-50 倍的原因。

2.3 Arrow IPC Flight:列式数据的 gRPC

Arrow 还定义了 Flight RPC 协议——一个基于 gRPC 的高吞吐数据传输协议:

service FlightService {
  rpc GetSchema(Command) returns (SchemaResult);
  rpc DoGet(Ticket) returns (stream FlightData);
  rpc DoPut(PutResult) returns (stream FlightData);
  rpc DoExchange(stream FlightData) returns (stream FlightData);
}

Flight 的核心价值在于:不同系统之间传输大规模列式数据时,不需要先序列化成 JSON 或 Protobuf,而是直接传输 Arrow 的内存块。在跨进程、跨节点的场景下,Flight 的吞吐量比 gRPC+Protobuf 高 10-20 倍。


三、Polars:Rust 实现的、程序员友好的 DataFrame 库

3.1 Polars 的架构哲学

Polars 的诞生源于一个简单但有力的想法:DataFrame 库不应该用 Python 写

pandas 的核心逻辑用 Cython/C 写了,但调度层、API 层、类型检查层都在 Python 里。这意味着:

  • 每调用一次 df.groupby("col").sum(),Python 解释器要执行几十个字节码指令才能进入 C 层
  • 链式调用时中间结果反复在 Python 对象和 C 数组之间转换
  • GIL 限制了多线程并行

Polars 的架构完全绕开了这些问题:

┌─────────────────────────────────────┐
│           Python API Layer          │ ← PyO3 绑定,薄薄一层
├─────────────────────────────────────┤
│         Rust 表达式系统              │ ← 类型擦除 trait 对象
├─────────────────────────────────────┤
│      查询优化器 (优化规则)            │ ← 谓词下推、投影消除、Join 重排
├─────────────────────────────────────┤
│   列式执行引擎 (向量化 + Morsel)       │ ← 多线程并行 + SIMD
├─────────────────────────────────────┤
│         Apache Arrow 内存格式         │ ← 底层数据表示
└─────────────────────────────────────┘

关键点:整个数据流的瓶颈在 Rust 层,Python 只负责传参数和收结果

3.2 表达式系统 vs 方法链

Polars 最被低估的设计是它的表达式系统(Expression System)

pandas 的链式调用长这样:

result = (
    df
    .groupby("department")
    .agg({"salary": "mean", "age": "max"})
    .reset_index()
    .rename(columns={"salary": "avg_salary"})
)

Polars 用表达式表达同样的逻辑:

import polars as pl

result = (
    df
    .group_by("department")
    .agg([
        pl.col("salary").mean().alias("avg_salary"),
        pl.col("age").max().alias("max_age"),
        pl.col("score").quantile(0.9).alias("p90_score"),
    ])
)

看似差异不大,但底层区别很大:

  • Polars 的 pl.col() 返回的不是数据,而是一个表达式对象
  • 把这个表达式传给 agg() 后,Polars 的优化器会重写整个表达式树
  • 例如 pl.col("salary").mean().alias("avg_salary") → 优化器会识别出只需要 salary 列,忽略其他列

再看一个更复杂的例子——Polars 的条件表达式

# Pandas 写法
df["category"] = df["value"].apply(
    lambda x: "high" if x > 100 else ("medium" if x > 50 else "low")
)

# Polars 写法
df.with_columns(
    pl.when(pl.col("value") > 100)
    .then(pl.lit("high"))
    .when(pl.col("value") > 50)
    .then(pl.lit("medium"))
    .otherwise(pl.lit("low"))
    .alias("category")
)

Polars 的 when/then/otherwise 在底层编译成列式向量化操作,而不是逐行应用。在 1000 万行数据上,Polars 比 pandas 的 apply 快 50-100 倍。

3.3 惰性求值(Lazy API):查询优化的核心

Polars 的 Lazy API 才是真正的杀手锏。它把 DataFrame 操作变成查询计划(Query Plan),然后让优化器重写这个计划:

# 惰性模式
q = (
    pl.scan_csv("sales_2025.csv")  # 不加载数据,只创建逻辑计划
    .filter(pl.col("amount") > 1000)
    .group_by("region")
    .agg([
        pl.col("amount").sum().alias("total"),
        pl.col("count").mean().alias("avg_count"),
    ])
    .sort("total", descending=True)
)

# 查看优化后的计划
print(q.explain(optimized=True))

# 真正执行
result = q.collect()

q.explain(optimized=True) 的输出会揭示优化器做了什么:

--- 优化后的计划 ---
SORT BY [total DESC]
  AGGREGATE
    GROUP BY [region]
      AGG: [sum(amount) AS total, mean(count) AS avg_count]
        CSV SCAN [sales_2025.csv] PROJECT: [3/10, 5/10]
          │
          ↓ 优化器干的事:
            1. 谓词下推:filter→ 下推到 scan 阶段
            2. 投影消除:原表 10 列只用 3 列(region, amount, count)
            3. 行筛选:只读 amount > 1000 的行

对比 pandas 的即时求值模式:

# pandas- 每行代码都耗内存
df = pd.read_csv("sales_2025.csv")          # 全部加载到内存
df = df[df["amount"] > 1000]                # 过滤,创建新 DataFrame
result = df.groupby("region").agg(...)       # 聚合,又一个新 DataFrame
result = result.sort_values("total")         # 排序

pandas 产生 3-4 个中间 DataFrame,每个都完整分配内存;Polars Lazy 只产生一个最终结果。

3.4 流式处理:超越内存限制

Polars 还有一个被低估的功能:流式模式(Streaming)

当数据集大于内存时,普通 DataFrame 库会直接崩溃。Polars 的流式模式可以将数据分块处理,每批只处理一小部分:

q = (
    pl.scan_csv("extremely_large_file.csv")
    .group_by("category")
    .agg([
        pl.col("value").sum(),
        pl.col("count").mean(),
    ])
)

# 流式模式:数据分批处理,每批 50000 行
result = q.collect(streaming=True)

流式模式下,Polars 将聚合拆分为局部聚合 + 全局合并两个阶段:

┌──────────┐    ┌───────────┐    ┌───────────┐
│ 批次 1    │──→│ 局部聚合   │──→┤           │
├──────────┤    ├───────────┤   │ 全局合并   │──→ 最终结果
│ 批次 2    │──→│ 局部聚合   │──→│           │
├──────────┤    ├───────────┤   └───────────┘
│ 批次 N    │──→│ 局部聚合   │──→│
└──────────┘    └───────────┘

对 10 亿行的日志文件做 GROUP BY,Polars Streaming 只需要几百 MB 内存就能完成,而 pandas 连打开文件都做不到。

3.5 性能基准

不是我说的,社区实测数据(ClickBench, H2O.ai benchmark):

操作pandas 2.2Polars 1.0加速比
GROUP BY 1 列(1 亿行)38.2s2.1s18x
多列 GROUP BY(1 亿行)崩溃4.8s
10 列 JOIN(5000 万行 × 2)127s6.3s20x
CSV 读取 + 过滤(2 亿行)89s5.7s15.6x
窗口函数(1 亿行)崩溃3.9s

注:测试环境为 AMD Ryzen 9 7950X(16 核),128GB RAM,NVMe SSD。

3.6 实战:从 pandas 迁移到 Polars

迁移过程不必一次性全部重写。推荐逐步迁移策略:

第一步:替换 I/O 操作

# 原来
df = pd.read_csv("data.csv", usecols=["a", "b", "c"])

# 现在(读取更快,内存更少)
df = pl.read_csv("data.csv", columns=["a", "b", "c"])

第二步:核心逻辑重写

# pandas 版本
def process_sales(df: pd.DataFrame) -> pd.DataFrame:
    df = df[df["status"] == "completed"]
    df["total"] = df["price"] * df["quantity"]
    monthly = df.groupby(pd.Grouper(key="date", freq="M")).agg({
        "total": "sum",
        "order_id": "nunique"
    }).reset_index()
    monthly["yoy_growth"] = (monthly["total"] - monthly["total"].shift(12)) / monthly["total"].shift(12)
    return monthly

# Polars 版本
def process_sales_polars(df: pl.DataFrame) -> pl.DataFrame:
    return (
        df
        .filter(pl.col("status") == "completed")
        .with_columns((pl.col("price") * pl.col("quantity")).alias("total"))
        .group_by_dynamic("date", every="1mo")
        .agg([
            pl.col("total").sum(),
            pl.col("order_id").n_unique(),
        ])
        .with_columns(
            ((pl.col("total") - pl.col("total").shift(12)) / pl.col("total").shift(12))
            .alias("yoy_growth")
        )
    )

第三步:混合使用(过渡期最佳实践)

Polars 可以零拷贝转换为 pandas:

# Polars → pandas(零拷贝)
pandas_df = polars_df.to_pandas(use_pyarrow_extension=True)

# pandas → Polars(也是零拷贝)
polars_df = pl.from_pandas(pandas_df)

这在逐步迁移时非常有价值——旧代码继续用 pandas,新代码用 Polars,交接时零成本。


四、DuckDB:嵌入式 OLAP 数据库

如果说 Polars 是更快的 DataFrame,那 DuckDB 就是可以在内存里跑的 ClickHouse

4.1 DuckDB 的定位

DuckDB 的创造者 Hannes Mühleisen 说得很清楚:

"SQLite 是 OLTP 的嵌入式数据库,DuckDB 是 OLAP 的嵌入式数据库。"

核心设计决策:

维度SQLiteDuckDB
存储行式(Row-Oriented)列式(Column-Oriented)
执行逐行迭代模型(Volcano)向量化执行(Vectorized)
并发写锁串行化多版本并发控制(MVCC)
优化器HeuristicCost-based + Statistics
适用场景事务、CRUD分析、聚合、报表
部署嵌入式,单文件嵌入式,单文件

DuckDB 没有守护进程、没有端口、没有配置——pip install duckdb 之后就能用,跟 SQLite 一样简单,但跑分析查询比 SQLite 快 100-10000 倍。

4.2 向量化执行引擎

DuckDB 之所以快,核心在于它的向量化执行模型

传统数据库(包括 SQLite 和 PostgreSQL 的火山模型)逐行处理数据:

逐行执行模型(Volcano Model)
┌──────────┐    ┌──────────┐    ┌──────────┐
│ 全表扫描   │──→│ 过滤     │──→│ 聚合     │
└──────────┘    └──────────┘    └──────────┘
每行:一行进 → 判断 → 一行出(重复 1 亿次)

DuckDB 的向量化执行:

向量化执行模型
┌───────────┐    ┌───────────┐    ┌───────────┐
│ 全表扫描    │──→│ Vector Filter │──→│ Vector Aggregate │
└───────────┘    └───────────┘    └───────────┘
每个批次 2048 行 → 向量化判断 → SIMD 批量处理

批处理(Vector at a time):DuckDB 一次处理一个固定大小的向量(默认 2048 行),而不是一行。这带来几个好处:

  1. 函数调用开销降低 2048 倍:每次 Next() 返回 2048 行,而不是 1 行
  2. CPU 缓存友好:2048 个浮点数可以完整装入 L1 缓存
  3. SIMD 向量化:对连续的内存块做 AVX2/AVX-512 指令,一次处理 8/16 个值

4.3 Morsel-Driven Parallelism

DuckDB 的并行执行采用 Morsel-Driven 模型:

┌─────────────────────────────────────┐
│          查询优化器                   │
│  (生成逻辑计划 → 物理计划)            │
└──────────┬──────────────────────────┘
           ↓
┌─────────────────────────────────────┐
│        并行调度器 (Pipeline)          │
│  ┌────────┐  ┌────────┐  ┌────────┐ │
│  │ Pipeline 0 │  │ Pipeline 1 │  │ Pipeline 2 │ │
│  └────────┘  └────────┘  └────────┘ │
└──────────┬──────────────────────────┘
           ↓
┌─────────────────────────────────────┐
│    Morsel 分片 (2048 行/片)           │
│  ┌────────┐  ┌────────┐  ┌────────┐ │
│  │Worker 1│  │Worker 2│  │Worker 3│ │ ← 线程池
│  └────────┘  └────────┘  └────────┘ │
│ 每个 Worker 从全局队列中取一个 Morsel  │
└─────────────────────────────────────┘

关键点:

  • 调度器将表扫描操作拆分为多个 Morsel(数据切片)
  • Morsel 队列是线程安全的,空闲 Worker 自动取下一个 Morsel
  • Pipeline Breaker(如 Hash Join Probe 端)会自动同步
  • 自动适配 CPU 核心数,不需要配置

这与 Spark 的任务调度类似,但 DuckDB 是进程内的,没有网络和序列化开销。

4.4 零配置 SQL 查询

DuckDB 最实用的功能之一是直接对文件执行 SQL 查询——不需要 CREATE TABLE、不需要 INSERT、不需要 ETL:

-- 直接查询 CSV 文件
SELECT department, AVG(salary) as avg_salary
FROM 'employees.csv'
WHERE join_date > '2024-01-01'
GROUP BY department
ORDER BY avg_salary DESC;

-- 多文件 Glob 模式
SELECT year, COUNT(*) as crimes
FROM 'nyc_crime_data_*.parquet'
WHERE borough = 'Manhattan'
GROUP BY year
ORDER BY year;

-- 跨格式 JOIN
SELECT c.name, o.total
FROM read_csv('customers.csv') c
JOIN read_parquet('orders.parquet') o
ON c.id = o.customer_id;

在 Python 中使用:

import duckdb

# 直接对 CSV 执行 SQL,返回 pandas DataFrame
result = duckdb.sql("""
    SELECT 
        department,
        AVG(salary) as avg_salary,
        PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY salary) as median_salary,
        COUNT(*) as employee_count
    FROM 'employees.csv'
    WHERE salary > 0
    GROUP BY department
    HAVING COUNT(*) > 10
    ORDER BY avg_salary DESC
""").df()

print(result)

这段代码发生了什么:

  1. DuckDB 的 CSV 读取器自动推断所有列的类型(不需要 schema 定义)
  2. SQL 解析器将查询编译为逻辑计划
  3. 优化器进行谓词下推、投影消除、Join 重排
  4. 物理计划生成向量化执行代码
  5. 多线程并行扫描和聚合数据
  6. 结果以 Arrow 格式零拷贝转换为 pandas DataFrame

整个过程 5 行 Python 代码,不需要数据库服务器,不需要配置。

4.5 特色 SQL 功能

DuckDB 有一些非常实用的 SQL 扩展:

QUALIFY — 窗口函数过滤(不用嵌套子查询):

-- 找到每个部门薪资前三的员工
SELECT name, department, salary
FROM employees
QUALIFY ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) <= 3;

ASOF JOIN — 时序不等值最近匹配

-- 找到每个交易发生时最近的汇率
SELECT t.transaction_id, t.amount * r.rate as converted_amount
FROM transactions t
ASOF JOIN exchange_rates r
    ON t.currency = r.currency
    AND t.timestamp >= r.timestamp;

PIVOT / UNPIVOT

-- 行列转换,一行搞定
PIVOT sales_data
ON product_category
USING SUM(revenue)
GROUP BY year;

-- 传统的 CASE WHEN 方式要写十几个条件

直接查询 S3/HTTP 上的 Parquet 文件

-- 不需要下载,直接分析云上数据
SELECT region, SUM(revenue)
FROM read_parquet('s3://my-bucket/sales/*.parquet')
WHERE date >= '2025-01-01'
GROUP BY region;

4.6 DuckDB + Polars 的双引擎模式

Polars 和 DuckDB 不是竞争关系,而是互补的。最佳实践是让它们各司其职:

flowchart LR
    A[数据源<br/>CSV/Parquet/JSON] --> B{DuckDB}
    B --> C[SQL 批处理<br/>聚合、过滤、Join]
    C --> D[Arrow 格式结果]
    D --> E[Polars]
    E --> F[Python 逻辑<br/>特征工程、清洗]
    F --> G[结果输出]

Python 中的双引擎协作:

import duckdb
import polars as pl

# 1. DuckDB 负责 SQL 批处理(擅长复杂 JOIN 和大规模聚合)
result_arrow = duckdb.sql("""
    SELECT 
        u.user_id,
        u.region,
        SUM(o.amount) as lifetime_value,
        COUNT(DISTINCT o.order_id) as order_count,
        AVG(o.amount) as avg_order_value
    FROM read_parquet('users/*.parquet') u
    JOIN read_parquet('orders/*.parquet') o
        ON u.user_id = o.user_id
    WHERE o.order_date >= '2024-01-01'
    GROUP BY u.user_id, u.region
""").arrow()  # 返回 Arrow Table!零拷贝

# 2. Polars 负责 Python 层的复杂逻辑
df = pl.from_arrow(result_arrow)
result = (
    df
    .with_columns(
        (pl.col("lifetime_value") / pl.col("order_count")).alias("avg_order_value"),
        pl.when(pl.col("lifetime_value") > pl.col("lifetime_value").quantile(0.9))
        .then(pl.lit("VIP"))
        .otherwise(pl.lit("Normal"))
        .alias("tier")
    )
    .sort("lifetime_value", descending=True)
)

为什么这不是冗余?

操作最佳引擎原因
大规模 JOIN(10 亿 × 10 亿)DuckDBHash Join 用 C++ 实现,比 Polars 的 Rust 实现更成熟
复杂特征工程PolarsPython 表达能力强,Polars 表达式系统比 SQL 更灵活
流式数据管道Polars原生 streaming 模式支持无限流
即席 SQL 分析DuckDB分析师写 SQL 比写 Python 快
时间序列窗口函数DuckDBQUALIFY + ASOF JOIN 是杀手级功能
机器学习前处理Polars直接输出 numpy/pytorch 兼容的数组

五、性能优化实战

5.1 Parquet:列式存储的终极形态

Parquet 是 Arrow 格式的"存档版"——同一套列式设计,但做了磁盘压缩和编码优化。

Parquet 文件结构

Parquet File
┌─────────────────────────────────────────┐
│               Magic Number              │ ← "PAR1" 标识
├─────────────────────────────────────────┤
│              Row Group 1                │
│  ┌──────────┬──────────┬──────────────┐ │
│  │ Col 1    │ Col 2    │ Col 3        │ │
│  │ (Page)   │ (Page)   │ (Page)       │ │
│  │  ┌─────┐ │  ┌─────┐ │  ┌─────┐     │ │
│  │  │ RLE  │ │  │Dict │ │  │ Delta│    │ │
│  │  └─────┘ │  └─────┘ │  └─────┘     │ │
│  └──────────┴──────────┴──────────────┘ │
├─────────────────────────────────────────┤
│              Row Group 2                │
│  ┌──────────┬──────────┬──────────────┐ │
│  │ ...      │ ...      │ ...          │ │
│  └──────────┴──────────┴──────────────┘ │
├─────────────────────────────────────────┤
│              Footer                     │
│  (Schema + 各列的位置 + Row Group 元数据)  │
│  ┌─────────────────────────────────────┐│
│  │ 索引层                                ││
│  │ - Column Chunk 1: offset=4096       ││
│  │ - Column Chunk 2: offset=1048576    ││
│  │ - 各列的统计信息:min/max/null_count    ││
│  └─────────────────────────────────────┘│
├─────────────────────────────────────────┤
│              Magic Number               │ ← "PAR1" 标识
└─────────────────────────────────────────┘

Parquet 的核心优势

  1. 谓词下推(Predicate Pushdown):读取时直接跳过不需要的 Row Group
  2. 列裁剪(Column Projection):只读取查询中涉及的列
  3. 数据编码压缩:RLE(游程编码)、Dictionary(字典编码)、Delta(增量编码)
  4. 统计信息过滤:每个 Column Chunk 的 min/max 值用于跳过

在 DuckDB 中,谓词下推的效果非常明显:

-- DuckDB 对 Parquet 文件的智能读取
SELECT * FROM 'sales.parquet' 
WHERE region = 'APAC' AND date >= '2025-01-01';

DuckDB 会:

  1. 读取 Footer,获取每个 Row Group 的 region 列的 min/max
  2. 跳过不含 APAC 的 Row Group
  3. 对于命中的 Row Group,只读取 regiondate 列做精确过滤
  4. 最终只传输匹配行的所有列

实验数据:一个包含 100 个 Row Group(每个 100 万行)、50 列的 Parquet 文件,查询单列聚合——经过谓词下推后,实际读取的数据量从 5GB 降到 80MB。

5.2 内存模型对比与优化

pandas 的内存黑洞

# 看起来人畜无害
df = pd.read_csv("data.csv")          # 分配 2GB
df_filtered = df[df["status"] == "ok"] # 又分配 2GB
df_grouped = df_filtered.groupby("x").sum()  # 再分配 1GB
# 峰值内存 = 2 + 2 + 1 = 5GB(虽然最终结果只需要 500MB)

Polars Lazy 的内存优雅

# Polars:查询优化器只在最后一步分配内存
q = (
    pl.scan_csv("data.csv")
    .filter(pl.col("status") == "ok")
    .group_by("x")
    .agg(pl.col("y").sum())
)
result = q.collect()
# 峰值内存 ≈ 最终结果大小(约 500MB)+ 单个批处理缓冲区(约 200MB)

DuckDB 的内存管控

DuckDB 内置物化内存限制:

import duckdb

# 限制 DuckDB 最大内存使用 2GB
duckdb.sql("SET memory_limit = '2GB'")

# 当查询需要更多内存时,DuckDB 会自动将中间结果 spill 到磁盘
# 而不是 OOM

5.3 索引:列式数据库的不同思路

关系数据库依赖 B-Tree 索引来加速查询,但列式系统走了一条不同的路:

DuckDB 使用统计信息(Zone Maps)自适应分区

传统 B-Tree 索引:精确导航到某一行
  → 适合 OLTP:WHERE id = 42

Zone Map(区域索引):跳过完全不符合的分区
  → 适合 OLAP:WHERE salary BETWEEN 50000 AND 100000

DuckDB 的 Zone Map 在文件层面存储每个 Row Group 的列统计信息,完全不需要显式创建索引。

Polars 的策略更偏向计算时的自适应优化

# Polars 自动决定最佳算法
df.sort("column")  
# 小数据 → 快速排序
# 大数据 → 基数排序/并行归并排序
# 已排序数据 → 检测并跳过排序

5.4 实战性能调优 checklist

对于 Polars:

  1. 总是用 Lazy APIscan_csv/scan_parquet 而非 read_csv/read_parquet
  2. 早过滤、晚聚合filter() 尽可能靠前
  3. 只选需要的列select()group_by 时明确列名
  4. 利用流式模式:超大数据集用 collect(streaming=True)
  5. 数据分区:大量小文件用 scan_csvinclude_file_paths 参数
  6. 使用 explain():经常检查优化后的查询计划
# 优化反例
pl.read_csv("*.csv").select(["a", "b"]).filter(pl.col("a") > 10)

# 优化正例
pl.scan_csv("*.csv", include_file_paths=True)\
  .filter(pl.col("a") > 10)\
  .select(["b"])\
  .collect()

对于 DuckDB:

  1. 减少数据类型宽度:能用 INT32 不用 INT64,能用 FLOAT32 不用 FLOAT64
  2. Parquet 分区:按高频过滤字段分区存储
  3. 设置 memory_limit:为每个查询设置合理上限
  4. 使用 EXPLAIN ANALYZE:查看瓶颈
  5. 表分区列作为过滤条件:利用分区剪枝
-- DuckDB 查询分析
EXPLAIN ANALYZE
SELECT region, SUM(amount) 
FROM 'sales.parquet'
WHERE date >= '2025-01-01'
GROUP BY region;

六、生产级最佳实践

6.1 数据管道架构

以下是一个生产级的数据管道设计,组合了 DuckDB、Polars 和 Arrow:

# ─── 第一层:数据摄取 ───
# DuckDB 作为 ETL 引擎处理原始数据
import duckdb
import polars as pl

def ingest_and_transform():
    """从多个数据源拉取并预处理"""
    # 历史数据(每日增量)
    duckdb.sql("""
        CREATE OR REPLACE TABLE daily_sales AS
        SELECT * FROM read_parquet(
            's3://data-lake/raw/sales/*/date={date}/*.parquet',
            hive_partitioning=true
        )
        WHERE date >= current_date - INTERVAL '1 day';
    """)
    
    # 清洗和合并
    duckdb.sql("""
        CREATE OR REPLACE TABLE clean_sales AS
        SELECT 
            transaction_id,
            product_id,
            customer_id,
            COALESCE(amount, 0) as amount,
            COALESCE(quantity, 1) as quantity,
            date,
            CASE 
                WHEN region IS NULL THEN 'unknown' 
                ELSE UPPER(region)
            END as region
        FROM daily_sales
        WHERE transaction_id IS NOT NULL
    """)
    return duckdb.table("clean_sales").arrow()

# ─── 第二层:特征工程 ───
# Polars 处理复杂的 Python 层逻辑
def feature_engineering(arrow_table):
    df = pl.from_arrow(arrow_table)
    return (
        df
        .with_columns([
            (pl.col("amount") / pl.col("quantity")).alias("unit_price"),
            pl.col("date").dt.month().alias("month"),
            pl.col("date").dt.weekday().alias("day_of_week"),
        ])
        .with_columns(
            pl.col("unit_price").rolling_mean(
                window_size=7, min_periods=1
            ).over("product_id").alias("price_ma_7d")
        )
    )

# ─── 第三层:结果存储 ───
# DuckDB 将结果写回 Parquet
def store_results(df: pl.DataFrame):
    arrow_table = df.to_arrow()
    duckdb.sql("""
        COPY (
            SELECT * FROM arrow_table
        ) TO 's3://data-lake/curated/sales/enriched/'
        (FORMAT PARQUET, PARTITION_BY (region, month))
    """)

6.2 大规模数据场景

场景:十亿级日志分析

# 10 亿行 Nginx 日志 → 多维度聚合
import duckdb

duckdb.sql("""
    SELECT 
        status_code,
        COUNT(*) as request_count,
        AVG(response_time) as avg_time,
        PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY response_time) as p99_time,
        COUNT(DISTINCT ip_address) as unique_visitors
    FROM read_parquet('s3://logs/nginx/*.parquet')
    WHERE timestamp BETWEEN '2025-06-01' AND '2025-06-30'
    GROUP BY status_code
    ORDER BY request_count DESC
""")

DuckDB 的执行策略:

  1. 分区剪枝:跳过 6 月之前的所有 Parquet 分区
  2. 列裁剪:只读取 status_coderesponse_timeip_addresstimestamp 四列
  3. 哈希聚合GROUP BY status_code 因为是低基数,用 Hash Aggregation
  4. 分位数计算PERCENTILE_CONT 用 T-Digest 近似算法
  5. 结果:10 亿行原始数据 → 6 行聚合结果,耗时约 12 秒

场景:ClickStream 漏斗分析

-- DuckDB 实现用户行为漏斗
WITH funnel_steps AS (
    SELECT 
        user_id,
        timestamp,
        event_type,
        FIRST_VALUE(event_type) OVER (
            PARTITION BY session_id ORDER BY timestamp
        ) as first_event,
        LAST_VALUE(event_type) OVER (
            PARTITION BY session_id ORDER BY timestamp
            ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
        ) as last_event
    FROM read_parquet('events*.parquet')
    WHERE event_type IN ('page_view', 'add_cart', 'checkout', 'payment')
)
SELECT 
    COUNT(DISTINCT CASE WHEN first_event = 'page_view' THEN user_id END) as step_1,
    COUNT(DISTINCT CASE WHEN last_event != 'page_view' THEN user_id END) as step_2,
    COUNT(DISTINCT CASE WHEN last_event = 'checkout' THEN user_id END) as step_3,
    COUNT(DISTINCT CASE WHEN last_event = 'payment' THEN user_id END) as step_4
FROM funnel_steps;

6.3 CI/CD 集成

Polars + DuckDB 的组合非常适合做数据管道的测试:

# test_data_pipeline.py
import duckdb
import polars as pl

def test_pipeline():
    # 用 DuckDB 的内存数据库作为测试数据
    duckdb.sql("""
        CREATE TABLE test_data AS
        SELECT * FROM (VALUES
            (1, 'Alice', 50000, 'Engineering'),
            (2, 'Bob', 60000, 'Sales'),
            (3, 'Charlie', 70000, 'Engineering'),
            (4, 'David', 55000, 'Sales')
        ) AS t(id, name, salary, department)
    """)
    
    # 用 Polars 处理
    result = (
        pl.from_arrow(duckdb.table("test_data").arrow())
        .group_by("department")
        .agg(pl.col("salary").mean().alias("avg_salary"))
        .sort("avg_salary", descending=True)
    )
    
    assert result["avg_salary"][0] == 65000  # Engineering
    assert result["avg_salary"][1] == 57500  # Sales
    print("✅ 数据管道测试通过")

6.4 与可见性/监控系统集成

DuckDB 可以优雅地嵌入到观测性系统中:

# OpenTelemetry + DuckDB 本地分析
from opentelemetry import metrics
import duckdb

class DuckDBMetricsExporter:
    """将指标导出到 DuckDB 进行本地分析"""
    
    def export(self, metrics_data):
        for metric in metrics_data:
            duckdb.sql("""
                INSERT INTO metrics_history 
                SELECT * FROM read_parquet(?)
            """, parameters=[metric.serialize_to_parquet()])
    
    def query_latency_breakdown(self, service, duration_minutes=60):
        return duckdb.sql(f"""
            SELECT 
                percentile_cont(0.5) WITHIN GROUP (ORDER BY latency_ms) as p50,
                percentile_cont(0.99) WITHIN GROUP (ORDER BY latency_ms) as p99,
                COUNT(*) as total_requests
            FROM metrics_history
            WHERE service = '{service}'
                AND timestamp >= NOW() - INTERVAL '{duration_minutes} minutes'
        """).fetchone()

七、Roadmap 与未来展望

7.1 Arrow 生态的快速扩张

Apache Arrow 已经从最初的内存格式,演变为一个庞大的生态系统:

  • Arrow Flight SQL:通过 Arrow Flight 协议直接执行 SQL,PostgreSQL/ClickHouse 都开始支持
  • Substrait:一个跨引擎的查询计划交换格式,让 DuckDB、Spark、DataFusion 之间共享优化计划
  • ADBC (Arrow Database Connectivity):下一代数据库连接标准,比 JDBC/ODBC 快 10-100 倍
# ADBC 连接 PostgreSQL,数据全程 Arrow 格式(零转换)
import adbc_driver_postgresql

conn = adbc_driver_postgresql.connect("postgres://...")
result = conn.execute("""
    SELECT date, SUM(revenue)
    FROM sales
    WHERE region = 'APAC'
    GROUP BY date
""")
# result 是 Arrow Table,零拷贝流入 DuckDB/Polars

7.2 DataFusion:DuckDB 的 Rust 竞争者

Apache DataFusion 是 DuckDB 在 Rust 生态中的对应物,由 Arrow 社区维护:

use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    
    // 注册 Parquet 文件
    ctx.register_parquet("sales", "s3://data/sales/*.parquet").await?;
    
    // 执行 SQL 查询
    let df = ctx.sql("
        SELECT region, SUM(amount) as total
        FROM sales
        WHERE date >= '2025-01-01'
        GROUP BY region
        ORDER BY total DESC
    ").await?;
    
    df.show().await?;
    Ok(())
}

DataFusion 的优势在于可以嵌入到 Rust 应用中,而 DuckDB 是 C++ 编写的但提供了完善的 Rust 绑定。

7.3 从数据处理到数据分析的整合

2026 年的趋势是:Polars 的表达式系统 + DuckDB 的 SQL 引擎 + Arrow Flight 的传输协议三者的边界越来越模糊:

  • Polars 已经开始内嵌 SQL 解析器(polars.sql()
  • DuckDB 开始支持外部表引用 Polars DataFrame
  • Arrow Flight SQL 让 DuckDB 可以直接作为查询引擎服务其他应用
# 趋势:混搭模式将成为标配
# Polars 里写 SQL
df = pl.sql("""
    SELECT department, AVG(salary) as avg_salary
    FROM self
    WHERE salary > 0
    GROUP BY department
""", eager=True)

# DuckDB 里用 Python 表达式
duckdb.sql("""
    SELECT *, salary / NULLIF(avg_dept_salary, 0) as ratio
    FROM employees
    NATURAL JOIN (
        SELECT department, AVG(salary) as avg_dept_salary
        FROM employees
        GROUP BY department
    )
""")

八、总结

如果你只记住三件事:

1. Apache Arrow 是新一代数据生态的底层标准。

它不是另一个 DataFrame 库,而是内存数据格式的 TCP/IP。不管上层用 Polars、DuckDB 还是 DataFusion,底层的列式数据格式都是 Arrow。这意味着不同工具之间可以零拷贝共享数据。

2. Polars 是 pandas 的现代替代,但不是简单替代。

Polars 的优势不在于"比 pandas 快 N 倍",而在于:

  • 查询优化器(Lazy API)改变了你写数据处理代码的方式
  • 表达式系统让代码更具表达力和可维护性
  • 流式处理突破了内存限制

3. DuckDB 填补了分析领域最后的空白:单机版 ClickHouse。

当你不需要分布式、不需要实时写入、只需要对几 GB 到几 TB 的数据做快速分析时,DuckDB 的性价比无敌。配合 Parquet 文件 + S3/对象存储,你甚至不需要"数据库"这个概念——数据在文件里,查询即分析。


附录:快速选型指南

场景推荐工具理由
单次 CSV 分析(< 100MB)pandas/Excel没必要引入新工具
每日 SQL 报表DuckDB零配置,SQL 棒
特征工程脚本Polars表达式系统强大
大型多表 JOINDuckDB优化器成熟
实时数据流处理Polars Streaming原生流支持
嵌入移动端/桌面端分析DuckDB/DataFusion嵌入式,体积小
机器学习前处理Polars + PyTorch直接输出 tensor
小团队数据平台DuckDB + Parquet + S3零运维的分析架构

如果你现在是 pandas 用户,推荐迁移路径:先换 Polars 的 Lazy API(上手成本极低),然后在需要 SQL 分析的时候引入 DuckDB。你会发现,处理数据这件事,从「怎么让数据不崩」变成了「数据怎么表达最优雅」。

这感觉,值得体验。

推荐文章

Golang - 使用 GoFakeIt 生成 Mock 数据
2024-11-18 15:51:22 +0800 CST
一个收银台的HTML
2025-01-17 16:15:32 +0800 CST
Vue3中的v-model指令有什么变化?
2024-11-18 20:00:17 +0800 CST
从Go开发者的视角看Rust
2024-11-18 11:49:49 +0800 CST
程序员茄子在线接单