Row 1: [id:1][name:"Alice"][age:30][city:"Beijing"][amount:1000][status:"active"][created_at:...][...200 more columns] Row 2: [id:2][name:"Bob"][age:25][city:"Shanghai"][amount:2000][status:"pending"][created_at:...][...200 more columns] 当执行SELECT amount, status FROM orders WHERE city = 'Beijing'时,行式存储必须读取整行数据,即使只需要2个字段。 列式存储(ClickHouse)的磁盘布局: id列: [1][2][3][4][5]... name列: ["Alice"]["Bob"]["Charlie"]... age列: [30][25][35][28]... city列: ["Beijing"]["Shanghai"]["Beijing"]... amount列: [1000][2000][1500][3000]... status列: ["active"]["pending"]["active"]... 同样的查询,列式存储只需读取city、amount、status三列。假设表有200列,I/O量降至行式存储的1.5%。 这不是优化,这是数量级的差异。 ### 2.2 向量化执行引擎:SIMD指令集的极致利用 ClickHouse的查询执行采用向量化(Vectorized)模式,一次处理一批数据而非逐行处理: cpp // 伪代码:传统逐行执行 for (size_t i = 0; i < n; i++) { result[i] = a[i] + b[i]; // 每次循环1个元素 } // 向量化执行:一次处理1024个元素 for (size_t i = 0; i < n; i += 1024) { __m256i vec_a = _mm256_loadu_si256((__m256i*)&a[i]); __m256i vec_b = _mm256_loadu_si256((__m256i*)&b[i]); __m256i vec_result = _mm256_add_epi32(vec_a, vec_b); _mm256_storeu_si256((__m256i*)&result[i], vec_result); } 通过SIMD(Single Instruction Multiple Data)指令集,ClickHouse可以在单个CPU周期内并行处理多个数据元素。实测数据显示,向量化执行相比传统逐行执行可带来5-10倍的性能提升。 ### 2.3 MergeTree引擎:写入即排序,查询即剪枝 ClickHouse的核心表引擎MergeTree采用了一种独特的设计哲学: sql -- 创建MergeTree表 CREATE TABLE user_events ( event_time DateTime, user_id UInt64, event_type String, properties String ) ENGINE = MergeTree() PARTITION BY toYYYYMM(event_time) -- 按月分区 ORDER BY (user_id, event_time) -- 排序键 PRIMARY KEY user_id; -- 主键 数据写入流程: 1. 数据首先写入内存缓冲区 2. 当缓冲区满或达到时间阈值,数据被刷写到磁盘形成一个数据片段(Data Part) 3. 每个数据片段内部,数据严格按照ORDER BY键排序 4. 后台任务异步合并小片段为大片段(Merge) 查询优化原理: sql -- 这个查询可以高效执行 SELECT * FROM user_events WHERE user_id = 12345 AND event_time BETWEEN '2026-01-01' AND '2026-01-31'; 1. 分区剪枝:根据event_time范围,只扫描2026年1月的分区 2. 主键索引:利用user_id的有序性,通过二分查找快速定位数据范围 3. 数据跳跃:每个数据片段存储了每列的min/max值,可以快速跳过不匹配的片段 ### 2.4 极致压缩:同类型数据的天然优势 列式存储的另一个隐藏收益是极高的压缩率。同一列的数据类型相同、值域相近,压缩算法可以发挥最大效能: sql -- 查看表的压缩情况 SELECT column, formatReadableSize(data_compressed_bytes) AS compressed, formatReadableSize(data_uncompressed_bytes) AS uncompressed, round(data_uncompressed_bytes / data_compressed_bytes, 2) AS ratio FROM system.parts_columns WHERE table = 'user_events' AND active ORDER BY data_uncompressed_bytes DESC; 典型生产环境中,ClickHouse的压缩比可达5-20倍。这意味着: - 更少的磁盘I/O - 更高的缓存命中率 - 更低的存储成本 ## 三、收购Langfuse:进军LLM可观测性的战略卡位 ### 3.1 什么是LLM可观测性 传统可观测性(Observability)关注三个维度: - Metrics:CPU、内存、QPS等系统指标 - Logs:应用日志 - Traces:分布式调用链 LLM可观测性(LLM Observability)关注的是AI系统特有的问题: - Prompt质量:输入提示词是否清晰、完整 - 输出质量:模型响应是否准确、安全、符合预期 - Token消耗:成本控制和优化 - RAG效果:检索增强生成的上下文质量 - Agent行为:多步骤AI代理的决策过程 ### 3.2 Langfuse的核心能力 Langfuse是增长最快的开源LLM工程平台之一: - 20,470个GitHub Star - 每月超过2,600万次SDK安装 - 超过600万次Docker拉取 - 《财富》50强中19家、《财富》500强中63家企业在使用 核心功能架构: python from langfuse import Langfuse langfuse = Langfuse( public_key="pk-lf-...", secret_key="sk-lf-...", host="https://cloud.langfuse.com" ) # 追踪LLM调用 trace = langfuse.trace(name="customer-support-bot") # 记录生成步骤 generation = trace.generation( name="response-generation", model="gpt-4", prompt="用户问:如何重置密码?", completion="您可以点击登录页面的'忘记密码'链接...", usage={"input": 15, "output": 45, "unit": "TOKENS"} ) # 记录RAG检索 retrieval = trace.span(name="knowledge-retrieval") retrieval.event(name="vector-search", metadata={"docs_retrieved": 5}) ### 3.3 ClickHouse + Langfuse的协同效应 Langfuse联合创始人Marc Klingen的表态点明了这次收购的核心逻辑: > "我们之所以在ClickHouse之上构建Langfuse,是因为LLM可观测性和评估本质上就是一个数据问题。" LLM可观测性产生的数据特征: - 高写入吞吐:每次LLM调用产生多条追踪记录 - 复杂查询模式:按会话、按用户、按时间窗口的多维分析 - 实时性要求:生产环境需要秒级延迟的监控能力 这正是ClickHouse的强项。收购后的一体化体验将包括: - 更快的数据摄取:原生集成消除数据搬运开销 - 更深入的评估能力:利用ClickHouse的复杂分析能力 - 更短的反馈闭环:从生产问题到可量化改进的快速迭代 ## 四、原生Postgres服务:统一事务型与分析型工作负载 ### 4.1 HTAP的终极形态 现代AI应用对数据库的需求往往是双模态的: - 事务处理(OLTP):用户注册、订单创建、状态更新——需要ACID保证、低延迟写入 - 分析查询(OLAP):用户行为分析、业务报表、AI特征工程——需要复杂查询、高吞吐扫描 传统方案是双库架构: ┌─────────────┐ CDC/ETL ┌─────────────┐ │ PostgreSQL │ ─────────────► │ ClickHouse │ │ (OLTP) │ (延迟分钟级) │ (OLAP) │ └─────────────┘ └─────────────┘ 这种架构的问题: - 数据同步延迟 - 运维复杂度高 - 一致性难以保证 - 成本叠加 ### 4.2 ClickHouse的统一数据技术栈 ClickHouse推出的原生Postgres服务实现了真正的HTAP(Hybrid Transactional/Analytical Processing): ┌─────────────────────────────────────────────────┐ │ ClickHouse Cloud │ │ ┌─────────────┐ ┌─────────────┐ │ │ │ Postgres │ ◄─────► │ ClickHouse │ │ │ │ (OLTP) │ 原生CDC │ (OLAP) │ │ │ │ NVMe存储 │ (毫秒级) │ 列式存储 │ │ │ └─────────────┘ └─────────────┘ │ │ │ │ │ │ └────── 统一查询层 ──────┘ │ └─────────────────────────────────────────────────┘ 技术亮点: 1. NVMe存储支撑:Postgres实例使用高性能NVMe SSD,确保OLTP性能 2. 原生CDC能力:事务数据变更实时同步到ClickHouse,延迟降至毫秒级 3. 统一查询层:通过Postgres扩展,可以直接在Postgres中查询ClickHouse数据 sql -- 在Postgres中创建ClickHouse外部表 CREATE EXTENSION IF NOT EXISTS clickhouse_fdw; CREATE SERVER clickhouse_server FOREIGN DATA WRAPPER clickhouse_fdw OPTIONS (host 'clickhouse.cloud', port '8443', database 'analytics'); -- 创建外部表映射 CREATE FOREIGN TABLE user_events_ch ( event_time timestamp, user_id bigint, event_type text ) SERVER clickhouse_server OPTIONS (table 'user_events'); -- 统一查询:JOIN Postgres事务表和ClickHouse分析表 SELECT u.username, COUNT(e.event_type) as event_count, SUM(e.amount) as total_amount FROM users u -- Postgres本地表 JOIN user_events_ch e ON u.id = e.user_id -- ClickHouse外部表 WHERE u.created_at > '2026-01-01' GROUP BY u.username; ### 4.3 与Ubicloud的合作 ClickHouse的Postgres服务由Ubicloud联合打造。Ubicloud团队来自Citus Data(被Microsoft收购)、Heroku和Microsoft,在分布式Postgres领域有深厚积累。 Ubicloud联合创始人Umur Cubukcu的评价: > "Postgres与ClickHouse在架构上天然互补,是AI应用不可或缺的组成部分。我们为团队交付了一套真正的一体化技术栈。" ## 五、实战代码:构建一个AI应用的数据基础设施 ### 5.1 场景设定 假设我们正在构建一个AI客服系统,需要: 1. 存储用户对话(高并发写入) 2. 实时分析对话质量(复杂查询) 3. 追踪LLM调用(可观测性) ### 5.2 数据模型设计 sql -- 1. Postgres:事务型数据 CREATE TABLE conversations ( id BIGSERIAL PRIMARY KEY, user_id BIGINT NOT NULL, session_id UUID NOT NULL, status VARCHAR(20) DEFAULT 'active', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE messages ( id BIGSERIAL PRIMARY KEY, conversation_id BIGINT REFERENCES conversations(id), role VARCHAR(20) NOT NULL, -- 'user', 'assistant', 'system' content TEXT NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- 2. ClickHouse:分析型数据 CREATE TABLE conversation_analytics ( event_time DateTime64(3), conversation_id UInt64, user_id UInt64, session_id UUID, event_type Enum('message_sent', 'llm_request', 'llm_response', 'feedback_given'), message_length UInt32, response_time_ms UInt32, token_input UInt32, token_output UInt32, sentiment_score Float32, properties String -- JSON格式存储额外属性 ) ENGINE = MergeTree() PARTITION BY toYYYYMMDD(event_time) ORDER BY (user_id, event_time) TTL event_time + INTERVAL 90 DAY; -- 自动清理90天前的数据 -- 3. 物化视图:预聚合统计 CREATE MATERIALIZED VIEW conversation_stats_mv ENGINE = SummingMergeTree() PARTITION BY toYYYYMMDD(hour) ORDER BY (user_id, hour) AS SELECT toStartOfHour(event_time) AS hour, user_id, count() AS message_count, sum(token_input) AS total_tokens_in, sum(token_output) AS total_tokens_out, avg(response_time_ms) AS avg_response_time FROM conversation_analytics GROUP BY hour, user_id; ### 5.3 应用层集成 python import asyncio import asyncpg from clickhouse_driver import Client as CHClient from langfuse import Langfuse import openai class AICustomerService: def __init__(self): self.pg_pool = None self.ch_client = CHClient(host='clickhouse.cloud', port=8443) self.langfuse = Langfuse() async def initialize(self): self.pg_pool = await asyncpg.create_pool( 'postgresql://user:pass@postgres.cloud/db' ) async def handle_message(self, user_id: int, message: str): # 1. 创建追踪 trace = self.langfuse.trace( name="customer-service-interaction", user_id=str(user_id), metadata={"channel": "web_chat"} ) # 2. 保存到Postgres(事务型) async with self.pg_pool.acquire() as conn: async with conn.transaction(): # 获取或创建会话 conv_id = await conn.fetchval( """INSERT INTO conversations (user_id, session_id) VALUES ($1, gen_random_uuid()) ON CONFLICT DO NOTHING RETURNING id""", user_id ) # 保存用户消息 await conn.execute( "INSERT INTO messages (conversation_id, role, content) VALUES ($1, $2, $3)", conv_id, 'user', message ) # 3. 调用LLM(带追踪) generation = trace.generation( name="llm-response", model="gpt-4", prompt=message ) start_time = asyncio.get_event_loop().time() response = await openai.ChatCompletion.acreate( model="gpt-4", messages=[{"role": "user", "content": message}] ) response_time_ms = int((asyncio.get_event_loop().time() - start_time) * 1000) generation.end( completion=response.choices[0].message.content, usage={ "input": response.usage.prompt_tokens, "output": response.usage.completion_tokens } ) # 4. 写入ClickHouse(分析型) self.ch_client.execute( """INSERT INTO conversation_analytics (event_time, conversation_id, user_id, event_type, message_length, response_time_ms, token_input, token_output) VALUES""", [{ 'event_time': datetime.now(), 'conversation_id': conv_id, 'user_id': user_id, 'event_type': 'llm_response', 'message_length': len(response.choices[0].message.content), 'response_time_ms': response_time_ms, 'token_input': response.usage.prompt_tokens, 'token_output': response.usage.completion_tokens }] ) return response.choices[0].message.content async def get_user_analytics(self, user_id: int, days: int = 7): """实时分析查询""" result = self.ch_client.execute( """SELECT toDate(event_time) as date, count() as message_count, avg(response_time_ms) as avg_response_time, sum(token_input + token_output) as total_tokens, quantile(0.95)(response_time_ms) as p95_response_time FROM conversation_analytics WHERE user_id = %(user_id)s AND event_time > now() - INTERVAL %(days)s DAY GROUP BY date ORDER BY date""", {'user_id': user_id, 'days': days} ) return result ### 5.4 性能优化技巧 sql -- 1. 使用投影(Projection)优化特定查询 ALTER TABLE conversation_analytics ADD PROJECTION sentiment_analysis ( SELECT user_id, event_time, sentiment_score ORDER BY sentiment_score ); -- 2. 二级索引加速点查 ALTER TABLE conversation_analytics ADD INDEX idx_session session_id TYPE bloom_filter GRANULARITY 1; -- 3. 采样查询快速估算 SELECT avg(response_time_ms) FROM conversation_analytics SAMPLE 0.1 -- 只扫描10%的数据 WHERE event_time > today() - 7; ## 六、竞品对比:ClickHouse vs Snowflake vs Databricks | 维度 | ClickHouse | Snowflake | Databricks | |------|-----------|-----------|------------| | 核心优势 | 极致查询性能 | 弹性计算分离 | 统一数据+AI平台 | | 延迟 | 亚秒级 | 秒级-分钟级 | 分钟级 | | 成本模型 | 存储+计算绑定 | 按查询付费 | 按计算资源付费 | | 开源程度 | 完全开源 | 闭源 | 部分开源(Delta Lake) | | AI集成 | Langfuse收购后补强 | Cortex AI | 原生MLflow集成 | | Postgres兼容 | 原生服务 | 外部连接 | 外部连接 | | 最佳场景 | 实时分析、嵌入式分析 | 企业数据仓库 | 数据科学+机器学习 | ## 七、未来展望:ClickHouse的AI基础设施野心 ### 7.1 产品路线图预测 基于此次融资和收购动作,ClickHouse的产品演进方向可能包括: 1. LLM可观测性平台化 - 深度集成Langfuse,提供开箱即用的AI应用监控 - 支持更多模型提供商(Anthropic、Google、开源模型) - 内置评估指标和A/B测试能力 2. AI特征平台 - 实时特征工程管道 - 与主流ML平台(MLflow、Weights & Biases)集成 - 在线/离线特征一致性保障 3. 向量数据库增强 - 原生向量索引支持 - 与Embedding模型直接集成 - RAG应用优化 ### 7.2 对开发者的启示 ClickHouse的转型给开发者传递了一个明确信号: > 在AI时代,数据基础设施的选择将直接影响AI应用的上限。 对于正在构建AI应用的团队,建议: 1. 评估现有数据架构:是否能支撑AI应用的高并发、低延迟需求 2. 关注可观测性建设:LLM可观测性不是可选项,而是生产必备 3. 简化技术栈:统一的事务+分析平台可以降低复杂度和成本 ## 八、总结 ClickHouse的4亿美元D轮融资和战略收购,标志着数据分析领域正在经历一场由AI驱动的范式转移。从OLAP数据库到AI数据基础设施,ClickHouse正在构建一个覆盖数据存储、实时分析、LLM可观测性的完整技术栈。 对于开发者而言,这意味着: - 更简单的架构:统一平台替代复杂的组件拼接 - 更高的性能:亚秒级查询延迟成为标配 - 更强的可观测性:AI应用的行为变得透明可控 正如ClickHouse CEO Aaron Katz所说: > "我们正在支持统一的事务型与分析型工作负载,让开发者能够在坚实的技术基础之上构建各种由AI驱动的应用。" 在AI基础设施的赛道上,ClickHouse已经亮出了自己的底牌。接下来的竞争,将围绕谁能更好地服务AI时代的开发者展开。 --- 参考链接: - ClickHouse官方公告 - Langfuse GitHub - ClickHouse Postgres服务 作者简介: 程序员茄子,专注云原生与AI基础设施,热爱折腾各种数据库和分布式系统。