Rust 异步编程深度解析:从 async/await 原理到 Tokio 运行时架构——Future 状态机、Work-Stealing 调度、多线程执行器与生产级网络服务完整技术指南(2026)
作者:程序员茄子
标签:Rust|异步编程|Tokio|async/await|Future|状态机|Work-Stealing|高性能网络服务
关键词:Rust|async/await|Tokio|Future|Pin|Waker|Executor|Reactor|异步运行时|状态机|Work-Stealing|网络编程|高性能|并发
摘要
Rust 的异步编程模型以其零成本抽象、内存安全和极致性能著称,但也因其陡峭的学习曲线让无数开发者望而却步。async/await 语法背后隐藏着状态机变换、Pin 内存锁定、Waker 唤醒机制等复杂概念。本文从底层原理出发,系统剖析 Rust 异步编程的完整技术栈:从 Future 特征的数学本质,到 async fn 编译后的状态机结构;从 Pin 的设计哲学,到 Tokio 运行时的多线程调度架构;从 Work-Stealing 算法到生产级网络服务的性能优化实践。全文配备大量可运行代码示例,帮助你真正理解 Rust 异步编程的"为什么"和"怎么做",而不仅仅停留在"是什么"的层面。
目录
- 引言:Rust 异步编程的独特价值
- async/await 语法基础与编译原理
- Future 特征深度解析
- 状态机变换:async 函数的真相
- Pin 与 Unpin:自引用结构的内存安全
- Waker 与 Wake:任务唤醒的完整机制
- Executor 与 Reactor:执行器与反应器的分工
- Tokio 运行时架构深度剖析
- Work-Stealing 调度算法详解
- 多线程 vs 单线程运行时选型指南
- Task 生命周期与资源管理
- Stream 特征与异步迭代模式
- 错误处理与 Cancellation
- 背压控制与流量管理
- 性能优化实战技巧
- 实战:构建百万级连接的聊天服务器
- 与 Go Goroutine 的技术对比
- 与 Node.js Event Loop 的架构对比
- 常见陷阱与最佳实践
- 总结与异步生态展望
1. 引言:Rust 异步编程的独特价值
1.1 为什么需要异步编程?
在传统同步 I/O 模型中,每个连接需要一个线程(或进程)来处理。当并发连接数达到万级、十万级时,线程上下文切换和内存开销成为性能瓶颈。异步编程通过事件驱动和非阻塞 I/O,让少量线程就能处理海量并发连接。
同步模型:1 连接 : 1 线程 → C10K 问题
异步模型:N 连接 : M 线程 (M << N) → C10M 可能
1.2 Rust 异步模型的核心优势
| 特性 | Rust | Go | Node.js |
|---|---|---|---|
| 内存安全 | 编译期保证 | GC 保证 | GC 保证 |
| 零成本抽象 | ✅ | ❌(goroutine 有栈) | ❌(V8 有开销) |
| 无运行时 | ✅ | ❌ | ❌ |
| 可预测延迟 | ✅(无 GC 暂停) | ❌ | ❌ |
| 真正并行 | ✅(多线程) | ✅ | ❌(单线程事件循环) |
Rust 的异步代码编译为状态机,无堆分配、无虚拟函数调用,性能可媲美手写 C 代码。
1.3 本文的代码环境
所有代码示例基于:
- Rust 1.75+(支持
AsyncFn特征) - Tokio 1.38+(最新稳定版)
- edition 2021
# 创建项目
cargo new rust_async_deep_dive && cd rust_async_deep_dive
cargo add tokio --features full
cargo add reqwest
cargo add futures-util
2. async/await 语法基础与编译原理
2.1 最简单的异步函数
use tokio::net::TcpStream;
use tokio::time::{sleep, Duration};
// 这是一个异步函数
async fn fetch_data(host: &str, port: u16) -> std::io::Result<Vec<u8>> {
println!("[{}] 开始连接...", std::thread::current().name().unwrap_or("?"));
// .await 点:异步阻塞点
let stream = TcpStream::connect((host, port)).await?;
println!("[{}] 连接成功", std::thread::current().name().unwrap_or("?"));
sleep(Duration::from_millis(100)).await;
// 模拟读取数据
Ok(b"hello from server".to_vec())
}
#[tokio::main]
async fn main() -> std::io::Result<()> {
let data = fetch_data("127.0.0.1", 8080).await?;
println!("收到数据: {:?}", String::from_utf8_lossy(&data));
Ok(())
}
2.2 async 块的两种形式
#[tokio::main]
async fn main() {
// 形式1:async 块(返回一个 Future)
let future = async {
println!("在 async 块中");
42
};
// 此时 future 还未执行!
println!("future 已创建,但未执行");
// .await 才驱动执行
let result = future.await;
println!("结果: {}", result);
// 形式2:立即执行的 async 块
let result2 = async { 100 }.await;
println!("立即执行的结果: {}", result2);
}
关键理解:async fn 和 async {} 都只是返回一个 Future 值,不会立即执行任何代码。只有 .await 才会驱动 Future 前进。
2.3 .await 的精确语义
async fn demo_await_semantics() {
println!("A: 这个函数体的开始");
// 调用 async 函数,返回 Future,但不执行
let fut = async_function();
println!("B: Future 已创建");
// 第一个 .await:开始执行,遇到第一个 .await 点挂起
println!("C: 即将第一次 .await");
fut.await;
println!("E: Future 完成");
}
async fn async_function() {
println!(" 1: async 函数开始");
some_async_op().await; // 挂起点
println!(" 3: 从挂起点恢复");
}
async fn some_async_op() {
println!(" 2: 异步操作执行中(模拟 .await)");
}
输出顺序:A → B → C → 1 → 2 → 3 → E
这证明了:async fn 的体在 .await 时才会执行;遇到内部的 .await 时,控制权返回给调用者。
3. Future 特征深度解析
3.1 Future 特征的定义
use std::task::{Context, Poll};
// Rust 标准库中 Future 的定义
pub trait Future {
type Output; // 关联类型:Future 完成后的输出类型
// 核心方法:轮询 Future 的状态
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
poll 方法的语义:
- 返回
Poll::Ready(output):Future 已完成,输出结果 - 返回
Poll::Pending:Future 还未完成,需要等待
3.2 手动实现一个 Future
use std::future::Future;
use std::task::{Context, Poll};
use std::pin::Pin;
use std::time::{Duration, Instant};
// 一个在指定延迟后完成的 Future
struct Delay {
when: Instant,
}
impl Delay {
fn new(dur: Duration) -> Self {
Delay { when: Instant::now() + dur }
}
}
impl Future for Delay {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
if Instant::now() >= self.when {
println!(" [Delay] 时间到,完成!");
Poll::Ready(())
} else {
println!(" [Delay] 时间未到,注册 waker 后返回 Pending");
// 关键:获取 waker 并安排唤醒
let waker = cx.waker().clone();
let when = self.when;
// 在真实实现中,这里会向计时器注册回调
// 为简化,这里用 std::thread 模拟
std::thread::spawn(move || {
let now = Instant::now();
if now < when {
std::thread::sleep(when - now);
}
waker.wake(); // 时间到,唤醒 Future
});
Poll::Pending
}
}
}
#[tokio::main]
async fn main() {
println!("开始 Delay Future...");
Delay::new(Duration::from_millis(500)).await;
println!("Delay Future 完成!");
}
3.3 Poll 的组合与链式调用
use futures_util::future::{select, join, Either};
use tokio::time::{sleep, Duration};
async fn task_a() -> &'static str {
sleep(Duration::from_millis(100)).await;
"task_a 完成"
}
async fn task_b() -> &'static str {
sleep(Duration::from_millis(200)).await;
"task_b 完成"
}
#[tokio::main]
async fn main() {
// join:等待所有 Future 完成
let (a, b) = join!(task_a(), task_b());
println!("join 结果: {} + {}", a, b);
// select:等待第一个完成的 Future
match select(task_a(), task_b()).await {
Either::Left((a, _)) => println!("select 结果: {} 先完成", a),
Either::Right((b, _)) => println!("select 结果: {} 先完成", b),
}
}
join! 和 select! 宏通过组合多个 Future 的 poll 调用,实现并发和竞赛语义。
4. 状态机变换:async 函数的真相
4.1 async fn 编译后的状态机
Rust 编译器将 async fn 转换为一个实现了 Future 的匿名结构体。每次 .await 点对应状态机的一个状态。
// 原始代码
async fn simple_async() -> i32 {
let x = 1;
some_op().await; // 状态点1
let y = 2;
another_op().await; // 状态点2
x + y
}
// 编译器生成的大致结构(伪代码)
struct SimpleAsync {
state: u32,
x: Option<i32>,
y: Option<i32>,
__await_1: Option<SomeOpFuture>,
__await_2: Option<AnotherOpFuture>,
}
impl Future for SimpleAsync {
type Output = i32;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<i32> {
loop {
match self.state {
0 => {
self.x = Some(1);
self.__await_1 = Some(some_op());
self.state = 1;
}
1 => {
// 轮询第一个 await 的 Future
let result = self.__await_1.as_mut().unwrap().poll(cx)?;
self.__await_1 = None;
self.y = Some(2);
self.__await_2 = Some(another_op());
self.state = 2;
}
2 => {
let result = self.__await_2.as_mut().unwrap().poll(cx)?;
self.__await_2 = None;
return Poll::Ready(self.x.unwrap() + self.y.unwrap());
}
_ => unreachable!(),
}
}
}
}
4.2 状态机的内存布局
use std::mem::size_of;
// 对比:枚举 vs 状态机
enum StateEnum {
Init,
WaitingOp1(Op1Future),
WaitingOp2(Op1Future, Op2Future), // 所有字段同时存活
Done,
}
// 状态机版本(编译器生成)
struct StateMachine {
state: u8,
// 使用 Option 或 union 技巧,同一时刻只有一个 Future 字段有效
storage: Storage<Op1Future, Op2Future>,
}
// 状态机更紧凑!
assert!(size_of::<StateMachine>() < size_of::<StateEnum>());
关键优化:状态机只分配当前状态需要的字段,而不是所有状态的最大并集。
4.3 实际查看生成的状态机
# 使用 cargo-expand 查看宏展开
cargo install cargo-expand
# 查看 async fn 展开后的代码
cargo expand --lib --ugly
5. Pin 与 Unpin:自引用结构的内存安全
5.1 问题:自引用结构
struct SelfRef {
data: String,
pointer: *const String, // 指向 data 的指针
}
impl SelfRef {
fn new(s: &str) -> Self {
let mut sr = SelfRef {
data: s.to_string(),
pointer: std::ptr::null(),
};
// 让 pointer 指向 data
sr.pointer = &sr.data;
sr
}
fn get(&self) -> &str {
unsafe { &*self.pointer }
}
}
fn main() {
let sr = SelfRef::new("hello");
println!("{}", sr.get()); // OK
// 问题:如果 sr 被移动,pointer 变成悬垂指针!
let sr2 = sr; // 移动
// println!("{}", sr2.get()); // UB!pointer 指向旧位置
}
5.2 为什么 async 块需要 Pin
async 块中的局部变量在 .await 点后仍然需要访问,这形成了自引用:
async fn self_ref_example() {
let local_data = String::from("重要数据");
// 这里:local_data 在栈上
some_async_op().await; // 挂起,Future 可能被移动到堆上
// 恢复后:需要访问 local_data
// 如果 Future 被移动,local_data 的地址变了!
println!("数据: {}", local_data);
}
编译器生成的状态机包含指向自身的指针,因此需要 Pin 来保证状态机不会被移动。
5.3 Pin 的使用模式
use std::pin::Pin;
use std::marker::PhantomPinned;
struct AsyncState {
data: String,
// 标记:此类型可能需要固定
_pin: PhantomPinned,
}
impl AsyncState {
fn new(s: &str) -> Self {
AsyncState {
data: s.to_string(),
_pin: PhantomPinned,
}
}
// 只能安全地通过 Pin<&mut Self> 访问
fn process(self: Pin<&mut Self>) {
println!("处理: {}", self.data);
}
}
fn main() {
let mut state = AsyncState::new("test");
// 错误:不能安全移动
// let state2 = state; // 如果取消注释,编译错误
// 正确:通过 Pin 访问
let mut pinned = unsafe { Pin::new_unchecked(&mut state) };
pinned.as_mut().process();
}
5.4 Unpin 特征
use std::pin::Pin;
// 大多数类型实现 Unpin(可以安全移动)
// 只有少数类型(如自引用 Future)不实现 Unpin
fn is_unpin<T: Unpin>(_: &T) {}
fn main() {
let x = 42i32; // i32 实现 Unpin
is_unpin(&x); // OK
let s = String::from("hello"); // String 实现 Unpin
is_unpin(&s); // OK
// async 块的结果类型通常不实现 Unpin
let fut = async { 1 };
// is_unpin(&fut); // 编译错误:future 未实现 Unpin
}
6. Waker 与 Wake:任务唤醒的完整机制
6.1 Waker 的作用
当 Future 返回 Poll::Pending 时,它需要一种方式在准备好后通知执行器。这就是 Waker 的作用:
use std::task::{Waker, Context};
// Waker 的核心接口
impl Waker {
pub fn wake(self) { /* 唤醒任务 */ }
pub fn wake_by_ref(&self) { /* 通过引用唤醒 */ }
}
6.2 实现一个带唤醒的计时器 Future
use std::future::Future;
use std::task::{Context, Poll, Waker};
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use std::thread;
// 共享状态:计时器
struct SharedState {
completed: bool,
waker: Option<Waker>,
}
// 计时器 Future
struct TimerFuture {
shared_state: Arc<Mutex<SharedState>>,
}
impl TimerFuture {
fn new(duration: Duration) -> Self {
let shared_state = Arc::new(Mutex::new(SharedState {
completed: false,
waker: None,
}));
// 启动后台线程
let state_clone = shared_state.clone();
thread::spawn(move || {
thread::sleep(duration);
let mut state = state_clone.lock().unwrap();
state.completed = true;
if let Some(waker) = state.waker.take() {
waker.wake(); // 时间到,唤醒 Future
}
});
TimerFuture { shared_state }
}
}
impl Future for TimerFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let mut state = self.shared_state.lock().unwrap();
if state.completed {
Poll::Ready(())
} else {
// 保存 waker,以便后台线程唤醒我们
state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
#[tokio::main]
async fn main() {
println!("[主] 开始等待 1 秒...");
let start = Instant::now();
TimerFuture::new(Duration::from_secs(1)).await;
println!("[主] 等待完成,耗时: {:?}", start.elapsed());
}
6.3 Waker 的克隆成本
// Waker 的克隆是 O(1) 的!
// 因为 Waker 使用 Arc-like 的引用计数
let waker1 = cx.waker().clone(); // 很快
let waker2 = cx.waker().clone(); // 很快
// 可以安全地跨线程传递
std::thread::spawn(move || {
waker1.wake(); // 在任意线程唤醒
});
7. Executor 与 Reactor:执行器与反应器的分工
7.1 架构概览
┌─────────────────────────────────────────┐
│ Application Code │
│ (async fn, .await) │
└──────────────────┬──────────────────────┘
│ 返回 Future
▼
┌─────────────────────────────────────────┐
│ Executor (执行器) │
│ - 调度 Task │
│ - 调用 Future::poll() │
│ - 管理 Task 队列 │
└──────────────────┬──────────────────────┘
│ 注册 I/O 事件
▼
┌─────────────────────────────────────────┐
│ Reactor (反应器) │
│ - epoll/kqueue/io_uring │
│ - 监听 I/O 事件 │
│ - 事件完成后调用 Waker.wake() │
└─────────────────────────────────────────┘
7.2 手写一个迷你 Executor
use std::future::Future;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use std::pin::Pin;
use std::collections::VecDeque;
// 超简单的 Executor
struct MiniExecutor {
ready_queue: VecDeque<Task>,
}
type Task = Pin<Box<dyn Future<Output = ()>>>;
impl MiniExecutor {
fn new() -> Self {
MiniExecutor {
ready_queue: VecDeque::new(),
}
}
fn spawn(&mut self, future: impl Future<Output = ()> + 'static) {
let task = Box::pin(future);
self.ready_queue.push_back(task);
}
fn run(&mut self) {
let waker = dummy_waker();
while !self.ready_queue.is_empty() {
let mut task = self.ready_queue.pop_front().unwrap();
let mut cx = Context::from_waker(&waker);
match task.as_mut().poll(&mut cx) {
Poll::Ready(()) => {
// Task 完成,丢弃
println!("[Executor] Task 完成");
}
Poll::Pending => {
// Task 未完成,放回队列尾部
self.ready_queue.push_back(task);
println!("[Executor] Task 挂起,重新排队");
}
}
}
}
}
// 创建一个空的 Waker(仅用于演示)
fn dummy_waker() -> Waker {
const VTABLE: RawWakerVTable = RawWakerVTable::new(
|_| RawWaker::new(std::ptr::null(), &VTABLE), // clone
|_| {}, // wake
|_| {}, // wake_by_ref
|_| {}, // drop
);
let raw_waker = RawWaker::new(std::ptr::null(), &VTABLE);
unsafe { Waker::from_raw(raw_waker) }
}
fn main() {
let mut executor = MiniExecutor::new();
executor.spawn(async {
println!("[Task1] 开始");
// 模拟异步操作
for i in 0..3 {
println!("[Task1] 步骤 {}", i);
yield_now().await;
}
println!("[Task1] 完成");
});
executor.spawn(async {
println!("[Task2] 开始");
yield_now().await;
println!("[Task2] 完成");
});
println!("=== 开始执行 ===");
executor.run();
}
// 让出一次执行机会
async fn yield_now() {
struct YieldNow { yielded: bool }
impl Future for YieldNow {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
if self.yielded {
Poll::Ready(())
} else {
self.yielded = true;
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
YieldNow { yielded: false }.await
}
8. Tokio 运行时架构深度剖析
8.1 Tokio 的组件
Tokio 运行时
├── Driver(驱动层)
│ ├── Io(网络 I/O:epoll/kqueue/IOCP)
│ ├── Time(计时器)
│ └── Signal(Unix 信号)
├── Scheduler(调度器)
│ ├── Work-Stealing 队列
│ ├── Local 队列(每个 worker 一个)
│ └── Inject 队列(全局)
└── Resource(资源管理)
├── TcpListener/TcpStream
├── UdpSocket
└── File/Process
8.2 配置 Tokio 运行时
use tokio::runtime::Runtime;
fn main() {
// 方法1:Builder 模式(推荐)
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(4) // 4 个 worker 线程
.enable_all() // 启用所有驱动(Io/Time/Signal)
.thread_name("my-worker") // 线程名
.thread_stack_size(2 * 1024 * 1024) // 2MB 栈
.build()
.unwrap();
// 阻塞运行
rt.block_on(async {
println!("在 Tokio 运行时中");
});
// 方法2:当前线程运行时(单线程)
let rt_single = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt_single.block_on(async {
println!("在单线程运行时中");
});
}
8.3 #[tokio::main] 宏的展开
// 你写的代码
#[tokio::main]
async fn main() {
println!("hello");
}
// 宏展开后的代码(大致)
fn main() {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(async {
println!("hello");
});
}
9. Work-Stealing 调度算法详解
9.1 为什么需要 Work-Stealing?
问题:多个 worker 线程,如何分配任务?
方案1:全局队列 + 锁 → 竞争大,扩展性差
方案2:每个 worker 一个本地队列 → 负载不均
方案3:Work-Stealing → 每个 worker 有本地队列,空闲时从其他 worker 偷任务
9.2 Tokio 的调度队列
Global Queue (Inject Queue)
├── 新任务入口
└── 所有 worker 都能取
Worker 0 Worker 1 Worker 2
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Local │ │ Local │ │ Local │
│ Queue │ │ Queue │ │ Queue │
│ (deque) │ │ (deque) │ │ (deque) │
└─────────┘ └─────────┘ └─────────┘
│ │ │
└─────────────────┴─────────────────┘
空闲时从右侧偷任务(Work-Stealing)
9.3 调度优先级
use tokio::task;
#[tokio::main]
async fn main() {
// 普通任务:进入 global queue
tokio::spawn(async {
println!("普通任务");
});
// 标记任务:进入当前 worker 的 local queue(更快)
task::Builder::new()
.name("marked-task")
.spawn(async {
println!("标记任务");
})
.unwrap();
}
10. 多线程 vs 单线程运行时选型指南
10.1 对比表
| 特性 | multi_thread | current_thread |
|---|---|---|
| 线程数 | N(可配置) | 1 |
| 并行能力 | ✅ | ❌ |
| 适用于 | 服务器、Web 应用 | 客户端、嵌入式 |
| 开销 | 较高(线程间通信) | 很低 |
| Work-Stealing | ✅ | ❌ |
10.2 性能测试
use tokio::time::{sleep, Duration};
use std::time::Instant;
async fn fake_io(id: u64) {
sleep(Duration::from_millis(10)).await;
println!("任务 {} 完成", id);
}
fn benchmark_scheduler(worker_threads: usize) {
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(worker_threads)
.enable_all()
.build()
.unwrap();
rt.block_on(async {
let start = Instant::now();
let mut handles = vec![];
for i in 0..1000 {
handles.push(tokio::spawn(fake_io(i)));
}
for h in handles {
h.await.unwrap();
}
let elapsed = start.elapsed();
println!("[{} workers] 1000 任务耗时: {:?}", worker_threads, elapsed);
});
}
fn main() {
benchmark_scheduler(1);
benchmark_scheduler(2);
benchmark_scheduler(4);
benchmark_scheduler(8);
}
11. Task 生命周期与资源管理
11.1 Task 的创建与销毁
use tokio::task::JoinHandle;
#[tokio::main]
async fn main() {
// 创建 Task
let handle: JoinHandle<i32> = tokio::spawn(async {
println!("[Task] 开始执行");
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
println!("[Task] 执行完成");
42
});
// 等待 Task 完成
match handle.await {
Ok(result) => println!("Task 返回: {}", result),
Err(e) => println!("Task 失败: {:?}", e),
}
}
11.2 Task 取消(Cancellation)
use tokio::time::{sleep, timeout, Duration};
use tokio::select;
#[tokio::main]
async fn main() {
// 方法1:timeout
let result = timeout(Duration::from_millis(50), async_operation()).await;
match result {
Ok(value) => println!("完成: {}", value),
Err(_) => println!("超时!"),
}
// 方法2:select(竞赛)
select! {
value = async_operation() => {
println!("操作完成: {}", value);
}
_ = sleep(Duration::from_millis(50)) => {
println!("取消操作(超时)");
}
}
}
async fn async_operation() -> i32 {
sleep(Duration::from_millis(100)).await;
42
}
11.3 Drop 保证
struct Resource {
id: u64,
}
impl Drop for Resource {
fn drop(&mut self) {
println!("[Resource {}] 被丢弃", self.id);
}
}
#[tokio::main]
async fn main() {
let res = Resource { id: 1 };
tokio::select! {
_ = async {
sleep(Duration::from_millis(100)).await;
println!("操作完成");
} => {}
_ = sleep(Duration::from_millis(50)) => {
println!("取消!");
// res 在这里被丢弃(即使 async 块未执行完)
}
}
println!("main 结束");
// Resource 1 会在适当的时候被丢弃
}
12. Stream 特征与异步迭代模式
12.1 Stream 特征的定义
use std::pin::Pin;
use std::task::{Context, Poll};
// Stream = 异步版本的 Iterator
trait Stream {
type Item;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<Self::Item>>;
}
12.2 使用 Stream
use futures_util::stream::{self, StreamExt};
use tokio_stream::wrappers::IntervalStream;
use tokio::time::{interval, Duration};
#[tokio::main]
async fn main() {
// 创建 Stream
let mut stream = stream::iter(1..=5);
// 消费 Stream
while let Some(item) = stream.next().await {
println!("Item: {}", item);
}
// 使用操作符
let sum: i32 = stream::iter(1..=10)
.filter(|x| async { x % 2 == 0 })
.map(|x| x * x)
.collect::<Vec<_>>()
.await
.into_iter()
.sum();
println!("平方和: {}", sum);
// 计时器 Stream
let mut tick_stream = IntervalStream::new(interval(Duration::from_millis(500)));
for _ in 0..5 {
tick_stream.next().await;
println!("Tick!");
}
}
12.3 实现一个自定义 Stream
use std::pin::Pin;
use std::task::{Context, Poll};
use futures_util::Stream;
// 生成斐波那契数列的 Stream
struct Fibonacci {
a: u64,
b: u64,
count: usize,
}
impl Fibonacci {
fn new(count: usize) -> Self {
Fibonacci { a: 0, b: 1, count }
}
}
impl Stream for Fibonacci {
type Item = u64;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<Self::Item>> {
if self.count == 0 {
return Poll::Ready(None);
}
let next = self.a;
self.a = self.b;
self.b = next + self.b;
self.count -= 1;
Poll::Ready(Some(next))
}
}
#[tokio::main]
async fn main() {
let mut fib = Fibonacci::new(10);
while let Some(n) = fib.next().await {
println!("Fib: {}", n);
}
}
13. 错误处理与 Cancellation
13.1 异步代码中的错误传播
use tokio::fs;
use tokio::io;
#[tokio::main]
async fn main() -> io::Result<()> {
// ? 操作符在 async 中同样有效
let content = fs::read_to_string("test.txt").await?;
println!("内容: {}", content);
Ok(())
}
13.2 多个 Future 的错误处理
use tokio::try_join;
async fn op1() -> Result<i32, String> {
Ok(1)
}
async fn op2() -> Result<i32, String> {
Err("op2 失败".to_string())
}
#[tokio::main]
async fn main() {
// try_join:任何一个失败就立即返回错误
match try_join!(op1(), op2()) {
Ok((v1, v2)) => println!("成功: {} + {}", v1, v2),
Err(e) => println!("失败: {}", e),
}
}
13.3 Cancellation 的安全处理
use tokio::sync::Notify;
use std::sync::Arc;
#[tokio::main]
async fn main() {
let notify = Arc::new(Notify::new());
let notify_clone = notify.clone();
let handle = tokio::spawn(async move {
// 可取消的长期运行任务
for i in 0..100 {
// 定期检查取消信号
if notify_clone.notified().now_or_never().is_some() {
println!("任务 {} 收到取消信号,退出", i);
break;
}
println!("工作中... {}", i);
tokio::time::sleep(Duration::from_millis(100)).await;
}
});
// 3 秒后取消
tokio::time::sleep(Duration::from_millis(300)).await;
notify.notify_one();
handle.await.unwrap();
}
14. 背压控制与流量管理
14.1 问题:生产者快于消费者
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
#[tokio::main]
async fn main() {
// 无界 channel:生产者可以无限快
let (tx, mut rx) = mpsc::unbounded_channel();
// 问题:如果生产者远快于消费者,内存会爆炸!
// 解决:使用有界 channel(背压)
let (tx, mut rx) = mpsc::channel(100); // 缓冲区大小 100
let producer = tokio::spawn(async move {
for i in 0..1000 {
// send() 会在缓冲区满时自动背压(异步等待)
tx.send(i).await.unwrap();
}
});
let consumer = tokio::spawn(async move {
while let Some(item) = rx.recv().await {
// 慢速消费
tokio::time::sleep(Duration::from_millis(10)).await;
println!("消费: {}", item);
}
});
producer.await.unwrap();
consumer.await.unwrap();
}
14.2 使用 Stream 的缓冲控制
use futures_util::stream::{self, StreamExt};
#[tokio::main]
async fn main() {
let stream = stream::iter(0..1000)
.map(|i| async move {
// 模拟慢速 I/O
tokio::time::sleep(Duration::from_millis(10)).await;
i
})
.buffer_unordered(10); // 最多 10 个并发
stream.for_each(|i| async move {
println!("完成: {}", i);
}).await;
}
15. 性能优化实战技巧
15.1 避免不必要的 .await
// 不好:每次循环都 .await
for i in 0..1000 {
do_something(i).await; // 1000 次上下文切换
}
// 好:批量处理
let futures = (0..1000).map(do_something);
join_all(futures).await; // 并发执行
15.2 使用 Channel 代替共享内存
use tokio::sync::mpsc;
// 不好:频繁锁竞争
let shared_counter = Arc::new(Mutex::new(0));
for _ in 0..10 {
let counter = counter.clone();
tokio::spawn(async move {
let mut num = counter.lock().await;
*num += 1;
});
}
// 好:通过 channel 串行化
let (tx, mut rx) = mpsc::channel(100);
for _ in 0..10 {
let tx = tx.clone();
tokio::spawn(async move {
tx.send(1).await.unwrap();
});
}
// 单个消费者更新计数器
tokio::spawn(async move {
let mut counter = 0;
while let Some(_) = rx.recv().await {
counter += 1;
}
});
15.3 CPU 密集型任务的处理
use tokio::task;
#[tokio::main]
async fn main() {
// 不好:CPU 密集任务阻塞 worker 线程
let result = expensive_computation().await; // 阻塞!
// 好:扔到专用线程池
let result = task::spawn_blocking(|| {
expensive_computation_sync()
}).await.unwrap();
}
fn expensive_computation_sync() -> i32 {
// 模拟 CPU 密集计算
(0..1_000_000).fold(0, |acc, x| acc + x)
}
16. 实战:构建百万级连接的聊天服务器
16.1 架构设计
Client → TCP → ChatServer
├── Connection Handler (每个连接一个 Task)
├── Room Manager (共享状态)
└── Message Broadcaster (消息广播)
16.2 完整实现
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{broadcast, mpsc};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
// 聊天服务器
struct ChatServer {
rooms: Arc<Mutex<HashMap<String, Room>>>,
}
struct Room {
// broadcast: 向房间内所有人发消息
tx: broadcast::Sender<String>,
}
impl ChatServer {
fn new() -> Self {
ChatServer {
rooms: Arc::new(Mutex::new(HashMap::new())),
}
}
async fn run(&self, addr: &str) -> std::io::Result<()> {
let listener = TcpListener::bind(addr).await?;
println!("[Server] 监听 {}", addr);
loop {
let (socket, addr) = listener.accept().await?;
println!("[Server] 新连接: {}", addr);
let server = self.clone();
tokio::spawn(async move {
server.handle_connection(socket).await;
});
}
}
async fn handle_connection(&self, socket: TcpStream) {
// 简化:每个连接加入 "general" 房间
let room_name = "general".to_string();
// 获取或创建房间
let rx = {
let mut rooms = self.rooms.lock().unwrap();
let room = rooms.entry(room_name.clone()).or_insert_with(|| {
let (tx, _) = broadcast::channel(100);
Room { tx }
});
room.tx.subscribe()
};
let (reader, mut writer) = socket.into_split();
let mut reader = BufReader::new(reader);
// Task 1:从客户端读取消息,广播到房间
let tx_clone = self.rooms.lock().unwrap()
.get(&room_name).unwrap().tx.clone();
let read_task = tokio::spawn(async move {
let mut line = String::new();
loop {
line.clear();
match reader.read_line(&mut line).await {
Ok(0) => break, // 连接关闭
Ok(_) => {
let msg = line.trim().to_string();
if !msg.is_empty() {
tx_clone.send(msg).unwrap();
}
}
Err(e) => {
eprintln!("[Read] 错误: {}", e);
break;
}
}
}
});
// Task 2:从房间接收消息,发送给客户端
let write_task = tokio::spawn(async move {
let mut rx = rx;
while let Ok(msg) = rx.recv().await {
if writer.write_all(format!("{}\n", msg).as_bytes()).await.is_err() {
break;
}
}
});
// 等待任意一个 Task 结束
tokio::select! {
_ = read_task => {},
_ = write_task => {},
}
println!("[Server] 连接断开");
}
}
impl Clone for ChatServer {
fn clone(&self) -> Self {
ChatServer {
rooms: self.rooms.clone(),
}
}
}
#[tokio::main]
async fn main() -> std::io::Result<()> {
let server = ChatServer::new();
server.run("0.0.0.0:8080").await
}
17. 与 Go Goroutine 的技术对比
17.1 并发模型对比
| 维度 | Rust (Tokio) | Go (Goroutine) |
|---|---|---|
| 调度器 | Work-Stealing | 抢占式(runtime 管理) |
| 栈 | 无栈(状态机) | 有栈(2KB 起) |
| 上下文切换 | 函数调用 | 内核线程切换 |
| 内存占用 | ~0(状态机在栈上) | 2KB+ 每 goroutine |
| 取消 | 手动(select/timeout) | 手动(context) |
17.2 性能对比代码示例
// Rust 版本:10 万并发连接
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("0.0.0.0:8080").await.unwrap();
loop {
let (socket, _) = listener.accept().await.unwrap();
tokio::spawn(async move {
// 处理连接(每个连接 ~0 额外内存)
handle_connection(socket).await;
});
}
}
// Go 版本:10 万并发连接
func main() {
listener, _ := net.Listen("tcp", "0.0.0.0:8080")
for {
conn, _ := listener.Accept()
go func() {
// 处理连接(每个连接 ~2KB 栈)
handleConnection(conn)
}()
}
}
结论:Rust 可以支持更多并发连接(C10M),Go 开发效率更高。
18. 与 Node.js Event Loop 的架构对比
18.1 事件循环对比
Node.js:
单线程 Event Loop → 所有 I/O 在这个线程处理
→ 优势:无锁编程
→ 劣势:无法利用多核
Rust/Tokio:
多线程 Event Loop → 每个 worker 一个 Event Loop
→ 优势:真正并行,利用多核
→ 劣势:需要怕并处理
18.2 CPU 密集任务的处理
// Node.js:CPU 密集任务阻塞 Event Loop
app.get('/compute', (req, res) => {
const result = heavyComputation(); // 阻塞!
res.json({ result });
});
// 解决:worker_threads
// Rust:扔到专用线程池,不阻塞 worker
#[tokio::main]
async fn main() {
let result = tokio::task::spawn_blocking(|| {
heavy_computation() // 在专用线程执行
}).await.unwrap();
println!("结果: {}", result);
}
19. 常见陷阱与最佳实践
19.1 陷阱1:在 async 中调用阻塞代码
// 错误示例!
#[tokio::main]
async fn main() {
// std::thread::sleep 阻塞当前 worker 线程!
std::thread::sleep(Duration::from_secs(1));
// 应该是:
tokio::time::sleep(Duration::from_secs(1)).await;
}
19.2 陷阱2:持有 Mutex 跨越 .await 点
use tokio::sync::Mutex;
// 错误:长时间持有锁跨越 .await
async fn bad_example(lock: Arc<Mutex<i32>>) {
let mut num = lock.lock().await;
// 危险:.await 期间持有锁
tokio::time::sleep(Duration::from_millis(100)).await;
*num += 1;
}
// 正确:缩小锁范围
async fn good_example(lock: Arc<Mutex<i32>>) {
{
let mut num = lock.lock().await;
*num += 1;
} // 锁在这里释放
tokio::time::sleep(Duration::from_millis(100)).await;
}
19.3 最佳实践清单
- 使用
tokio::time::sleep而不是std::thread::sleep - 使用
tokio::fs而不是std::fs - 使用
tokio::sync::Mutex而不是std::sync::Mutex - CPU 密集任务用
spawn_blocking - 避免在 async 代码中用
.unwrap()(用?或match) - 合理设置 Channel 缓冲区大小(背压控制)
- 使用
select!实现超时和取消 - 监控任务数量,防止 Task 泄漏
20. 总结与异步生态展望
20.1 核心要点回顾
- Rust 异步 = 零成本抽象 + 内存安全 + 极致性能
async fn返回 Future,只有.await才驱动执行- Future 是状态机,
Pin保证自引用结构的内存安全 - Waker 实现高效唤醒,避免忙等待
- Tokio 提供生产级运行时:Work-Stealing 调度、多线程执行
select!+timeout实现优雅的取消和超时
20.2 异步生态工具链
Runtime: Tokio (主流), async-std, smol
Web 框架: Axum, Actix-web, Warp
数据库: sqlx, tokio-postgres, redis-rs
gRPC: tonic
消息队列: lapin (AMQP), rdkafka
Stream: futures-util, tokio-stream
20.3 未来展望
- Async Iterator(正式化 Stream 特征)
- Async Drop(异步资源清理)
- Async Closure(更简洁的异步闭包)
- 更好的错误处理机制
参考资料
版权声明:本文为原创深度技术文章,转载请注明出处(程序员茄子 https://www.chenxutan.com)。