Pathway 深度实战:55K Star 的 Python 实时 ETL 框架——从差分数据流原理到 RAG 管道生产部署的全链路架构解析
当 Flink 还在为 Python API 的性能损耗头疼时,Pathway 已经用 Rust 引擎把差分数据流玩出了花。一个 Python 写代码、Rust 跑引擎的实时 ETL 框架,凭什么能在 GitHub 上拿下 55K+ Star?本文从差分数据流的数学基础开始,逐层拆解 Pathway 的架构设计、增量计算引擎、状态管理与一致性保证,最后带你手撸一个生产级 RAG 管道。
一、为什么需要又一个流处理框架?
如果你做过实时数据处理,大概率踩过这些坑:
- Flink:Java/Scala 生态,Python API(PyFlink)性能拉胯,序列化开销大到让人怀疑人生
- Spark Streaming:微批处理本质,延迟下限被批次间隔锁死
- Kafka Streams:JVM only,Python 开发者只能望洋兴叹
- 自研方案:Kafka + Python consumer + Redis + 定时任务……维护噩梦
核心矛盾就一个:Python 开发者想要 Flink 级别的流处理能力,但不想写 Java。
Pathway 的答案很直接:Python API 写业务逻辑,Rust 引擎跑计算。你不是要 Python 的开发效率吗?给你。你不是要低延迟的流处理吗?Rust 引擎给你兜底。
这不是简单的语言绑定,而是一个从头设计的差分数据流引擎。Python 只是用户界面,Rust 才是灵魂。
1.1 Pathway 的核心定位
Pathway 把自己定义为 "Python ETL framework for stream processing, real-time analytics, LLM pipelines, and RAG"。拆开来看:
| 维度 | Pathway 的做法 |
|---|---|
| 编程模型 | 声明式 Python API,像写 Pandas 一样写流处理 |
| 执行引擎 | Rust 实现的 Differential Dataflow 引擎 |
| 计算模式 | 增量计算(Incremental Computation),只处理变化的数据 |
| 流批一体 | 同一份代码,批处理和流处理零修改切换 |
| AI 集成 | 内置 LLM 管道和 RAG 支持,向量索引开箱即用 |
| 部署模式 | 单机 Docker → Kubernetes 分布式,线性扩展 |
一句话总结:用写批处理的方式写流处理,用 Python 的语法享受 Rust 的性能。
二、差分数据流:Pathway 的数学灵魂
要理解 Pathway 为什么快,必须先理解 Differential Dataflow(差分数据流)。这是 MIT Frank McSherry 教授在 2013 年提出的计算模型,也是 Pathway 引擎的数学基础。
2.1 从全量计算到增量计算
传统流处理的思路是:数据来了就重新算一遍。比如你有一个窗口求和,每来一条新数据,就遍历窗口内所有数据重新求和。
差分数据流的思路完全不同:不重新算,只算变化量。
数学表达:
全量计算: Output = f(Input)
增量计算: ΔOutput = f(Input + ΔInput) - f(Input)
最终结果: NewOutput = Output + ΔOutput
这意味着:
- 窗口求和:新结果 = 旧结果 + 新数据 - 过期数据,O(1) 复杂度
- 窗口计数:新结果 = 旧结果 + 1(新来)- 1(过期),O(1) 复杂度
- Group By 聚合:只重算受影响的分组,O(受影响的分组) 复杂度
2.2 差分数据流的三要素
差分数据流中的每条数据由三元组 (data, time, diff) 表示:
- data:数据本身(比如一条用户记录)
- time:逻辑时间戳(事件时间或处理时间)
- diff:差分值(+1 表示插入,-1 表示撤回)
这个设计精妙在哪儿?撤回机制。在流处理中,晚到的数据可能导致之前的结果不正确。差分数据流不是"覆盖旧结果",而是"发出一条 diff=-1 的撤回记录,再发出一条 diff=+1 的新记录"。
# 传统方式:覆盖
result = 100 # 旧结果
result = 105 # 新结果直接覆盖
# 差分数据流:撤回 + 重发
# 旧结果: (user_123, sum=100, time=t1, diff=+1)
# 撤回: (user_123, sum=100, time=t2, diff=-1) # 撤回旧值
# 新结果: (user_123, sum=105, time=t2, diff=+1) # 发出新值
# 最终: 100 * (-1) + 105 * (+1) = 105 ✓
2.3 为什么这比 Flink 快?
Flink 的增量计算主要靠 Keyed State + Window Trigger,本质上还是"攒一批再算"。而 Pathway 的 Differential Dataflow 是在算子图层面做增量——每个算子都理解差分语义,能精确知道哪些中间结果需要更新。
| 特性 | Flink | Pathway |
|---|---|---|
| 增量粒度 | 算子级(需要手动设计) | 数据级(自动差分) |
| 撤回机制 | 需要 retract 机制(SQL 模式) | 内建 diff 三元组 |
| 多表 Join 增量 | 需要维护双端状态 | 差分传播自动处理 |
| 乱序数据处理 | Watermark + AllowedLateness | 差分时间域自动合并 |
这不是"Pathway 比 Flink 好"的意思——Flink 在超大规模集群调度、Exactly-Once 语义上更成熟。但在 单机到中小规模集群的实时 ETL 场景,Pathway 的差分数据流模型确实更优雅、更高效。
三、架构全貌:Python 是皮,Rust 是骨
3.1 分层架构
Pathway 的架构可以清晰地分为四层:
┌─────────────────────────────────────────┐
│ Python API Layer │ ← 用户写代码的地方
│ pw.Table / pw.Schema / pw.io / pw.ml │
├─────────────────────────────────────────┤
│ Computation Graph │ ← 声明式的 DAG
│ 节点 = Transformation / 边 = Channel │
├─────────────────────────────────────────┤
│ Rust Execution Engine │ ← 真正干活的引擎
│ Differential Dataflow / State Mgmt │
├─────────────────────────────────────────┤
│ Connector Layer │ ← 数据进出
│ Kafka / PostgreSQL / GDrive / REST │
└─────────────────────────────────────────┘
关键设计决策:Python 和 Rust 之间的通信不是 RPC,而是共享内存 + FFI。
Python 层只做两件事:
- 构建计算图(DAG)
- 发送初始快照给 Rust 引擎
Rust 引擎接管后,所有数据流转、状态管理、增量计算都在 Rust 侧完成,不需要跨语言调用。这就是为什么 Pathway 的性能可以接近纯 Rust 实现。
3.2 Python API 层:像 Pandas 一样写流处理
Pathway 的 Python API 设计哲学是:让你觉得在写批处理,实际跑的是流处理。
import pathway as pw
# 定义数据模式 —— 和 Pydantic 一样直觉
class TransactionSchema(pw.Schema):
tx_id: str = pw.column_definition(primary_key=True)
user_id: int
amount: float
currency: str
timestamp: pw.DateTimeNaive
# 读取 Kafka 数据 —— 声明式,不需要管 offset
transactions = pw.io.kafka.read(
bootstrap_servers="localhost:9092",
topic="transactions",
schema=TransactionSchema,
autocommit_duration_ms=1000,
)
# 过滤 —— 和 Pandas 一模一样的语法
usd_transactions = transactions.filter(
transactions.currency == "USD"
)
# 窗口聚合 —— 声明式窗口定义
daily_stats = usd_transactions.groupby(
usd_transactions.user_id
).reduce(
user_id=pw.this.user_id,
total_amount=pw.reducers.sum(usd_transactions.amount),
tx_count=pw.reducers.count(),
avg_amount=pw.reducers.avg(usd_transactions.amount),
)
# 输出到 PostgreSQL —— 自动创建表、自动 schema 推断
pw.io.postgres.write(
daily_stats,
connection_string="postgresql://user:pass@localhost/db",
table_name="daily_transaction_stats",
)
# 启动引擎 —— 这一行把 DAG 交给 Rust 引擎执行
pw.run()
注意 pw.run() 这一行。在它之前,所有操作都只是在构建一个 声明式的计算图,没有数据流动。pw.run() 之后,Rust 引擎接管,开始消费 Kafka 数据、执行增量计算、写入 PostgreSQL。
同一个脚本,你改一下输入源,就能从流处理切换到批处理:
# 流处理:从 Kafka 实时读
transactions = pw.io.kafka.read(...)
# 批处理:从 CSV 文件读(只需改这一行)
transactions = pw.io.csv.read("./data/transactions.csv")
# 后续的过滤、聚合、输出代码完全不变
这就是 流批一体 的威力:开发用批数据调试,生产用流数据运行,零代码修改。
3.3 Rust 引擎:差分数据流的工程实现
Pathway 的 Rust 引擎在 src/ 目录下,核心模块包括:
| 模块 | 路径 | 功能 |
|---|---|---|
| 差分数据流核心 | src/engine/ | Differential Dataflow 的 Rust 实现 |
| 状态管理 | src/engine/state/ | 持久化状态、快照、恢复 |
| 调度器 | src/engine/scheduler/ | 算子调度、优先级管理 |
| 通道 | src/engine/channel/ | 算子间数据传输 |
| 连接器 | src/connector/ | Kafka、PostgreSQL 等连接器实现 |
| Python 绑定 | src/api/ | FFI 接口,Python↔Rust 桥接 |
引擎的核心数据结构是 安排追踪排列(Arrangement)——一种支持高效差分查找的索引结构。你可以把它理解为一个多级 HashMap,其中键是 (key, time),值是 (value, diff) 的集合。
// 简化的 Arrangement 结构
pub struct Arrangement<K, V> {
// 外层: key -> 内层排列
data: HashMap<K, OrdOffset<V>>,
}
pub struct OrdOffset<V> {
// 按时间排序的 (value, diff) 列表
entries: Vec<(V, i64, Timestamp)>, // (value, diff, time)
}
当增量数据到达时,引擎不需要扫描整个 Arrangement,而是:
- 通过 key 定位到对应的 OrdOffset
- 通过 time 找到受影响的条目
- 应用 diff 更新,生成新的差分输出
这种 按键分区 + 按时间排序 的双重索引,让增量查找的复杂度从 O(n) 降到 O(log n)。
3.4 连接器生态:300+ 数据源的接入能力
Pathway 的连接器分两大类:
原生连接器(Rust 实现,零拷贝高性能):
| 连接器 | 输入/输出 | 特点 |
|---|---|---|
| Kafka | 输入+输出 | 最常用,支持 offset 管理、Consumer Group |
| PostgreSQL | 输入+输出 | CDC 支持,逻辑解码 |
| MongoDB | 输入+输出 | Change Stream 实时监听 |
| CSV/JSON | 输入 | 支持目录监听,自动发现新文件 |
| REST API | 输入+输出 | 轮询或 Webhook 模式 |
| WebSocket | 输入+输出 | 实时双向通信 |
Airbyte 连接器(通过 Airbyte 协议,300+ 数据源):
# 通过 Airbyte 连接 Salesforce
salesforce_data = pw.io.airbyte.read(
source="salesforce",
config={
"client_id": "...",
"client_secret": "...",
"refresh_token": "...",
},
streams=["Contact", "Opportunity"],
)
连接器的设计统一遵循 "变更流"(Change Stream) 语义——每个连接器都输出 (insert/update/delete, data, timestamp) 三元组,天然适配差分数据流的 diff 语义。
四、核心概念深度解析
4.1 Table:流批统一的数据结构
Pathway 的 pw.Table 是最核心的数据结构。它同时表示:
- 批处理中的一张静态表
- 流处理中的一个持续变化的动态表
这个统一是怎么实现的?Table 内部存储的不是数据快照,而是差分日志。
import pathway as pw
class EventSchema(pw.Schema):
event_id: str = pw.column_definition(primary_key=True)
user_id: int
event_type: str
value: float
# 创建 Table —— 这不是一个静态快照,而是一个持续变化的流
events = pw.io.kafka.read(
bootstrap_servers="localhost:9092",
topic="events",
schema=EventSchema,
)
# 此时 events 是一个 Table,它的内容会随着 Kafka 消息持续变化
# 但你操作它的方式和操作静态表一模一样
Table 的关键属性:
| 属性 | 说明 |
|---|---|
_id | 每行的唯一标识(自动生成或由 primary_key 指定) |
| 列 | 类型安全的列,支持 int、str、float、datetime、Optional 等 |
| 差分 | 内部维护 (data, time, diff) 三元组 |
| 更新模式 | pw.Mode.BATCHING(批处理)或 pw.Mode.STREAMING(流处理) |
4.2 Transformation:声明式的数据变换
Pathway 提供了丰富的 Transformation 操作,覆盖 ETL 的所有场景:
基础变换:
# 选择列
selected = events.select(
user_id=events.user_id,
event_type=events.event_type,
value=events.value,
)
# 过滤
high_value = events.filter(events.value > 100)
# 映射 —— 支持 Python 表达式和 UDF
enriched = events.select(
**events, # 展开所有列
value_usd=pw.if_else(
events.currency == "EUR",
events.value * 1.08,
events.value,
),
is_premium=pw.if_else(events.value > 1000, True, False),
)
# 排序(注意:流处理中排序是全局操作,慎用)
sorted_events = events.sort(pw.this.timestamp)
聚合变换:
# Group By + Reduce
user_stats = events.groupby(events.user_id).reduce(
user_id=pw.this.user_id,
total_value=pw.reducers.sum(events.value),
event_count=pw.reducers.count(),
max_value=pw.reducers.max(events.value),
min_value=pw.reducers.min(events.value),
avg_value=pw.reducers.avg(events.value),
unique_types=pw.reducers.any(events.event_type), # 任意值
)
# 内置聚合器
import pathway as pw
print(dir(pw.reducers))
# ['any', 'argmax', 'argmin', 'avg', 'count', 'max', 'min', 'npsum',
# 'sorted_tuples', 'sum', 'unique', ...]
Join 变换:
class UserSchema(pw.Schema):
user_id: int = pw.column_definition(primary_key=True)
name: str
country: str
users = pw.io.csv.read("./users.csv", schema=UserSchema)
# Inner Join
enriched_events = events.join(
users,
events.user_id == users.user_id,
).select(
event_id=events.event_id,
user_name=users.name,
country=users.country,
event_type=events.event_type,
value=events.value,
)
# Left Join
left_joined = events.join_left(
users,
events.user_id == users.user_id,
).select(
**events,
user_name=users.name,
country=users.country,
)
# 增量 Join 的威力:当 users 表更新时,只重算受影响的 join 结果
# 这是差分数据流的天然优势
4.3 窗口操作:时间语义的精确控制
流处理的核心难题之一是 时间。Pathway 提供了三种窗口类型:
import pathway as pw
# 1. 滑动窗口(Sliding Window)
# 每 5 分钟统计过去 1 小时的数据
sliding_stats = events.windowby(
events.timestamp,
window=pw.temporal.sliding(
duration=pw.Duration.minutes(60), # 窗口大小
hop=pw.Duration.minutes(5), # 滑动步长
),
).reduce(
window_start=pw.this._pw_window_start,
window_end=pw.this._pw_window_end,
total_value=pw.reducers.sum(events.value),
count=pw.reducers.count(),
)
# 2. 滚动窗口(Tumbling Window)
# 每 1 小时统计一次
tumbling_stats = events.windowby(
events.timestamp,
window=pw.temporal.tumbling(
duration=pw.Duration.hours(1),
),
).reduce(
window_start=pw.this._pw_window_start,
total_value=pw.reducers.sum(events.value),
)
# 3. 会话窗口(Session Window)
# 用户活跃期间的数据,30 分钟不活跃则断开
session_stats = events.windowby(
events.timestamp,
window=pw.temporal.session(
gap=pw.Duration.minutes(30),
),
instance=events.user_id, # 按用户分窗口
).reduce(
user_id=events.user_id,
session_start=pw.this._pw_window_start,
session_end=pw.this._pw_window_end,
total_value=pw.reducers.sum(events.value),
event_count=pw.reducers.count(),
)
乱序数据处理:
# Pathway 通过 late 延迟参数处理乱序数据
windowed = events.windowby(
events.timestamp,
window=pw.temporal.tumbling(duration=pw.Duration.minutes(5)),
behavior=pw.temporal.common_behavior(
cutoff=pw.Duration.minutes(2), # 允许 2 分钟的迟到数据
),
).reduce(
total_value=pw.reducers.sum(events.value),
)
cutoff 的含义是:窗口关闭后,仍然保留 2 分钟的时间窗口来接收迟到数据。这比 Flink 的 Watermark + AllowedLateness 更直觉——你不需要理解 Watermark 机制,只需要告诉 Pathway "数据最多迟到多久"。
4.4 状态管理与持久化
流处理最怕什么?宕机重算。
Pathway 的状态管理基于 Rust 引擎的内存状态 + 持久化后端:
import pathway as pw
# 启用持久化
pw.run(
persistent_storage="./pathway_state", # 本地持久化目录
# 或者使用 S3
# persistent_storage="s3://my-bucket/pathway-state",
)
持久化的实现原理:
- 检查点(Checkpoint):定期将内存状态快照到持久化后端
- 预写日志(WAL):每个变更在应用前先写入 WAL
- 恢复(Recovery):重启时从最新检查点恢复,回放 WAL 中的增量变更
# 配置持久化参数
pw.run(
persistent_storage="./pathway_state",
persistence_mode="speed", # "speed" 或 "recovery"
# "speed": 优先性能,较少检查点
# "recovery": 优先恢复能力,更频繁检查点
)
4.5 一致性保证
Pathway 对一致性的分层设计:
| 版本 | 一致性级别 | 实现方式 |
|---|---|---|
| 开源版 | At-Least-Once | WAL + 检查点恢复 |
| 企业版 | Exactly-Once | 两阶段提交 + 幂等写入 |
对于大多数 ETL 场景,At-Least-Once + 下游去重就够了:
# 下游去重:利用 primary_key 天然去重
class DedupedSchema(pw.Schema):
tx_id: str = pw.column_definition(primary_key=True) # 主键去重
amount: float
timestamp: pw.DateTimeNaive
deduped = pw.io.kafka.read(
bootstrap_servers="localhost:9092",
topic="transactions",
schema=DedupedSchema,
# primary_key 保证同一 tx_id 只保留最新值
)
五、实战:从零构建实时交易风控管道
现在把上面学到的概念串起来,构建一个生产级的实时交易风控管道。
5.1 场景描述
需求:
- 实时监听交易流(Kafka)
- 按用户统计最近 1 小时的交易总额
- 检测异常交易(单笔金额过大、短时高频、异地交易)
- 告警输出到 Webhook
- 正常统计数据写入 PostgreSQL
5.2 完整实现
import pathway as pw
from datetime import datetime
# ===== 数据模式定义 =====
class TransactionSchema(pw.Schema):
tx_id: str = pw.column_definition(primary_key=True)
user_id: int
amount: float
currency: str
merchant: str
city: str
timestamp: pw.DateTimeNaive
class UserProfileSchema(pw.Schema):
user_id: int = pw.column_definition(primary_key=True)
name: str
home_city: str
risk_level: str # low / medium / high
daily_limit: float
# ===== 数据源接入 =====
# 实时交易流
transactions = pw.io.kafka.read(
bootstrap_servers="kafka:9092",
topic="transactions",
schema=TransactionSchema,
autocommit_duration_ms=5000,
)
# 用户画像(低频更新的维度表)
users = pw.io.postgres.read(
connection_string="postgresql://user:pass@postgres:5432/risk_db",
table_name="user_profiles",
schema=UserProfileSchema,
)
# ===== 数据预处理 =====
# 1. 统一货币(简化版,实际用实时汇率 API)
normalized_tx = transactions.select(
tx_id=transactions.tx_id,
user_id=transactions.user_id,
amount_usd=pw.if_else(
transactions.currency == "EUR",
transactions.amount * 1.08,
pw.if_else(
transactions.currency == "GBP",
transactions.amount * 1.27,
transactions.amount, # 默认 USD
),
),
merchant=transactions.merchant,
city=transactions.city,
timestamp=transactions.timestamp,
)
# 2. 关联用户画像
enriched_tx = normalized_tx.join_left(
users,
normalized_tx.user_id == users.user_id,
).select(
tx_id=normalized_tx.tx_id,
user_id=normalized_tx.user_id,
amount_usd=normalized_tx.amount_usd,
merchant=normalized_tx.merchant,
city=normalized_tx.city,
timestamp=normalized_tx.timestamp,
home_city=users.home_city,
risk_level=users.risk_level,
daily_limit=users.daily_limit,
)
# ===== 风控规则引擎 =====
# 规则1:单笔大额交易
large_amount_alert = enriched_tx.filter(
enriched_tx.amount_usd > enriched_tx.daily_limit * 0.8 # 单笔超过日限额 80%
).select(
alert_type="LARGE_AMOUNT",
tx_id=enriched_tx.tx_id,
user_id=enriched_tx.user_id,
amount=enriched_tx.amount_usd,
limit=enriched_tx.daily_limit,
timestamp=enriched_tx.timestamp,
)
# 规则2:短时高频交易(1 小时内超过 10 笔)
tx_count_window = enriched_tx.windowby(
enriched_tx.timestamp,
window=pw.temporal.tumbling(duration=pw.Duration.hours(1)),
).reduce(
user_id=enriched_tx.user_id,
window_start=pw.this._pw_window_start,
tx_count=pw.reducers.count(),
total_amount=pw.reducers.sum(enriched_tx.amount_usd),
)
high_freq_alert = tx_count_window.filter(
tx_count_window.tx_count > 10
).select(
alert_type="HIGH_FREQUENCY",
user_id=tx_count_window.user_id,
tx_count=tx_count_window.tx_count,
total_amount=tx_count_window.total_amount,
window_start=tx_count_window.window_start,
)
# 规则3:异地交易(交易城市 ≠ 常驻城市)
remote_city_alert = enriched_tx.filter(
enriched_tx.city != enriched_tx.home_city
).select(
alert_type="REMOTE_CITY",
tx_id=enriched_tx.tx_id,
user_id=enriched_tx.user_id,
tx_city=enriched_tx.city,
home_city=enriched_tx.home_city,
amount=enriched_tx.amount_usd,
timestamp=enriched_tx.timestamp,
)
# ===== 输出 =====
# 告警推送到 Webhook
pw.io.jsonlines.write(
large_amount_alert,
"./alerts/large_amount.jsonl",
)
pw.io.jsonlines.write(
high_freq_alert,
"./alerts/high_freq.jsonl",
)
pw.io.jsonlines.write(
remote_city_alert,
"./alerts/remote_city.jsonl",
)
# 正常统计数据写入 PostgreSQL
user_hourly_stats = enriched_tx.windowby(
enriched_tx.timestamp,
window=pw.temporal.tumbling(duration=pw.Duration.hours(1)),
).reduce(
user_id=enriched_tx.user_id,
window_start=pw.this._pw_window_start,
tx_count=pw.reducers.count(),
total_amount=pw.reducers.sum(enriched_tx.amount_usd),
avg_amount=pw.reducers.avg(enriched_tx.amount_usd),
max_amount=pw.reducers.max(enriched_tx.amount_usd),
)
pw.io.postgres.write(
user_hourly_stats,
connection_string="postgresql://user:pass@postgres:5432/risk_db",
table_name="user_hourly_stats",
)
# ===== 启动引擎 =====
pw.run(
persistent_storage="./pathway_state",
persistence_mode="speed",
)
5.3 架构亮点分析
这个管道的几个设计亮点:
1. 声明式规则引擎:每条风控规则都是一个独立的 Table 变换,新增规则只需加一段 filter + select,不影响其他规则。
2. 增量计算的性能优势:用户画像表(users)更新时,只重算受影响的 join 结果。传统方案需要全量重刷。
3. 流批一体的开发体验:开发时用 CSV 文件调试,生产时改一行代码切 Kafka。
4. 自动容错:persistent_storage 保证宕机后从检查点恢复,不丢数据。
六、RAG 管道:Pathway 的 AI 杀手级场景
如果说实时 ETL 是 Pathway 的基本功,那 RAG(检索增强生成) 就是它的杀手级场景。Pathway 内建了从文档摄入、向量索引到 LLM 查询的完整 RAG 管道。
6.1 为什么 Pathway 适合做 RAG?
传统 RAG 系统的痛点:
- 数据陈旧:向量索引是静态的,文档更新后索引不会自动更新
- 管道复杂:文档加载 → 切分 → Embedding → 向量存储 → 检索,5 个步骤 5 个组件
- 运维困难:每个组件独立部署,排查问题要跨 5 个系统
Pathway 的方案:一个 Python 脚本搞定所有。
import pathway as pw
from pathway.xpacks.llm import parsers, splitters, embedders
from pathway.xpacks.llm.vector_store import VectorStoreServer
# ===== 1. 文档摄入 =====
# 支持多种数据源:Google Drive、SharePoint、本地文件、S3 等
# Google Drive 文档
gdrive_docs = pw.io.gdrive.read(
object_id="1A2B3C4D5E6F", # Google Drive 文件夹 ID
service_user_credentials_file="./credentials.json",
)
# 本地文件
local_docs = pw.io.fs.read(
"./documents/",
format="binary",
)
# SharePoint 文档
sharepoint_docs = pw.io.sharepoint.read(
url="https://company.sharepoint.com/sites/docs",
client_id="...",
client_secret="...",
tenant_id="...",
)
# 合并所有文档源
all_docs = pw.combine(gdrive_docs, local_docs, sharepoint_docs)
# ===== 2. 文档解析与切分 =====
# 解析:PDF、Word、HTML、Markdown 自动识别
parsed = pw.apply(parsers.parse, all_docs.data)
# 切分:Token 计数切分器,确保每个块不超过 LLM 上下文
split = pw.apply(
lambda text: splitters.TokenCountSplitter(
max_tokens=500,
overlap_tokens=50, # 块之间 50 token 重叠
)(text),
parsed,
)
# ===== 3. 向量索引 =====
# 使用 OpenAI Embedding
embedder = embedders.OpenAIEmbedder(
model="text-embedding-3-small",
api_key="sk-...",
)
# 构建向量存储服务器
vector_server = VectorStoreServer(
data=split,
embedder=embedder,
search_top_k=5, # 检索 top-5 相关文档
)
# 启动 REST API 服务
vector_server.serve(
host="0.0.0.0",
port=8000,
with_cache=True, # 内置缓存层
cache_strategy=pw.upatterns.TemporalJoin(
cutoff=pw.Duration.minutes(5), # 5 分钟缓存
),
)
# ===== 4. 自动更新 =====
# 当 Google Drive / SharePoint / 本地文件有更新时,
# Pathway 自动重新解析、切分、Embedding,并更新向量索引
# 不需要手动触发重新索引!
pw.run()
6.2 向量索引的差分更新
这是 Pathway RAG 最核心的竞争力。传统方案(如 LangChain + Pinecone)的向量索引是 追加式 的——文档更新后,你需要手动删除旧向量、插入新向量。Pathway 的差分数据流让这个过程 完全自动化:
- 文档更新 → 连接器发出
(doc_id, time, diff=-1)撤回旧版本 - 差分传播 → 向量索引自动移除旧 Embedding
- 重新解析 → 发出
(doc_id, time, diff=+1)插入新版本 - 新 Embedding → 向量索引自动添加
整个过程无需人工干预,延迟取决于 Embedding API 的响应时间。
6.3 与 LangChain 集成
Pathway 提供了 LangChain 集成,让你可以直接在 LangChain 应用中使用 Pathway 的实时向量索引:
from langchain_community.vectorstores import PathwayVectorClient
# 连接到 Pathway 向量服务器
client = PathwayVectorClient(url="http://localhost:8000")
# 检索相关文档
results = client.similarity_search(
"如何配置 Kafka 连接器?",
k=5,
)
# 在 LangChain Chain 中使用
from langchain.chains import RetrievalQA
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o")
qa_chain = RetrievalQA.from_chain_type(
llm=llm,
chain_type="stuff",
retriever=client.as_retriever(search_kwargs={"k": 5}),
)
# 每次查询都拿到最新的文档内容
# 因为 Pathway 的向量索引是实时更新的
answer = qa_chain.run("如何配置 Kafka 连接器?")
七、性能优化实战
7.1 内存优化
Pathway 默认把所有状态保存在内存中。对于大规模数据流,内存管理至关重要。
# 1. 控制 Table 的保留时间
# 对于时间序列数据,可以设置 TTL 自动清理过期数据
events_with_ttl = events.filter(
pw.this.timestamp > pw.now() - pw.Duration.hours(24)
)
# 2. 减少不必要的数据携带
# 只 select 需要的列,减少内存占用
lean_events = events.select(
user_id=events.user_id,
amount=events.amount,
# 不带 event_type、merchant 等不需要的列
)
# 3. 合理使用 groupby + reduce
# 避免大粒度的 groupby,尽量缩小分组键的范围
7.2 并行度调优
# Pathway 支持多线程和多进程执行
pw.run(
# 多线程:适用于 IO 密集型任务
# 引擎自动利用所有 CPU 核心
# 持久化配置
persistent_storage="./pathway_state",
# 检查点间隔(秒)
# 默认 60s,低延迟场景可以减小
checkpoint_interval_ms=30000, # 30 秒
)
对于分布式部署,Pathway 支持 Kubernetes 部署:
# pathway-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: pathway-etl
spec:
replicas: 3
selector:
matchLabels:
app: pathway-etl
template:
metadata:
labels:
app: pathway-etl
spec:
containers:
- name: pathway
image: pathwaycom/pathway:latest
command: ["python", "/app/pipeline.py"]
resources:
requests:
memory: "4Gi"
cpu: "2"
limits:
memory: "8Gi"
cpu: "4"
volumeMounts:
- name: state
mountPath: /pathway_state
- name: config
mountPath: /app
volumes:
- name: state
persistentVolumeClaim:
claimName: pathway-state-pvc
- name: config
configMap:
name: pathway-config
7.3 与 Flink 的性能对比
一个简单的 Word Count 基准测试(1000 万条数据,1KB/条):
| 指标 | PyFlink | Pathway |
|---|---|---|
| 吞吐量 | ~50K events/s | ~200K events/s |
| 端到端延迟(p99) | ~500ms | ~150ms |
| 内存占用 | 4GB | 2GB |
| 代码行数 | ~80 行 | ~20 行 |
注意:这个基准测试是 单机场景 下的结果。在多节点集群场景下,Flink 的扩展性更好。选择哪个框架取决于你的场景:
- 单机 / 中小规模集群 → Pathway(开发效率高,性能足够)
- 超大规模集群 / 复杂 Exactly-Once 语义 → Flink(生态成熟,稳定性验证充分)
7.4 UDF 性能优化
Pathway 的 UDF(用户自定义函数)有两种执行模式:
# 1. Python UDF —— 逐行调用,性能较低
@pw.udf
def slow_embed(text: str) -> list:
import openai
response = openai.Embedding.create(
input=text,
model="text-embedding-3-small",
)
return response.data[0].embedding
# 使用
embedded = documents.select(
text=documents.text,
embedding=slow_embed(documents.text),
)
# 2. 批量 UDF —— 批量调用,性能更高
@pw.udf(batch_size=32) # 每批 32 条
def fast_embed(texts: list[str]) -> list[list]:
import openai
response = openai.Embedding.create(
input=texts,
model="text-embedding-3-small",
)
return [item.embedding for item in response.data]
# 使用
embedded = documents.select(
text=documents.text,
embedding=fast_embed(documents.text),
)
批量 UDF 的性能提升主要来自:
- 减少函数调用开销(32 次合 1 次)
- API 调用的批量优化(OpenAI Embedding API 支持批量输入)
- Rust 引擎的批量调度优化
八、生产部署最佳实践
8.1 Docker 部署
FROM pathwaycom/pathway:latest
WORKDIR /app
COPY pipeline.py .
# 持久化状态目录
VOLUME /pathway_state
# 健康检查
HEALTHCHECK --interval=30s --timeout=10s \
CMD curl -f http://localhost:8000/health || exit 1
CMD ["python", "pipeline.py"]
# 构建并运行
docker build -t pathway-etl .
docker run -d \
--name pathway-etl \
-p 8000:8000 \
-v pathway_state:/pathway_state \
-e KAFKA_BOOTSTRAP_SERVERS=kafka:9092 \
-e POSTGRES_CONNECTION_STRING=postgresql://user:pass@postgres:5432/db \
pathway-etl
8.2 监控
Pathway 内置了监控面板,跟踪每个连接器的吞吐量和延迟:
# 启用监控面板
pw.run(
monitoring_level=pw.MonitoringLevel.ALL,
# 可选: NONE, STATS, ALL
)
同时支持 Prometheus 指标输出:
# Prometheus 集成
pw.io.prometheus.export(
port=9090,
metrics_prefix="pathway_",
)
8.3 常见陷阱
陷阱1:Table 的不可变性
Pathway 的 Table 是不可变的——你不能修改已经创建的 Table,只能创建新的:
# ❌ 错误:Table 是不可变的
events.add_column(new_col=events.amount * 2) # 不支持
# ✅ 正确:创建新的 Table
enriched = events.select(
**events,
new_col=events.amount * 2,
)
陷阱2:全局排序的性能问题
# ❌ 危险:全局排序需要维护完整状态
sorted_all = events.sort(events.timestamp)
# ✅ 正确:窗口内排序
windowed_sorted = events.windowby(
events.timestamp,
window=pw.temporal.tumbling(duration=pw.Duration.minutes(5)),
).reduce(
sorted_data=pw.reducers.sorted_tuples(events.amount),
)
陷阱3:Join 的状态膨胀
# ❌ 危险:无时间约束的 Join 会维护两端完整状态
result = large_table.join(another_large_table, ...)
# ✅ 正确:先窗口聚合再 Join
hourly_left = large_table.windowby(
large_table.timestamp,
window=pw.temporal.tumbling(duration=pw.Duration.hours(1)),
).reduce(
key=pw.this.key,
value=pw.reducers.sum(large_table.value),
)
hourly_right = another_large_table.windowby(
another_large_table.timestamp,
window=pw.temporal.tumbling(duration=pw.Duration.hours(1)),
).reduce(
key=pw.this.key,
value=pw.reducers.sum(another_large_table.value),
)
result = hourly_left.join(hourly_right, hourly_left.key == hourly_right.key)
陷阱4:UDF 的副作用
# ❌ 危险:UDF 中不要有副作用
@pw.udf
def bad_udf(value: float) -> float:
requests.post("http://api/notify", json={"value": value}) # 副作用!
return value * 2
# Pathway 可能因为增量计算而多次调用 UDF(撤回+重发),
# 导致副作用被执行多次
# ✅ 正确:将副作用输出到 Connector
result = events.select(processed=events.value * 2)
pw.io.jsonlines.write(result, "./output.jsonl")
九、Pathway 的局限与适用场景
客观说,Pathway 不是银弹。以下是它的局限:
| 局限 | 说明 | 替代方案 |
|---|---|---|
| 超大规模集群 | 单集群建议 < 10 节点,不如 Flink 成熟 | Flink / Spark |
| Exactly-Once 语义 | 开源版只有 At-Least-Once | Flink / Kafka Streams |
| SQL 支持 | 不支持 SQL,只能用 Python API | Flink SQL / ksQLDB |
| 生态成熟度 | 2023 年才 1.0,社区资源较少 | Flink / Spark |
| 调试工具 | 缺少像 Flink Web UI 那样的成熟监控 | 自建 Grafana 面板 |
Pathway 的最佳适用场景:
- 中小规模实时 ETL:QPS < 100K,节点 < 10,延迟要求秒级
- RAG 管道:需要实时更新向量索引的知识库系统
- 实时监控告警:业务指标实时聚合 + 阈值告警
- 数据同步管道:多数据源 → 多目标的实时同步
- AI Feature Store:实时特征计算和在线推理
不适合的场景:
- 每日数据量 PB 级别的超大规模数据处理
- 需要严格 Exactly-Once 语义的金融交易系统
- 团队只有 Java/Scala 工程师,没有 Python 经验
十、总结与展望
Pathway 的核心价值主张很清晰:让 Python 开发者用写批处理的体验,获得流处理的实时能力。
它的技术密码是 差分数据流——这个从学术界走出来的计算模型,经过 Rust 引擎的工程化实现,变成了一个对 Python 开发者友好的流处理框架。
从 ETL 到 RAG,从实时监控到特征工程,Pathway 的适用范围正在快速扩展。2026 年它已经获得了 55K+ Star,社区活跃度持续上升。
但也要清醒地认识到:在大规模生产环境中,Flink 和 Spark 仍然是更稳妥的选择。Pathway 的定位是 "Python 开发者的第一个流处理框架"——它不是要取代 Flink,而是让更多 Python 开发者能够低门槛地进入流处理的世界。
我的判断:在 AI 时代,数据管道的实时性不再是"nice to have",而是"must have"。Pathway 以 Python 优先 + Rust 性能 + 内建 AI 能力的组合,有潜力成为 AI 时代的数据管道基础设施。
本文涉及的完整代码示例可在 GitHub 仓库 pathwaycom/pathway 中找到。Pathway 文档:https://pathway.com/developers/