LibSQL/Turso 深度解析:从边缘数据库到 AI 原生向量搜索——2026 年嵌入式 OLAP 的工业级完全指南
一、背景介绍:为什么 2026 年的数据库战场在边缘
过去十年,数据库领域经历了两次大的范式转移。第一次是从集中式商业数据库向开源 PostgreSQL/MySQL 迁移,第二次是云原生时代下 NoSQL 和 NewSQL 的崛起。而 2026 年,我们正在见证第三次——边缘数据库的全面崛起。
这场革命的驱动力来自三个看似不相关的技术趋势的汇合:
第一,AI 应用的爆发让向量搜索成为刚需。 RAG(检索增强生成)、语义搜索、推荐系统,这些 AI 原生场景的核心都是向量嵌入的高效检索。传统的解决方案是独立的向量数据库(Milvus、Pinecone、Weaviate),但这带来了架构复杂度和数据一致性的双重挑战。程序员们开始追问:能不能在现有的数据库里直接做向量搜索?
第二,全球化应用对延迟的极致追求。 用户分布在全球各地的移动应用,越来越无法承受 100-200ms 的数据库响应延迟。把数据库实例部署在用户最近的边缘节点,从"计算去中心化"延伸到"存储去中心化",成为下一代全球化应用的标准架构。Cloudflare D1、Neon、Turso 的崛起都是这个趋势的缩影。
第三,单体架构的回归与 SQLite 的重新定义。 有趣的是,在微服务热潮退去之后,很多团队发现 80% 的业务场景根本不需要分布式数据库的复杂度。SQLite 这种嵌入式、单文件、零运维的数据库,在 Jamstack、移动端、边缘计算、桌面应用等场景反而展现出惊人的生命力。问题是 SQLite 本身的能力边界太窄——不支持并发写入、缺乏向量搜索、无法水平扩展。
LibSQL(Turso) 就是在这种背景下诞生的。它站在 SQLite 的肩膀上,通过一系列工程创新,在保持嵌入式数据库简单性的同时,解锁了那些原本只有"重量级"数据库才有的能力:向量搜索、多写者并发、云原生同步和 Serverless 化。
这不是一个简单的 SQLite 分支,而是一次对嵌入式数据库能力边界的重新定义。
二、核心概念:LibSQL 的技术基石
理解 LibSQL,需要从它的四个核心技术特性说起。每一个特性背后都有深刻的工程权衡,理解这些权衡,才能真正用好这个工具。
2.1 SQLite 的 DNA:为什么 LibSQL 选择站在 SQLite 肩膀上
SQLite 是世界上部署最广泛的数据库。根据官方统计,SQLite 运行在数十亿台设备上——Android 和 iOS 的系统数据库、Chrome 和 Firefox 的内部存储、航空航天的飞行控制系统、医疗设备的核心数据层。SQLite 的成功不是偶然的,它代表了一种极致简洁的工程哲学:把数据库变成一个库,而不是一个服务。
传统数据库架构:
[应用] → [数据库驱动] → [数据库服务进程] → [磁盘文件]
SQLite 架构:
[应用] → [数据库驱动/库] → [磁盘文件]
这种架构带来的优势是惊人的:
- 零运维:没有独立的数据库服务进程需要启动、配置、调优、监控
- 零网络开销:数据库操作就是本地文件 I/O,没有 TCP 握手和协议解析开销
- 极致简单:SQLite 的使用体验就是打开文件、读写数据、关闭文件
- 可移植性:单文件数据库,任何支持文件系统的环境都能运行
但 SQLite 的 DNA 里也遗传了它的局限:
- 写入是串行的(database write lock),高并发写入场景是硬伤
- 不支持向量搜索,无法服务 AI 应用
- 无法远程访问(除非借助 WAL 的复制机制)
- ALTER TABLE 能力有限(历史遗留问题)
LibSQL 在 SQLite 的源码基础上做了深度改造,既保留了 SQLite 的极致简洁,又突破了这些能力边界。
2.2 Server Mode:从嵌入式到服务化
LibSQL 的第一个重大扩展是 Server Mode。传统 SQLite 是纯嵌入式的——数据库文件和应用程序在同一个进程空间。但现代应用架构中,有大量场景需要远程数据库访问:
- 前端应用需要访问数据库,但没有文件系统的读写权限(浏览器、Serverless 函数)
- 多个应用实例需要共享同一个数据库
- 需要通过 HTTP API 来控制数据库权限和访问策略
LibSQL 的 Server Mode 通过 HTTP 和 WebSocket 提供数据库访问能力。这看起来简单,但背后有一系列工程挑战:
挑战一:如何处理网络故障?
传统 SQLite 直接读写文件,操作是原子的。引入网络后,一次 "UPDATE ... WHERE id = ?" 可能中途网络断开,数据库处于不一致状态。LibSQL 的 Server Mode 需要处理这种情况。解决方案是:服务端维护一个 WAL(Write-Ahead Log)序列号,客户端在每次请求时携带自己最后已知的序列号,服务端据此判断是否需要重放缺失的操作。
挑战二:如何处理连接中断恢复?
LibSQL 支持增量同步。客户端可以请求 "从序列号 X 开始的所有变更",这意味着即使应用重启、从网络断开了一段时间,也能无缝恢复数据同步。这对于移动应用和 Serverless 函数尤其有价值——它们的状态随时可能被销毁和重建。
Server Mode 的典型使用场景:
// 前端应用(浏览器中的 JavaScript)直接连接远程 Turso 数据库
import { createClient } from "@libsql/client";
const client = createClient({
url: "libsql://my-db.turso.io",
authToken: "your-auth-token"
});
// 浏览器中的离线优先应用,通过 Turso 的 HTTP 接口访问数据库
const result = await client.execute("SELECT * FROM articles WHERE published = 1");
2.3 嵌入式副本(Embedded Replicas):边缘数据库的核心技术
如果说 Server Mode 是 LibSQL 的"远程化"能力,那么 Embedded Replicas 就是 LibSQL 的"边缘化"能力。这是 Turso 最有技术深度的特性之一。
什么是嵌入式副本?
传统数据库的主从复制是这样的:
主库(Primary) ← 网络复制 ← 副本(Replica)
应用写入 → 主库(处理写入)
应用读取 → 副本(只读扩展)
问题在于:主从复制需要网络通信。边缘节点的延迟本身就高,再加上网络复制的一致性延迟,在边缘部署副本的经济性和实时性都很差。
LibSQL 的嵌入式副本换了一种思路:把副本数据库文件直接嵌入到应用的部署包中。
主库(远程云端)
↓ 初始同步(一次性完整下载)
数据库文件副本(嵌入到应用/边缘节点)
↓ 本地 I/O
应用(边缘节点,无网络延迟)
应用第一次启动时,从主库完整下载数据库文件到本地。之后应用的读取操作完全在本地进行,零网络延迟。当主库有新数据写入时,LibSQL 通过增量同步机制(基于 WAL 的序列号机制)将变更推送到边缘副本。
增量同步的技术细节:
LibSQL 的嵌入式副本使用一种叫做 WAL 日志传播的机制。当主库执行写操作时,对应的 WAL 条目会被记录。客户端应用在空闲时(或者每次写入后)向主库请求 "给我所有序列号大于 X 的 WAL 条目",主库返回增量更新,客户端将 WAL 条目应用到本地副本数据库。
这带来了一些关键的技术特性:
- 冷启动成本高,但持续运行成本极低:首次同步需要下载完整的数据库文件(可能从几 MB 到几 GB),但之后的增量同步只需要传输 WAL 日志(通常只有 KB 级别)
- 读取完全本地化:边缘节点的所有读取操作都是本地 SQLite 文件访问,延迟可以降到亚毫秒级
- 写入仍然需要连接主库:这是架构上的有意识设计——保证写入的一致性权威在云端主库,边缘副本始终是最终一致(eventually consistent)的
典型的嵌入式副本应用场景:
// 边缘节点上的应用
import { createClient } from "@libsql/client";
import { writeFileSync, readFileSync, existsSync } from "fs";
const LOCAL_DB_PATH = "./replica.db";
// 首次启动:从主库下载完整数据库文件
async function initializeReplicat() {
const remoteDb = createClient({
url: "libsql://production-db.turso.io",
authToken: process.env.TURSO_AUTH_TOKEN
});
const localDb = createClient({ url: `file:${LOCAL_DB_PATH}` });
// 获取主库的当前快照
const dump = await remoteDb.downloadDump();
writeFileSync(LOCAL_DB_PATH, dump);
// 之后增量同步逻辑...
}
2.4 向量搜索:SQLite 的 AI 进化
**向量搜索(Vector Search)**是 LibSQL 2026 年最令人兴奋的特性。在 RAG 场景中,我们需要将文档转换为向量嵌入,然后在向量空间中查找与查询向量最相似的文档。传统的解决方案需要独立的向量数据库,而 LibSQL 通过扩展直接把这个能力带到了 SQLite 里。
LibSQL 向量搜索的工作原理:
LibSQL 使用一种叫做 **IVF(Inverted File Index)**的索引结构来加速向量搜索。IVF 的核心思想是将向量空间划分为多个聚类(cluster),查询时只搜索最近的几个聚类,而不是遍历整个向量空间。
-- 创建向量表
CREATE TABLE document_chunks (
id INTEGER PRIMARY KEY,
chunk_text TEXT,
embedding FLOAT[1536] -- OpenAI text-embedding-3-small 的维度
);
-- 创建向量索引(HNSW 算法)
CREATE VECTOR INDEX document_chunks_embedding_idx
ON document_chunks USING hnsw(embedding);
-- 插入带嵌入向量的文档
INSERT INTO document_chunks (chunk_text, embedding)
VALUES ('LibSQL 是 SQLite 的云原生扩展',
'[0.123, -0.456, 0.789, ...]'); -- 1536 维向量
-- 向量相似度搜索(KNN 查询)
SELECT chunk_text,
distance(embedding, '[0.111, -0.444, 0.777, ...]') as dist
FROM document_chunks
ORDER BY dist ASC
LIMIT 5;
LibSQL 支持两种主流的向量相似度度量:
欧氏距离(L2):最直观的距离度量,衡量向量在多维空间中的直线距离。
$$d_{L2}(\vec{a}, \vec{b}) = \sqrt{\sum_{i=1}^{n}(a_i - b_i)^2}$$
余弦相似度(Cosine):衡量两个向量的方向相似性,取值范围 [-1, 1],1 表示完全相同方向。
$$d_{cos}(\vec{a}, \vec{b}) = \frac{\vec{a} \cdot \vec{b}}{|\vec{a}| \cdot |\vec{b}|} = \frac{\sum_{i=1}^{n} a_i b_i}{\sqrt{\sum_{i=1}^{n} a_i^2} \cdot \sqrt{\sum_{i=1}^{n} b_i^2}}$$
**HNSW(Hierarchical Navigable Small World)**是 LibSQL 使用的核心向量索引算法。它是一种基于图的近似最近邻搜索算法,通过构建多层图结构实现对数级别的搜索复杂度。HNSW 的核心思想来自"小世界网络"理论——在社交网络中,任意两个人之间通常只需要很少的几步就能连接起来。
HNSW 分层结构:
Layer 3 (顶层): ○ —— ○ —— ○ ← 粗粒度搜索,快速定位大致区域
↘ ↗
Layer 2: ○ —— ○ —— ○ —— ○ ← 中间层
↘ ↗ ↘
Layer 1: ○ —— ○ —— ○ —— ○ —— ○ ← 精细搜索
↘ ↗ ↘ ↗
Layer 0: ○ —— ○ —— ○ —— ○ —— ○ —— ○ —— ○ ← 底层边,精确最近邻
搜索过程:从顶层开始,按贪婪策略向下搜索,直到找到最近邻
HNSW 的参数调优是一个艺术:
- M(每层最大连接数):控制图的连通性,M 越大搜索精度越高,但索引构建和内存占用也越大
- efConstruction(构建时的动态候选列表大小):控制索引构建质量,通常设为 M 的 1-2 倍
- efSearch(搜索时的动态候选列表大小):控制搜索精度与速度的权衡,efSearch 越大精度越高但越慢
2.5 多写者并发:打破 SQLite 的写锁魔咒
SQLite 最大的痛点之一是写入并发。在默认配置下,SQLite 同一时间只允许一个写入操作。其他的写入请求需要排队等待。当你的应用有多个并发写入源(比如多个 Serverless 函数实例)时,这个限制就变成了性能瓶颈。
LibSQL 通过 BEGIN CONCURRENT 事务扩展解决了这个问题:
-- SQLite 传统写入(串行)
BEGIN;
INSERT INTO articles (title, content) VALUES ('标题', '内容');
COMMIT;
-- LibSQL 并发写入
BEGIN CONCURRENT; -- 声明这是一个并发安全的写入
INSERT INTO articles (title, content) VALUES ('标题', '内容');
COMMIT; -- 提交时如果检测到与其他并发的写入冲突,自动回滚重试
BEGIN CONCURRENT 的工作原理(MVCC):
LibSQL 使用 Multi-Version Concurrency Control(MVCC)来实现并发写入。每个写事务都有一个独立的版本快照:
时间轴:
T1: BEGIN CONCURRENT → 读取当前快照(版本 1)→ 写入 A=1
T2: BEGIN CONCURRENT → 读取当前快照(版本 1)→ 写入 B=2
T3: BEGIN CONCURRENT → 读取当前快照(版本 1)→ 写入 C=3
T1: COMMIT → 尝试获取写锁 → 成功 → 数据库变为版本 2(A=1)
T2: COMMIT → 尝试获取写锁 → 失败(与 T1 冲突)→ 自动回滚 → 重试 → COMMIT 成功 → 版本 3(A=1, B=2)
T3: COMMIT → 尝试获取写锁 → 失败(与 T2 冲突)→ 自动回滚 → 重试 → ...
关键是 COMMIT 时的"乐观锁"机制:提交时不立即获取写锁,而是先记录自己的写入集合(Write Set)。如果其他已提交的事务修改了相同的行,则回滚当前事务并重试。这种"先写后检查"的乐观并发控制,在冲突不频繁的场景下非常高效。
注意:BEGIN CONCURRENT 只能处理行级并发冲突。如果两个事务同时修改同一个表的表结构(比如 ALTER TABLE),仍然会冲突。此外,BEGIN CONCURRENT 不支持跨数据库的写事务——你需要确保所有写入都在同一个数据库文件内。
2.6 Turso 的商业化架构:从开源 LibSQL 到全托管云服务
Turso 是 LibSQL 的商业化托管服务,由 SQLite 的创始人 D. Richard Hipp 所在的团队开发(准确地说,Turso 公司收购了 libSQL 项目并持续发展它)。
Turso 的产品架构体现了对边缘计算场景的深刻理解:
┌──────────────────────────────────────────────────────────┐
│ Turso 全球网络 │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 美东节点 │ │ 欧中节点 │ │ 亚太节点 │ │ 日本节点 │ │
│ │ 纽约 │ │ 法兰克福 │ │ 新加坡 │ │ 东京 │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │ │
│ ┌────┴──────────────┴──────────────┴──────────────┴────┐ │
│ │ 主库(Primary Database) │ │
│ │ - 所有写入的权威来源 │ │
│ │ - WAL 日志生成 │ │
│ └────────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────┘
↑ 增量 WAL 同步
┌──────────────────────────────────────────────────────────┐
│ 边缘副本(Replicas) │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 用户设备 │ │ CDN 边缘 │ │ 移动端 │ │ 桌面应用 │ │
│ │ (文件) │ │ (CDN) │ │ (SQLite) │ │ (本地) │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
└──────────────────────────────────────────────────────────┘
Turso 的定价模型非常有意思——它是按照存储量和读取量计费的,而不是计算实例:
- 免费套餐:9GB 存储,5 亿行读取/月
- 付费套餐:$4.95/月起,支持更多数据库和更大的存储
这种计费模式对于 Serverless 和边缘计算场景特别友好——你只为实际使用的数据量付费,不需要为永不启动的计算实例付费。
三、架构分析:LibSQL/Turso 的系统设计
3.1 整体架构:从库到平台的演进
LibSQL/Turso 的架构可以分为四个层次:
┌─────────────────────────────────────────────────────┐
│ 应用层(Application Layer) │
│ SDK: JavaScript, Python, Go, Rust, Swift, Java │
├─────────────────────────────────────────────────────┤
│ HTTP API 层(HTTP Interface) │
│ Turso HTTP API / LibSQL Server Mode │
├─────────────────────────────────────────────────────┤
│ 同步层(Sync Layer) │
│ WAL 传播 / 增量同步 / Embedded Replica │
├─────────────────────────────────────────────────────┤
│ 存储引擎层(Storage Engine) │
│ LibSQL 核心 / 向量索引 / MVCC 事务管理 │
└─────────────────────────────────────────────────────┘
存储引擎层是 LibSQL 的核心。它基于 SQLite 的 B+tree 存储引擎,但做了以下关键扩展:
- MVCC 事务管理:在 SQLite 的单 writer 锁机制基础上,通过版本快照实现了并发写入
- 向量索引:基于 Faiss 的 IVF-PQ 索引算法,实现了近似最近邻搜索
- WAL 增强:扩展了 WAL 格式以支持向量数据和增量同步
同步层是 LibSQL 与云端 Turso 服务之间的桥梁。它负责:
- 增量变更捕获:从 WAL 中提取增量变更
- 序列号管理:追踪同步进度
- 冲突解决:处理边缘副本与主库之间的潜在冲突
HTTP API 层提供了语言无关的数据库访问接口。LibSQL 定义了一个标准的 HTTP+SQL 协议,任何实现该协议的工具都能连接 Turso 数据库:
HTTP Request:
POST /v1/execute
Authorization: Bearer <token>
Content-Type: application/json
{
"statements": ["SELECT * FROM articles WHERE published = 1"]
}
HTTP Response:
{
"columns": ["id", "title", "content"],
"rows": [
[1, "LibSQL深度解析", "内容..."],
[2, "向量搜索实战", "内容..."]
],
"next_cursor": null
}
3.2 向量索引的内部实现:Faiss 与 SQLite 的融合
LibSQL 的向量搜索能力并不是从头写的,而是复用了 Facebook Research 的 **Faiss(Facebook AI Similarity Search)**库。Faiss 是业界最成熟的向量索引库,支持多种索引算法,在数十亿向量规模的生产环境中得到验证。
LibSQL 通过 SQLite 的虚拟表(Virtual Table)机制将 Faiss 集成到 SQLite 的查询引擎中:
-- LibSQL 的向量索引通过虚拟表实现
CREATE VIRTUAL TABLE article_vectors USING vec0(
embedding float[1536], -- 1536 维浮点向量
file_id integer, -- 关联的文件 ID
chunk_id integer -- 块 ID
);
-- 底层是 Faiss 的 IndexIVFFlat
-- IVF: Inverted File Index,将向量空间划分聚类
-- Flat: 每个聚类内的向量不做压缩,精确存储
向量索引的查询流程:
SQL 查询:SELECT * FROM article_vectors
WHERE embedding MATCH '[0.1, 0.2, ...]'
LIMIT 5;
↓ SQLite 查询引擎识别 vec0 虚拟表
↓ 将 SQL 条件转换为 Faiss API 调用
query_vector = [0.1, 0.2, ...]
k = 5 // 返回 5 个最近邻
nprobe = 10 // 搜索的聚类数量
↓ Faiss IndexIVFFlat 搜索
1. 将 query_vector 投影到向量空间
2. 找到最近的 nprobe 个聚类中心
3. 在这些聚类中精确搜索 k 个最近邻
4. 按距离排序返回结果
↓ 将 Faiss 结果转换为 SQLite 行格式
[chunk_id, file_id, distance]
为什么选择 IVF-PQ 而不是 HNSW?
在早期版本的 LibSQL(通过 sqlite-vss 扩展)中,主要使用的是 HNSW 算法。但在 2026 年的 LibSQL 生产版本中,Turso 选择了 IVF(Inverted File Index) 的变种作为默认索引。原因是:
- 内存效率:HNSW 需要将整个图保存在内存中,而 IVF 可以配合 PQ(Product Quantization)压缩向量,显著降低内存占用
- 增量插入友好:IVF 索引可以增量添加向量,而 HNSW 的增量插入代价较高
- Faiss 生态成熟:Faiss 的 IVF 实现在大规模部署中经过充分验证
3.3 WAL 同步机制:边缘副本的数据一致性
LibSQL 的嵌入式副本使用 WAL(Write-Ahead Log)作为增量同步的载体。WAL 是 SQLite 的一个关键特性——它将所有修改先写入一个单独的日志文件,然后异步应用到主数据库文件。这提升了 SQLite 的写入性能(因为避免了频繁的 fsync),同时也为增量同步提供了天然的载体。
WAL 同步的工作流程:
┌─────────────────┐ 1. 初始化同步 ┌─────────────────┐
│ 主库(Primary) │ ←─────────────── │ 边缘副本(Replica)│
│ │ 完整数据库文件 │ │
│ WAL: [TX1, TX2] │ ←─────────────── │ WAL: [TX1, TX2] │
│ Sequence: 100 │ │ Sequence: 100 │
└─────────────────┘ └──────────────────┘
│
│ 2. 写入新事务
↓
┌─────────────────┐
│ 主库写入 TX3 │
│ WAL: [TX1,TX2, │
│ TX3] │
│ Sequence: 101 │
└─────────────────┘
│
│ 3. 增量同步请求
│ GET /sync?since_seq=100
│
┌─────────────────┐
│ 边缘副本接收 │
│ TX3 WAL 条目 │
│ 写入本地 WAL │
│ 应用到主数据库 │
│ Sequence: 101 │
└─────────────────┘
序列号(Sequence Number) 是这个同步机制的核心。每个 WAL 条目都有一个单调递增的序列号,边缘副本记录自己最后已同步的序列号,下次同步时告诉主库"我需要序列号大于 X 的所有条目"。这使得同步是增量且幂等的——即使网络中断后重连,也只会同步缺失的部分,不会重复同步。
3.4 Turso 的边缘节点网络:Anycast 与物理部署
Turso 在全球部署了多个边缘节点,使用 Anycast 路由技术将用户请求路由到物理距离最近的节点:
用户请求 (libsql://app.turso.io)
↓
DNS Anycast 解析
↓
物理距离最近的边缘节点
(纽约/法兰克福/新加坡/东京)
↓
边缘节点处理读取(本地 SQLite)
或转发写入到主库(云端)
Anycast 的工作原理是:多个边缘节点宣告相同的 IP 前缀,网络路由会将数据包的发送目标指向拓扑距离最近(AS 跳数最少)的节点。这比传统的 DNS 地理路由更精确,因为它是基于网络拓扑而非地理位置来选择最优节点。
四、代码实战:从零构建一个 AI 原生知识库
这一节,我们用 LibSQL/Turso 从零构建一个 AI 原生知识库。这是一个典型的 RAG(检索增强生成)应用,数据存储在 Turso,向量搜索使用 LibSQL 的原生向量索引。
4.1 环境准备
# 安装 Turso CLI
curl -sSfL https://get.tur.so/install.sh | bash
# 登录 Turso
turso auth login
# 创建数据库
turso db create tech-knowledge-base
turso db show tech-knowledge-base
# 获取数据库 URL
turso db list
# 安装 SDK
pip install libsql-client # Python
npm install @libsql/client # Node.js
4.2 数据库初始化
import libsql_client
import asyncio
from libsql_client import create_client
async def init_database():
# 连接到 Turso 数据库
client = create_client(
url="libsql://tech-knowledge-base-qnnet.turso.io",
auth_token=os.getenv("TURSO_AUTH_TOKEN")
)
# 创建文章表
await client.execute("""
CREATE TABLE IF NOT EXISTS articles (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
content TEXT NOT NULL,
author TEXT,
tags TEXT,
created_at TEXT DEFAULT (datetime('now')),
updated_at TEXT DEFAULT (datetime('now'))
)
""")
# 创建向量表(用于 RAG 语义搜索)
# 注意:这是 LibSQL 的向量扩展功能
await client.execute("""
CREATE VECTOR TABLE IF NOT EXISTS article_embeddings (
article_id INTEGER,
chunk_id INTEGER,
chunk_text TEXT,
embedding FLOAT[1536],
token_count INTEGER,
UNIQUE(article_id, chunk_id)
)
""")
# 创建向量索引
try:
await client.execute("""
CREATE VECTOR INDEX article_embedding_idx
ON article_embeddings USING hnsw(embedding)
WITH distance_metric=cosine
""")
except Exception as e:
print(f"索引可能已存在: {e}")
await client.close()
print("数据库初始化完成!")
asyncio.run(init_database())
4.3 文档分块与嵌入生成
import openai
import tiktoken
import re
# 初始化 tokenizer(使用 cl100k_base,适用于 GPT-4 模型)
tokenizer = tiktoken.get_encoding("cl100k_base")
def chunk_text(text: str, max_tokens: int = 512, overlap: int = 64) -> list[dict]:
"""
将长文本分块为可管理的片段。
使用滑动窗口策略,保留相邻块之间的上下文重叠。
Args:
text: 要分块的原始文本
max_tokens: 每个块的最大 token 数量
overlap: 相邻块之间的重叠 token 数量
Returns:
分块列表,每个块包含文本和元数据
"""
# 按段落分割(保留段落边界)
paragraphs = re.split(r'\n\n+', text)
chunks = []
current_chunk = []
current_tokens = 0
for paragraph in paragraphs:
para_tokens = len(tokenizer.encode(paragraph))
# 如果单个段落就超过最大 token 数,按句子再切
if para_tokens > max_tokens:
sentences = re.split(r'(?<=[.!?。!?])\s+', paragraph)
for sentence in sentences:
sent_tokens = len(tokenizer.encode(sentence))
if current_tokens + sent_tokens > max_tokens:
if current_chunk:
chunk_text = '\n\n'.join(current_chunk)
chunks.append({
'text': chunk_text,
'token_count': current_tokens
})
# 保留重叠内容
overlap_text = current_chunk[-1] if current_chunk else ''
current_chunk = [overlap_text, sentence]
current_tokens = len(tokenizer.encode('\n\n'.join(current_chunk)))
else:
current_chunk.append(sentence)
current_tokens += sent_tokens
else:
if current_tokens + para_tokens > max_tokens:
# 提交当前块,开始新块
if current_chunk:
chunk_text = '\n\n'.join(current_chunk)
chunks.append({
'text': chunk_text,
'token_count': current_tokens
})
# 保留重叠
overlap_text = current_chunk[-1] if current_chunk else ''
current_chunk = [overlap_text, paragraph]
current_tokens = len(tokenizer.encode('\n\n'.join(current_chunk)))
else:
current_chunk.append(paragraph)
current_tokens += para_tokens
# 提交最后一个块
if current_chunk:
chunks.append({
'text': '\n\n'.join(current_chunk),
'token_count': current_tokens
})
return chunks
def generate_embeddings(texts: list[str], model: str = "text-embedding-3-small") -> list[list[float]]:
"""
使用 OpenAI API 批量生成文本嵌入向量。
text-embedding-3-small 是 OpenAI 最新的小型嵌入模型,
维度 1536(可配置压缩到更小维度),性能接近 text-embedding-ada-002。
"""
response = openai.embeddings.create(
model=model,
input=texts
)
return [item.embedding for item in response.data]
# 测试分块
sample_text = """
LibSQL(又称 Turso)是 SQLite 的云原生扩展,由 SQLite 的创始人 D. Richard Hipp 所在的团队开发。
它通过一系列技术创新,在保持 SQLite 极致简洁的同时,突破了传统嵌入式数据库的能力边界。
LibSQL 的核心技术创新包括四个方面:
第一,Server Mode。通过 HTTP 和 WebSocket 提供远程数据库访问能力,使得前端应用和 Serverless 函数也能访问 SQLite 数据库。
第二,嵌入式副本。将数据库文件副本嵌入到边缘节点,实现零网络延迟的读取访问,并通过 WAL 增量同步保持与主库的一致性。
第三,向量搜索。通过集成 Faiss 库,在 SQLite 中实现了高效的向量相似度搜索,直接支持 RAG 和 AI 应用场景。
第四,多写者并发。通过 BEGIN CONCURRENT 事务和 MVCC 机制,打破了 SQLite 的单写锁限制,支持多个并发源同时写入。
这些技术创新使得 LibSQL 成为 2026 年边缘计算和 AI 原生应用的理想数据库选择。
"""
chunks = chunk_text(sample_text, max_tokens=200)
print(f"分块结果:{len(chunks)} 个块")
for i, chunk in enumerate(chunks):
print(f"块 {i+1} ({chunk['token_count']} tokens): {chunk['text'][:80]}...")
4.4 向量存储与 RAG 查询
async def store_article_with_embeddings(
client: libsql_client.Client,
article_id: int,
title: str,
content: str
):
"""
将文章内容和嵌入向量一起存储到数据库。
"""
# 1. 分块
chunks = chunk_text(content, max_tokens=512)
# 2. 提取所有块的文本
chunk_texts = [chunk['text'] for chunk in chunks]
# 3. 批量生成嵌入向量(OpenAI API 支持批量,最多 2048 条)
embeddings = generate_embeddings(chunk_texts)
# 4. 事务性插入(文章数据 + 向量数据)
async with client.transaction():
for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
await client.execute(
"""
INSERT INTO article_embeddings
(article_id, chunk_id, chunk_text, embedding, token_count)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(article_id, chunk_id) DO UPDATE SET
chunk_text = excluded.chunk_text,
embedding = excluded.embedding,
token_count = excluded.token_count
""",
[article_id, i, chunk['text'], embedding, chunk['token_count']]
)
print(f"文章 {article_id} 已存储 {len(chunks)} 个嵌入向量块")
async def rag_query(
client: libsql_client.Client,
query: str,
top_k: int = 5,
rerank: bool = True
) -> list[dict]:
"""
RAG 查询流程:
1. 将用户查询转换为嵌入向量
2. 通过向量相似度搜索找到最相关的文档块
3. (可选)使用重排序模型进一步优化排序
4. 返回相关上下文供 LLM 生成答案
"""
# Step 1: 查询向量生成
query_embedding = generate_embeddings([query])[0]
# Step 2: 向量相似度搜索
result = await client.execute(
"""
SELECT
ae.chunk_text,
ae.article_id,
ae.chunk_id,
a.title,
a.author,
distance(ae.embedding, ?) as similarity
FROM article_embeddings ae
JOIN articles a ON ae.article_id = a.id
WHERE ae.embedding MATCH ?
ORDER BY similarity ASC
LIMIT ?
""",
[query_embedding, query_embedding, top_k]
)
# Step 3: 提取结果
contexts = []
for row in result.rows:
contexts.append({
'chunk_text': row[0],
'article_id': row[1],
'chunk_id': row[2],
'article_title': row[3],
'author': row[4],
'similarity': row[5]
})
# Step 4: 如果启用重排序,使用更精确的模型重新排序
if rerank and contexts:
reranked = await rerank_results(query, contexts)
return reranked
return contexts
async def rerank_results(query: str, contexts: list[dict]) -> list[dict]:
"""
使用交叉编码模型对初筛结果进行重排序。
交叉编码比向量搜索(双编码)更精确,但计算成本更高,
因此我们先用快速的向量搜索初筛,再用交叉编码重排。
"""
# 这里使用一个简化版本——按文本长度和相似度综合排序
# 生产环境中应该使用真正的重排序模型(如 Cohere Rerank)
for ctx in contexts:
# 惩罚过长或过短的块(可能包含太多无关内容或上下文不足)
text_len = len(ctx['chunk_text'])
length_score = 1.0 - abs(text_len - 500) / 1000 # 理想长度 500 字符
length_score = max(0, min(1, length_score))
# 综合得分:向量相似度(权重 0.8)+ 长度得分(权重 0.2)
ctx['final_score'] = 0.8 * (1 - ctx['similarity']) + 0.2 * length_score
return sorted(contexts, key=lambda x: x['final_score'], reverse=True)
# 完整 RAG 查询示例
async def answer_with_rag(client: libsql_client.Client, user_query: str):
"""完整的 RAG 查询流程"""
print(f"查询: {user_query}\n")
# 检索相关上下文
contexts = await rag_query(client, user_query, top_k=3)
if not contexts:
return "抱歉,知识库中没有找到与您问题相关的内容。"
# 构建上下文字符串
context_str = "\n\n---\n\n".join([
f"【{ctx['article_title']} - {ctx['author']}】\n{ctx['chunk_text']}"
for ctx in contexts
])
# 调用 LLM 生成答案(使用检索到的上下文)
prompt = f"""基于以下知识库内容回答用户的问题。
如果知识库中没有相关信息,请如实告知,不要编造。
---
知识库内容:
{context_str}
---
用户问题:{user_query}
请给出准确、详细的回答:
"""
response = openai.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": "你是一个技术专家助手,基于提供的知识库内容回答问题。"},
{"role": "user", "content": prompt}
],
temperature=0.3, # RAG 场景下使用较低的 temperature
max_tokens=1000
)
answer = response.choices[0].message.content
print(f"检索到 {len(contexts)} 个相关上下文块:")
for i, ctx in enumerate(contexts):
print(f" {i+1}. [{ctx['article_title']}] (相似度: {ctx['similarity']:.4f})")
print(f"\n答案:{answer}")
return answer
# 运行 RAG 查询
async def main():
client = create_client(
url="libsql://tech-knowledge-base-qnnet.turso.io",
auth_token=os.getenv("TURSO_AUTH_TOKEN")
)
answer = await answer_with_rag(
client,
"LibSQL 的多写者并发是如何工作的?"
)
await client.close()
asyncio.run(main())
4.5 使用嵌入式副本实现离线优先
import libsql_client
from libsql_client import create_client, Result
class OfflineFirstDB:
"""
离线优先数据库管理器。
在边缘设备(移动端、桌面应用)中,将 Turso 数据库的副本
本地化存储,应用优先读写本地副本,后台增量同步云端主库。
"""
def __init__(self, remote_url: str, auth_token: str, local_path: str):
self.remote_url = remote_url
self.auth_token = auth_token
self.local_path = local_path
self.remote_client = None
self.local_client = None
self.last_sync_seq = 0
async def initialize(self):
"""初始化数据库:如果是首次运行,从云端下载完整副本"""
import os
import aiohttp
import asyncio
if not os.path.exists(self.local_path):
print("首次运行,正在从云端下载完整数据库...")
# 方法一:使用 Turso CLI 下载(推荐)
# $ turso db shell tech-knowledge-base --local ./local.db
# $ turso db replicate tech-knowledge-base ./local.db
# 方法二:使用 HTTP API 下载 dump
async with aiohttp.ClientSession() as session:
headers = {"Authorization": f"Bearer {self.auth_token}"}
async with session.get(
f"{self.remote_url}/v1/dump",
headers=headers
) as resp:
if resp.status == 200:
data = await resp.read()
with open(self.local_path, 'wb') as f:
f.write(data)
print(f"数据库已下载 ({len(data)} bytes)")
else:
raise Exception(f"下载失败: {resp.status}")
# 初始化本地数据库连接
self.local_client = create_client(f"file:{self.local_path}")
# 获取远程数据库的 WAL 序列号
self.remote_client = create_client(
url=self.remote_url,
auth_token=self.auth_token
)
# 读取本地最后同步的序列号
try:
result = await self.local_client.execute(
"PRAGMA libsql_remote_peer"
)
for row in result.rows:
if row[0] == 'last_sync_sequence':
self.last_sync_seq = int(row[1])
break
except:
self.last_sync_seq = 0
async def sync(self):
"""增量同步:将云端的新变更同步到本地"""
import aiohttp
if not self.remote_client:
self.remote_client = create_client(
url=self.remote_url,
auth_token=self.auth_token
)
# 请求增量 WAL(从 last_sync_seq 之后的变更)
async with aiohttp.ClientSession() as session:
headers = {
"Authorization": f"Bearer {self.auth_token}",
"Content-Type": "application/json"
}
payload = {"since": self.last_sync_seq}
async with session.post(
f"{self.remote_url}/v1/sync",
headers=headers,
json=payload
) as resp:
if resp.status == 200:
wal_data = await resp.read()
if wal_data:
# 应用 WAL 到本地数据库
await self.local_client.execute(wal_data.decode('utf-8'))
self.last_sync_seq += len(wal_data)
print(f"已同步,序列号更新至 {self.last_sync_seq}")
elif resp.status == 204:
print("无需同步,云端无新变更")
else:
print(f"同步失败: {resp.status}")
async def execute_local(self, sql: str, args: list = None) -> Result:
"""执行本地查询(零延迟读取)"""
return await self.local_client.execute(sql, args or [])
async def execute_remote(self, sql: str, args: list = None) -> Result:
"""执行远程写入(所有写入都直接写云端)"""
return await self.remote_client.execute(sql, args or [])
async def close(self):
"""关闭所有连接"""
if self.local_client:
await self.local_client.close()
if self.remote_client:
await self.remote_client.close()
# 使用示例
async def main():
db = OfflineFirstDB(
remote_url="libsql://tech-knowledge-base-qnnet.turso.io",
auth_token=os.getenv("TURSO_AUTH_TOKEN"),
local_path="./data/knowledge-base.db"
)
await db.initialize()
# 每次应用启动时增量同步
await db.sync()
# 应用读取(本地,零延迟)
result = await db.execute_local(
"SELECT * FROM articles WHERE id = ?",
[1]
)
print(f"本地读取: {len(result.rows)} 行")
# 应用写入(远程云端)
await db.execute_remote(
"INSERT INTO articles (title, content) VALUES (?, ?)",
["新文章", "这是新文章的内容"]
)
# 写入后立即同步,确保本地副本也包含新数据
await db.sync()
await db.close()
asyncio.run(main())
五、性能优化:LibSQL/Turso 的生产级调优
5.1 向量索引调优:从 HNSW 到 IVF 的参数战争
向量搜索的性能和精度取决于索引参数的选择。这是一个需要根据实际数据特性来调整的过程。
import sqlite3
import time
def benchmark_vector_search(
conn: sqlite3.Connection,
query_vector: list[float],
k: int = 10,
nprobe_values: list[int] = [1, 5, 10, 20, 50, 100]
):
"""
基准测试:测试不同 nprobe 值对搜索精度和速度的影响。
nprobe 控制搜索时检查的聚类数量:
- nprobe 越大,精度越高,但速度越慢
- nprobe 越小,速度越快,但可能遗漏最近邻
"""
results = []
for nprobe in nprobe_values:
# 设置 IVF 搜索参数
conn.execute(f"PRAGMA vec0.nprobe = {nprobe}")
start = time.perf_counter()
cursor = conn.execute("""
SELECT chunk_text, distance(embedding, ?) as dist
FROM article_embeddings
WHERE embedding MATCH ?
ORDER BY dist ASC
LIMIT ?
""", [query_vector, query_vector, k])
elapsed = (time.perf_counter() - start) * 1000 # 毫秒
rows = cursor.fetchall()
results.append({
'nprobe': nprobe,
'elapsed_ms': elapsed,
'results_count': len(rows),
'avg_distance': sum(r[1] for r in rows) / len(rows) if rows else 0
})
print(f"nprobe={nprobe:3d}: {elapsed:8.2f}ms, "
f"{len(rows)} 结果, 平均距离={results[-1]['avg_distance']:.4f}")
return results
def optimize_vector_index(
conn: sqlite3.Connection,
training_vectors: list[list[float]],
max_vectors: int = 10000
):
"""
优化向量索引的构建参数。
关键参数:
- nlist: 聚类数量,通常设为向量总数的平方根
- nprobe: 搜索时检查的聚类数,通常设为 nlist 的 1-10%
"""
n_vectors = min(len(training_vectors), max_vectors)
# nlist 的经验公式:sqrt(n_vectors) * 2~4 倍
nlist = int(n_vectors ** 0.5 * 2)
nlist = max(50, min(nlist, 1000)) # 限制在合理范围内
print(f"优化参数:nlist={nlist} (基于 {n_vectors} 个向量)")
# 设置构建参数
conn.execute(f"PRAGMA vec0.nlist = {nlist}")
# 构建索引(这可能需要一些时间)
start = time.perf_counter()
conn.execute("""
CREATE VECTOR INDEX article_embedding_idx
ON article_embeddings(embedding)
USING ivf(embedding)
WITH nlist = ?
""", [nlist])
elapsed = time.perf_counter() - start
print(f"索引构建完成,耗时 {elapsed:.2f} 秒")
5.2 边缘读取的性能调优
边缘节点的性能调优主要是围绕 减少 I/O 开销展开的:
# SQLite 性能调优参数
PERFORMANCE_TUNING_SQL = """
-- 1. WAL 模式(支持并发读写,性能比 journal 模式更好)
PRAGMA journal_mode = WAL;
-- 2. 同步模式(提高写入性能,在边缘副本场景中可接受更低的一致性)
PRAGMA synchronous = NORMAL; -- NORMAL 比 FULL 快 5-10 倍
-- 3. 缓存大小(增加到 64MB,减少磁盘 I/O)
PRAGMA cache_size = -65536; -- 负数表示 KB,65536KB = 64MB
-- 4. 内存映射大小(允许 SQLite 将数据库映射到内存)
PRAGMA mmap_size = 268435456; -- 256MB
-- 5. 页面大小(标准 4KB 适合大多数场景)
PRAGMA page_size = 4096;
-- 6. 启用查询缓存(重复查询直接返回缓存结果)
PRAGMA query_only = OFF; -- 边缘副本模式下可设为 ON(只读)
-- 7. 预热缓存(启动时加载热点数据到缓存)
PRAGMA soft_heap_limit = 67108864; -- 64MB 软堆限制
"""
async def tune_performance(client: libsql_client.Client):
"""应用性能调优参数"""
for pragma in PERFORMANCE_TUNING_SQL.strip().split(';'):
pragma = pragma.strip()
if pragma.startswith('PRAGMA'):
try:
await client.execute(pragma)
except Exception as e:
print(f"跳过 {pragma[:30]}...: {e}")
print("性能调优完成")
5.3 写入性能优化:批量写入与事务合并
在 Turso 的 Serverless 场景中,频繁的小写入会产生大量的 HTTP 请求开销。通过批量写入可以显著提升性能:
async def batch_insert_embeddings(
client: libsql_client.Client,
articles: list[dict], # [{"id": 1, "content": "..."}, ...]
batch_size: int = 50
):
"""
批量插入嵌入向量。
性能优化点:
1. 使用事务包装批量操作(减少 fsync 开销)
2. 批量生成嵌入向量(利用 API 的批处理能力)
3. 分批执行 SQL(避免单个事务过大)
"""
total_chunks = 0
for batch_start in range(0, len(articles), batch_size):
batch = articles[batch_start:batch_start + batch_size]
# Step 1: 批量分块
all_chunks = []
for article in batch:
chunks = chunk_text(article['content'], max_tokens=512)
for i, chunk in enumerate(chunks):
all_chunks.append({
'article_id': article['id'],
'chunk_id': i,
'text': chunk['text']
})
# Step 2: 批量生成嵌入向量(每批最多 2048 条)
chunk_texts = [c['text'] for c in all_chunks]
embeddings = generate_embeddings(chunk_texts)
# Step 3: 事务性批量插入
async with client.transaction():
for chunk, embedding in zip(all_chunks, embeddings):
await client.execute(
"""
INSERT INTO article_embeddings
(article_id, chunk_id, chunk_text, embedding)
VALUES (?, ?, ?, ?)
""",
[chunk['article_id'], chunk['chunk_id'],
chunk['text'], embedding]
)
total_chunks += len(all_chunks)
print(f"已处理 {batch_start + len(batch)}/{len(articles)} 篇文章,"
f"{total_chunks} 个嵌入块")
return total_chunks
六、与主流技术栈的对比
理解 LibSQL/Turso 的定位,最好的方式是与现有的主流数据库方案做对比:
| 特性 | SQLite(原生) | LibSQL/Turso | PostgreSQL + pgvector | Turso(对比 Neon/PlanetScale) |
|---|---|---|---|---|
| 部署模式 | 嵌入式 | 嵌入式 + 云托管 | 独立服务 | 边缘节点 + 云端主库 |
| 向量搜索 | ❌ 需要扩展 | ✅ 原生支持 | ✅ 扩展支持 | ✅ 原生支持 |
| 并发写入 | ❌ 单写锁 | ✅ MVCC 多写者 | ✅ MVCC | ✅ 多写者(通过 LibSQL) |
| 远程访问 | ❌ 不支持 | ✅ HTTP/WebSocket | ✅ 标准协议 | ✅ HTTP API |
| 边缘副本 | ❌ | ✅ 嵌入式副本 | ❌ | ✅ 核心特性 |
| 零冷启动 | ✅ | ✅ 文件即数据库 | ❌ 服务启动慢 | ✅ |
| 免费套餐存储 | 无限制(本地) | 9GB | 自托管(无限制) | 9GB |
| ALTER TABLE | 有限支持 | ✅ 增强支持 | ✅ 完全支持 | ✅ 增强支持 |
| 多语言 SDK | 多 | 多 | 多 | 多 |
LibSQL 的最佳场景:
- 移动应用:App 内嵌 SQLite 文件,本地优先,边缘副本实现离线访问
- 边缘计算:CDN 边缘节点部署嵌入式副本,零延迟读取
- Serverless 函数:冷启动即启动的轻量数据库,无需预置计算实例
- RAG 知识库:向量搜索 + SQL 的组合,单数据库满足 AI 应用的数据层需求
- 桌面应用:单文件数据库,随应用分发,零运维
LibSQL 的不适用场景:
- 超大规模数据:TB 级别的数据不适合嵌入式数据库
- 强一致性要求:嵌入式副本是最终一致性的,不适合金融级事务需求
- 复杂分析查询:需要 OLAP 引擎的场景(这类需求还是应该用 DuckDB 或 ClickHouse)
- 跨数据中心复制:目前 LibSQL 的多写者是单主库模式,多主库场景需要额外方案
七、2026 年趋势展望
7.1 AI 原生数据库的演进方向
LibSQL 的向量搜索能力只是开始。2026 年,我们预计会看到更多"AI 原生"的数据库特性:
多模态向量搜索:除了文本嵌入,数据库还需要支持图像、音频、视频的嵌入向量检索。LibSQL 的向量索引框架是通用的,未来支持多模态嵌入是可期的。
混合搜索(Hybrid Search):结合向量相似度搜索和 BM25 关键词搜索的混合搜索,正在成为 RAG 系统的标准配置。LibSQL 需要在 SQL 层面提供更方便的混合搜索语法。
流式推理(Streaming Inference):在数据库层面直接集成 LLM 推理能力,实现"存储即上下文"的范式——查询时自动将相关数据注入到 LLM 的上下文中,而不需要在应用层做复杂的数据序列化和传递。
7.2 边缘计算与数据库的融合
边缘计算的发展方向是计算密度越来越高。Cloudflare Workers 已经可以在边缘节点运行完整的 JavaScript 运行时,下一步是在边缘节点部署完整的数据库实例。
Turso 的嵌入式副本是这一步的先驱。但目前的实现还有局限性——边缘副本只能读,写入必须回到主库。未来的演进方向是边缘写入缓冲——在边缘节点缓冲写入操作,在网络恢复时批量同步到主库,同时通过 CRDT(Conflict-free Replicated Data Types)解决冲突。
7.3 数据库可观测性的新维度
随着 AI 应用对数据质量的依赖加深,**数据可观测性(Data Observability)**正在成为数据库的重要特性:
- 数据新鲜度监控:边缘副本与主库的数据延迟是多少?是否有过期的风险?
- 向量搜索质量监控:top-k 搜索结果的相似度分布是否健康?是否需要重建索引?
- 查询成本分析:哪些查询消耗了最多的计算资源?是否有可优化的慢查询?
这些需求正在推动数据库从"被动存储"向"主动感知"的转变。
八、总结
LibSQL/Turso 代表了 2026 年数据库领域的一个重要方向:将传统"重量级"数据库的能力,以"轻量级"的方式交付给开发者。
它不是在重新发明轮子,而是在 SQLite 这个人类历史上最成功的软件库的基础上,用工程创新解锁了四个关键能力:
- 向量搜索:让 SQLite 从纯结构化数据存储,进化为 AI 原生数据平台
- 嵌入式副本:将"计算去中心化"的革命延伸到数据层,实现真正的零延迟边缘数据访问
- 多写者并发:打破了 SQLite 的单写锁魔咒,使嵌入式数据库也能服务高并发写入场景
- Serverless 化:零冷启动、零运维、按实际使用量计费的云数据库体验
对于程序员而言,LibSQL/Turso 的价值在于:它让你可以在不引入运维复杂度和额外基础设施成本的情况下,获得向量搜索、边缘部署和多写者并发这些现代应用所需的核心数据能力。
在这个 AI 应用爆发、边缘计算崛起、微服务退潮的时代,LibSQL/Turso 代表的"简约但不简单"的工程哲学,正在找到自己的时代定位。
本文覆盖版本:LibSQL 1.x, Turso Platform 2026.5, SQLite 3.46