Rust
第二十四章:Tokio 深入 —— 异步运行时的内部世界
第二十四章:Tokio 深入 —— 异步运行时的内部世界
本章目标
- 理解 Tokio 的架构设计,对比 Node.js 的事件循环
- 掌握 Runtime 的配置与选择(单线程 vs 多线程)
- 理解任务调度的工作窃取(work-stealing)机制
- 学会使用异步 I/O(TcpStream、文件操作)
- 掌握 Tokio 的异步同步原语(Mutex、RwLock、Semaphore、Channel)
- 学会处理超时与任务取消
- 通过实战构建一个异步聊天服务器
预计学习时间:120 - 180 分钟(这章内容丰富,建议边读边动手实践)
24.1 Tokio 架构(对比 Node.js Event Loop)
24.1.1 Node.js 的事件循环回顾
作为 JavaScript 开发者,你对事件循环一定不陌生:
// Node.js 事件循环 - 单线程
const http = require('http');
const server = http.createServer(async (req, res) => {
// 所有请求都在同一个线程上处理
const data = await fetchFromDB(); // 异步 I/O,不阻塞事件循环
res.end(data);
});
server.listen(3000);
// Node.js 的事件循环:
// ┌───────────────────────────────────┐
// │ 单线程事件循环 │
// │ ┌─────┐ ┌─────┐ ┌─────┐ │
// │ │ 回调1 │ │ 回调2 │ │ 回调3 │ │
// │ └─────┘ └─────┘ └─────┘ │
// │ ↓ ↓ ↓ │
// │ ┌───────────────────────┐ │
// │ │ libuv 线程池 │ │
// │ │ (文件 I/O, DNS 等) │ │
// │ └───────────────────────┘ │
// └───────────────────────────────────┘
24.1.2 Tokio 的架构
Tokio 是一个多线程异步运行时,设计哲学与 Node.js 有很大不同:
┌────────────────────────────────────────────────────────────┐
│ Tokio 运行时架构 │
├────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Tokio Runtime(运行时) │ │
│ │ │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Worker 0 │ │ Worker 1 │ │ Worker 2 │ ... │ │
│ │ │ (线程 0) │ │ (线程 1) │ │ (线程 2) │ │ │
│ │ │ │ │ │ │ │ │ │
│ │ │ ┌──────┐ │ │ ┌──────┐ │ │ ┌──────┐ │ │ │
│ │ │ │任务队列│ │ │ │任务队列│ │ │ │任务队列│ │ │ │
│ │ │ │Task A │ │ │ │Task C │ │ │ │Task E │ │ │ │
│ │ │ │Task B │ │ │ │Task D │ │ │ │ │ │ │ │
│ │ │ └──────┘ │ │ └──────┘ │ │ └──────┘ │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ │ ↕ 工作窃取 ↕ ↕ │ │
│ │ │ │
│ │ ┌─────────────────────────────────────────────┐ │ │
│ │ │ I/O Driver(epoll/kqueue) │ │ │
│ │ │ 监听所有异步 I/O 事件 │ │ │
│ │ └─────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ ┌─────────────────────────────────────────────┐ │ │
│ │ │ Timer Driver(时间轮) │ │ │
│ │ │ 管理所有 sleep/timeout │ │ │
│ │ └─────────────────────────────────────────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└────────────────────────────────────────────────────────────┘
24.1.3 核心区别对比
┌────────────────────────────────────────────────────────┐
│ Node.js vs Tokio 核心对比 │
├──────────────────┬─────────────────────────────────────┤
│ │ Node.js │ Tokio │
├──────────────────┼───────────────────┼──────────────────┤
│ 线程模型 │ 单线程 + 线程池 │ 多线程工作窃取 │
│ CPU 利用 │ 单核(需 cluster) │ 自动多核 │
│ 调度单位 │ 回调/Promise │ Future/Task │
│ I/O 模型 │ libuv │ epoll/kqueue │
│ 阻塞操作 │ 不要在主线程做! │ spawn_blocking │
│ 内存开销/任务 │ ~数 KB(闭包) │ ~数百字节(状态机)│
│ 取消机制 │ AbortController │ drop JoinHandle │
│ 错误传播 │ try/catch, .catch │ Result + ? │
│ 生态系统 │ npm(巨大) │ crates.io(增长中)│
└──────────────────┴───────────────────┴──────────────────┘
24.1.4 Tokio 的核心组件
// Tokio 的三大核心组件
// 1. Runtime(运行时)—— 管理线程和调度
use tokio::runtime::Runtime;
// 2. Task(任务)—— 异步操作的最小单位
use tokio::task;
// 3. I/O Driver —— 处理异步 I/O
use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
24.2 Runtime 配置
24.2.1 两种运行时模式
Tokio 提供两种运行时模式:
// 模式 1:多线程运行时(默认)
// 适合 CPU 密集 + I/O 混合的场景
#[tokio::main] // 这个宏展开后创建多线程运行时
async fn main() {
println!("运行在多线程运行时上!");
}
// 等价于手动创建:
fn main() {
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(4) // 工作线程数,默认等于 CPU 核心数
.enable_all() // 启用所有功能(I/O + 时间)
.build()
.unwrap();
rt.block_on(async {
println!("运行在多线程运行时上!");
});
}
// 模式 2:当前线程运行时(单线程)
// 适合轻量级场景、测试、嵌入式
#[tokio::main(flavor = "current_thread")]
async fn main() {
println!("运行在单线程运行时上!");
}
// 等价于:
fn main() {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(async {
println!("运行在单线程运行时上!");
});
}
24.2.2 运行时配置详解
use tokio::runtime::Builder;
fn create_custom_runtime() -> tokio::runtime::Runtime {
Builder::new_multi_thread()
// 工作线程数
.worker_threads(8)
// 阻塞线程池的最大线程数
// 用于 spawn_blocking 任务
.max_blocking_threads(32)
// 线程名称前缀(调试用)
.thread_name("my-app-worker")
// 线程栈大小(默认 2MB)
.thread_stack_size(4 * 1024 * 1024) // 4MB
// 启用 I/O 驱动
.enable_io()
// 启用时间驱动
.enable_time()
.build()
.expect("无法创建运行时")
}
24.2.3 何时选择哪种运行时?
┌──────────────────────────────────────────────────────┐
│ 如何选择运行时模式? │
├──────────────────────────────────────────────────────┤
│ │
│ 多线程运行时(默认): │
│ ├─ Web 服务器 │
│ ├─ 微服务 │
│ ├─ 需要充分利用多核 CPU │
│ ├─ 任务之间相对独立 │
│ └─ 大多数生产场景 │
│ │
│ 单线程运行时: │
│ ├─ 单元测试 │
│ ├─ 简单的 CLI 工具 │
│ ├─ 嵌入式系统 │
│ ├─ 需要确定性调度 │
│ ├─ 与非 Send 类型配合 │
│ └─ 最小化开销 │
│ │
└──────────────────────────────────────────────────────┘
24.3 任务调度
24.3.1 tokio::spawn —— 创建异步任务
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
// spawn 创建一个新的异步任务
// 任务会被调度到运行时的某个工作线程上
let handle = tokio::spawn(async {
sleep(Duration::from_secs(1)).await;
println!("任务完成!");
42 // 返回值
});
// 等待任务完成并获取返回值
let result = handle.await.unwrap();
println!("结果: {}", result);
// 并发运行多个任务
let mut handles = vec![];
for i in 0..5 {
let handle = tokio::spawn(async move {
sleep(Duration::from_millis(100 * i as u64)).await;
println!("任务 {} 完成", i);
i * 10
});
handles.push(handle);
}
// 等待所有任务
for handle in handles {
let result = handle.await.unwrap();
println!("结果: {}", result);
}
}
对比 Node.js 的 Promise.all:
// Node.js
async function main() {
const tasks = [];
for (let i = 0; i < 5; i++) {
tasks.push(new Promise(resolve => {
setTimeout(() => {
console.log(`任务 ${i} 完成`);
resolve(i * 10);
}, 100 * i);
}));
}
const results = await Promise.all(tasks);
console.log('结果:', results);
}
24.3.2 工作窃取调度
┌──────────────────────────────────────────────────────────┐
│ 工作窃取(Work Stealing)调度 │
├──────────────────────────────────────────────────────────┤
│ │
│ 线程 A(忙碌) 线程 B(空闲) │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ Task 1 ████ │ │ │ │
│ │ Task 2 ██ │ ──→ │ Task 4 ██ │ ← 从线程 A 偷来 │
│ │ Task 3 █ │ │ │ │
│ │ Task 4 │ │ │ │
│ └──────────────┘ └──────────────┘ │
│ │
│ 工作窃取的好处: │
│ 1. 自动负载均衡 —— 空闲线程主动找活干 │
│ 2. 减少饥饿 —— 没有任务会等太久 │
│ 3. 提高 CPU 利用率 │
│ 4. 对比 Node.js 的单线程,可以利用多核 │
│ │
└──────────────────────────────────────────────────────────┘
24.3.3 spawn_blocking —— 处理阻塞操作
重要: 永远不要在异步任务中执行阻塞操作!这会阻塞整个工作线程。
use tokio::task;
use std::time::Duration;
#[tokio::main]
async fn main() {
// ❌ 错误:在异步代码中使用阻塞操作
// std::thread::sleep(Duration::from_secs(5)); // 会阻塞工作线程!
// ✅ 正确:使用 tokio::time::sleep
tokio::time::sleep(Duration::from_secs(1)).await;
// ✅ 正确:对于必须阻塞的操作,使用 spawn_blocking
let result = task::spawn_blocking(|| {
// 这段代码在专门的阻塞线程池中运行
// 不会阻塞 Tokio 的工作线程
std::thread::sleep(Duration::from_secs(2));
// 计算密集型操作也应该放在这里
let mut sum = 0u64;
for i in 0..1_000_000 {
sum += i;
}
sum
}).await.unwrap();
println!("计算结果: {}", result);
// 常见需要 spawn_blocking 的场景:
// 1. 同步文件 I/O(std::fs)
// 2. CPU 密集型计算
// 3. 调用阻塞的 C 库
// 4. 密码哈希(bcrypt 等)
}
24.3.4 tokio::join! 和 tokio::select!
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
// join! —— 并发等待多个 future,全部完成后返回
// 类似 Promise.all
let (r1, r2, r3) = tokio::join!(
async {
sleep(Duration::from_millis(100)).await;
"任务1完成"
},
async {
sleep(Duration::from_millis(200)).await;
"任务2完成"
},
async {
sleep(Duration::from_millis(150)).await;
"任务3完成"
}
);
println!("{}, {}, {}", r1, r2, r3);
// select! —— 等待多个 future,第一个完成的就返回
// 类似 Promise.race
tokio::select! {
_ = sleep(Duration::from_secs(1)) => {
println!("1 秒超时");
}
_ = sleep(Duration::from_millis(500)) => {
println!("500ms 先完成!"); // 这个会被执行
}
}
}
24.3.5 任务的 Send 约束
use std::rc::Rc;
use std::sync::Arc;
#[tokio::main]
async fn main() {
// 多线程运行时中,spawn 的 future 必须是 Send
// 因为任务可能在不同线程间迁移
// ❌ Rc 不是 Send
// let rc = Rc::new(42);
// tokio::spawn(async move {
// println!("{}", rc); // 编译错误:Rc 不是 Send
// });
// ✅ Arc 是 Send
let arc = Arc::new(42);
tokio::spawn(async move {
println!("{}", arc); // OK
}).await.unwrap();
// ❌ 持有 MutexGuard 跨 await 点
// let mutex = std::sync::Mutex::new(0);
// tokio::spawn(async {
// let lock = mutex.lock().unwrap();
// some_async_fn().await; // ❌ MutexGuard 跨 await 不是 Send
// drop(lock);
// });
// ✅ 使用 tokio::sync::Mutex 或限制 guard 的作用域
let mutex = Arc::new(tokio::sync::Mutex::new(0));
let m = mutex.clone();
tokio::spawn(async move {
let mut lock = m.lock().await;
*lock += 1;
// lock 在这里 drop,然后才 await
}).await.unwrap();
}
24.4 异步 I/O
24.4.1 TcpStream —— 异步 TCP
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 创建 TCP 监听器
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("服务器监听在 127.0.0.1:8080");
loop {
// 接受新连接
let (mut socket, addr) = listener.accept().await?;
println!("新连接来自: {}", addr);
// 为每个连接创建一个新任务
tokio::spawn(async move {
let mut buf = [0u8; 1024];
loop {
// 异步读取数据
let n = match socket.read(&mut buf).await {
Ok(0) => {
println!("{} 断开连接", addr);
return;
}
Ok(n) => n,
Err(e) => {
eprintln!("读取错误: {}", e);
return;
}
};
// 异步写回数据(回声服务器)
if let Err(e) = socket.write_all(&buf[..n]).await {
eprintln!("写入错误: {}", e);
return;
}
}
});
}
}
对比 Node.js 的 TCP 服务器:
// Node.js 的 TCP 回声服务器
const net = require('net');
const server = net.createServer((socket) => {
console.log('新连接:', socket.remoteAddress);
socket.on('data', (data) => {
socket.write(data); // 回声
});
socket.on('end', () => {
console.log('断开连接');
});
});
server.listen(8080, () => {
console.log('监听在 8080');
});
24.4.2 异步文件操作
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader, AsyncBufReadExt};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 写文件
fs::write("hello.txt", "你好,Tokio!").await?;
// 读文件
let content = fs::read_to_string("hello.txt").await?;
println!("文件内容: {}", content);
// 追加写入
let mut file = fs::OpenOptions::new()
.append(true)
.open("hello.txt")
.await?;
file.write_all(b"\n第二行内容").await?;
// 逐行读取
let file = fs::File::open("hello.txt").await?;
let reader = BufReader::new(file);
let mut lines = reader.lines();
while let Some(line) = lines.next_line().await? {
println!("行: {}", line);
}
// 复制文件
fs::copy("hello.txt", "hello_copy.txt").await?;
// 删除文件
fs::remove_file("hello_copy.txt").await?;
// 创建目录
fs::create_dir_all("data/subdir").await?;
// 列出目录
let mut entries = fs::read_dir("data").await?;
while let Some(entry) = entries.next_entry().await? {
println!(" {:?} - {:?}", entry.file_name(), entry.file_type().await?);
}
// 清理
fs::remove_dir_all("data").await?;
fs::remove_file("hello.txt").await?;
Ok(())
}
24.4.3 异步 HTTP 客户端(reqwest)
// Cargo.toml:
// [dependencies]
// reqwest = { version = "0.12", features = ["json"] }
// serde = { version = "1", features = ["derive"] }
// serde_json = "1"
use serde::Deserialize;
#[derive(Deserialize, Debug)]
struct Todo {
id: u32,
title: String,
completed: bool,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// GET 请求
let resp = reqwest::get("https://jsonplaceholder.typicode.com/todos/1")
.await?
.json::<Todo>()
.await?;
println!("Todo: {:?}", resp);
// 并发请求
let urls = vec![
"https://jsonplaceholder.typicode.com/todos/1",
"https://jsonplaceholder.typicode.com/todos/2",
"https://jsonplaceholder.typicode.com/todos/3",
];
let mut handles = vec![];
for url in urls {
handles.push(tokio::spawn(async move {
reqwest::get(url).await?.json::<Todo>().await
}));
}
for handle in handles {
match handle.await? {
Ok(todo) => println!("获取到: {:?}", todo),
Err(e) => eprintln!("请求失败: {}", e),
}
}
Ok(())
}
24.5 异步同步原语(tokio::sync)
24.5.1 为什么需要异步同步原语?
// ❌ 不要在异步代码中使用 std::sync::Mutex
// 因为 lock() 是阻塞的,会阻塞工作线程
// use std::sync::Mutex;
// ✅ 使用 tokio::sync::Mutex
// 它的 lock() 是异步的,不会阻塞工作线程
use tokio::sync::Mutex;
24.5.2 tokio::sync::Mutex
use std::sync::Arc;
use tokio::sync::Mutex;
#[tokio::main]
async fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = counter.clone();
handles.push(tokio::spawn(async move {
for _ in 0..100 {
let mut lock = counter.lock().await; // 异步获取锁
*lock += 1;
// lock 在这里自动释放
}
}));
}
for handle in handles {
handle.await.unwrap();
}
println!("最终计数: {}", *counter.lock().await); // 1000
}
注意: 如果你的锁操作不跨 .await 点,用 std::sync::Mutex 反而更好(开销更小):
use std::sync::{Arc, Mutex};
#[tokio::main]
async fn main() {
let data = Arc::new(Mutex::new(vec![]));
let d = data.clone();
tokio::spawn(async move {
// 锁的作用域不跨 await —— 用 std::sync::Mutex 即可
{
let mut lock = d.lock().unwrap();
lock.push(42);
} // lock 在 await 之前释放
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}).await.unwrap();
}
24.5.3 tokio::sync::RwLock
use std::sync::Arc;
use tokio::sync::RwLock;
#[tokio::main]
async fn main() {
let data = Arc::new(RwLock::new(vec![1, 2, 3]));
// 多个读取者可以同时获取读锁
let d1 = data.clone();
let d2 = data.clone();
let reader1 = tokio::spawn(async move {
let lock = d1.read().await;
println!("读取者 1: {:?}", *lock);
});
let reader2 = tokio::spawn(async move {
let lock = d2.read().await;
println!("读取者 2: {:?}", *lock);
});
// 写入者需要独占锁
let d3 = data.clone();
let writer = tokio::spawn(async move {
let mut lock = d3.write().await;
lock.push(4);
println!("写入者完成: {:?}", *lock);
});
let _ = tokio::join!(reader1, reader2, writer);
}
24.5.4 Channel(通道)
Tokio 提供了多种异步通道,对比 Node.js 的 EventEmitter:
use tokio::sync::{mpsc, oneshot, broadcast, watch};
#[tokio::main]
async fn main() {
// === mpsc: 多生产者、单消费者 ===
// 最常用的通道类型
println!("--- mpsc 通道 ---");
let (tx, mut rx) = mpsc::channel::<String>(32); // 缓冲区大小 32
// 多个生产者
for i in 0..3 {
let tx = tx.clone();
tokio::spawn(async move {
tx.send(format!("消息来自生产者 {}", i)).await.unwrap();
});
}
drop(tx); // 释放原始发送端
while let Some(msg) = rx.recv().await {
println!("收到: {}", msg);
}
// === oneshot: 单次发送通道 ===
// 适合请求-响应模式
println!("\n--- oneshot 通道 ---");
let (tx, rx) = oneshot::channel::<String>();
tokio::spawn(async move {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
tx.send("一次性结果".to_string()).unwrap();
});
let result = rx.await.unwrap();
println!("收到: {}", result);
// === broadcast: 广播通道 ===
// 多生产者、多消费者,每个消费者都能收到所有消息
println!("\n--- broadcast 通道 ---");
let (tx, _) = broadcast::channel::<String>(16);
let mut rx1 = tx.subscribe();
let mut rx2 = tx.subscribe();
tx.send("广播消息!".to_string()).unwrap();
println!("rx1 收到: {}", rx1.recv().await.unwrap());
println!("rx2 收到: {}", rx2.recv().await.unwrap());
// === watch: 状态观察通道 ===
// 只保留最新值,适合配置变更通知
println!("\n--- watch 通道 ---");
let (tx, mut rx) = watch::channel("初始状态".to_string());
tokio::spawn(async move {
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
tx.send("更新后的状态".to_string()).unwrap();
});
// 等待值变化
rx.changed().await.unwrap();
println!("状态变为: {}", *rx.borrow());
}
24.5.5 Semaphore(信号量)
use std::sync::Arc;
use tokio::sync::Semaphore;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
// 信号量:限制并发数
// 类似于连接池、限流器
let semaphore = Arc::new(Semaphore::new(3)); // 最多 3 个并发
let mut handles = vec![];
for i in 0..10 {
let sem = semaphore.clone();
handles.push(tokio::spawn(async move {
// 获取许可
let _permit = sem.acquire().await.unwrap();
println!("[{:?}] 任务 {} 开始(剩余许可: {})",
std::time::Instant::now(), i, sem.available_permits());
// 模拟工作
sleep(Duration::from_millis(500)).await;
println!("[{:?}] 任务 {} 结束",
std::time::Instant::now(), i);
// _permit 被 drop 时自动释放许可
}));
}
for handle in handles {
handle.await.unwrap();
}
}
24.5.6 Notify(通知)
use std::sync::Arc;
use tokio::sync::Notify;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
let notify = Arc::new(Notify::new());
let notify_clone = notify.clone();
// 等待通知的任务
let waiter = tokio::spawn(async move {
println!("等待通知...");
notify_clone.notified().await;
println!("收到通知!继续执行");
});
// 发送通知
sleep(Duration::from_secs(1)).await;
println!("发送通知!");
notify.notify_one();
waiter.await.unwrap();
}
24.6 超时与取消
24.6.1 超时处理
use tokio::time::{timeout, Duration};
#[tokio::main]
async fn main() {
// 方法 1:tokio::time::timeout
let result = timeout(
Duration::from_secs(2),
async {
// 模拟一个慢操作
tokio::time::sleep(Duration::from_secs(5)).await;
"完成"
}
).await;
match result {
Ok(value) => println!("成功: {}", value),
Err(_) => println!("超时!"), // 这个会被执行
}
// 方法 2:tokio::select! 实现超时
tokio::select! {
result = async {
tokio::time::sleep(Duration::from_secs(5)).await;
"完成"
} => {
println!("成功: {}", result);
}
_ = tokio::time::sleep(Duration::from_secs(2)) => {
println!("超时了!"); // 这个会被执行
}
}
}
24.6.2 取消任务
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
let handle = tokio::spawn(async {
println!("任务开始...");
sleep(Duration::from_secs(10)).await;
println!("任务完成"); // 不会被执行
});
// 等 1 秒后取消任务
sleep(Duration::from_secs(1)).await;
handle.abort(); // 取消任务
println!("任务已取消");
// 检查任务是否被取消
match handle.await {
Ok(_) => println!("正常完成"),
Err(e) if e.is_cancelled() => println!("确认已取消"),
Err(e) => println!("其他错误: {}", e),
}
}
24.6.3 优雅关闭(Graceful Shutdown)
use tokio::signal;
use tokio::sync::broadcast;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
let (shutdown_tx, _) = broadcast::channel::<()>(1);
// 启动多个工作任务
for i in 0..3 {
let mut shutdown_rx = shutdown_tx.subscribe();
tokio::spawn(async move {
loop {
tokio::select! {
_ = async {
// 正常工作
sleep(Duration::from_secs(1)).await;
println!("工作者 {} 完成一轮", i);
} => {}
_ = shutdown_rx.recv() => {
println!("工作者 {} 收到关闭信号,清理中...", i);
// 执行清理工作
sleep(Duration::from_millis(100)).await;
println!("工作者 {} 已关闭", i);
return;
}
}
}
});
}
// 等待 Ctrl+C
println!("按 Ctrl+C 关闭...");
signal::ctrl_c().await.expect("无法监听 Ctrl+C");
println!("\n收到关闭信号!");
// 通知所有工作者关闭
drop(shutdown_tx); // 关闭发送端会通知所有接收端
// 给工作者一些时间完成清理
sleep(Duration::from_secs(1)).await;
println!("服务器已关闭");
}
24.7 实战:异步聊天服务器
让我们把学到的知识综合起来,构建一个完整的异步聊天服务器:
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::sync::broadcast;
use std::net::SocketAddr;
/// 聊天消息类型
#[derive(Clone, Debug)]
struct ChatMessage {
/// 发送者地址
sender: SocketAddr,
/// 消息内容
content: String,
}
/// 处理单个客户端连接
async fn handle_client(
stream: TcpStream,
addr: SocketAddr,
tx: broadcast::Sender<ChatMessage>,
) {
let (reader, mut writer) = stream.into_split();
let mut reader = BufReader::new(reader);
let mut rx = tx.subscribe();
// 发送欢迎消息
let welcome = format!("欢迎来到聊天室!你的地址是 {}\n", addr);
if writer.write_all(welcome.as_bytes()).await.is_err() {
return;
}
// 广播新用户加入
let _ = tx.send(ChatMessage {
sender: addr,
content: format!("[系统] {} 加入了聊天室", addr),
});
let mut line = String::new();
loop {
tokio::select! {
// 从客户端读取消息
result = reader.read_line(&mut line) => {
match result {
Ok(0) => {
// 连接关闭
let _ = tx.send(ChatMessage {
sender: addr,
content: format!("[系统] {} 离开了聊天室", addr),
});
return;
}
Ok(_) => {
let msg = line.trim().to_string();
if !msg.is_empty() {
// 广播消息给所有人
let _ = tx.send(ChatMessage {
sender: addr,
content: format!("[{}] {}", addr, msg),
});
}
line.clear();
}
Err(e) => {
eprintln!("读取错误 {}: {}", addr, e);
return;
}
}
}
// 接收其他人的消息并转发给当前客户端
result = rx.recv() => {
match result {
Ok(msg) => {
// 不转发自己的消息
if msg.sender != addr {
let formatted = format!("{}\n", msg.content);
if writer.write_all(formatted.as_bytes()).await.is_err() {
return;
}
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
let warning = format!("[系统] 你错过了 {} 条消息\n", n);
let _ = writer.write_all(warning.as_bytes()).await;
}
Err(_) => return,
}
}
}
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind("127.0.0.1:6379").await?;
println!("🚀 聊天服务器启动在 127.0.0.1:6379");
println!("📝 用 telnet 或 nc 连接: nc 127.0.0.1 6379");
// 创建广播通道
let (tx, _) = broadcast::channel::<ChatMessage>(100);
loop {
let (stream, addr) = listener.accept().await?;
println!("新连接: {}", addr);
let tx = tx.clone();
tokio::spawn(handle_client(stream, addr, tx));
}
}
使用方法:
# 终端 1:启动服务器
cargo run
# 终端 2:连接客户端 A
nc 127.0.0.1 6379
# 终端 3:连接客户端 B
nc 127.0.0.1 6379
# 在任一客户端输入消息,另一个客户端会收到
24.8 实战练习
练习 1:异步限流器
实现一个基于令牌桶算法的异步限流器:
use std::sync::Arc;
use tokio::sync::Semaphore;
use tokio::time::{sleep, Duration, interval};
struct RateLimiter {
// TODO: 使用 Semaphore 实现
// 每秒补充指定数量的许可
}
impl RateLimiter {
fn new(rate_per_second: u32) -> Arc<Self> {
// TODO: 实现
todo!()
}
async fn acquire(&self) {
// TODO: 获取一个许可(如果没有许可则等待)
todo!()
}
}
#[tokio::main]
async fn main() {
let limiter = RateLimiter::new(5); // 每秒最多 5 个请求
for i in 0..20 {
let limiter = limiter.clone();
tokio::spawn(async move {
limiter.acquire().await;
println!("[{:?}] 请求 {} 被处理", std::time::Instant::now(), i);
});
}
sleep(Duration::from_secs(5)).await;
}
练习 2:异步任务池
// 实现一个固定大小的异步任务池
// 类似 Node.js 的 p-limit
struct TaskPool {
// TODO: 使用 Semaphore 限制并发数
}
impl TaskPool {
fn new(concurrency: usize) -> Self {
todo!()
}
async fn run<F, T>(&self, task: F) -> T
where
F: std::future::Future<Output = T>,
{
// TODO: 获取许可后执行任务
todo!()
}
}
练习 3:增强聊天服务器
在上面的聊天服务器基础上,添加以下功能:
- 用户昵称:连接后先输入昵称
- 私聊命令:
/msg <用户> <内容>发送私聊消息 - 在线列表:
/who查看在线用户 - 踢人命令:
/kick <用户>踢出用户
练习 4:思考题
tokio::sync::Mutex和std::sync::Mutex的核心区别是什么?什么时候用哪个?- 为什么
tokio::spawn要求 future 是Send + 'static? select!中被取消的分支(没有被选中的)会怎样?- 在高并发场景下,
broadcast和mpsc通道各有什么优缺点? - 如何优雅地处理 TCP 连接的半关闭(half-close)?
24.9 本章小结
┌──────────────────────────────────────────────────────┐
│ Tokio 深入小结 │
├──────────────────────────────────────────────────────┤
│ │
│ Tokio 架构: │
│ - 多线程工作窃取调度器 │
│ - I/O 驱动 + 时间驱动 │
│ - 对比 Node.js:多线程、零成本、更可控 │
│ │
│ 核心 API: │
│ - tokio::spawn → 创建异步任务 │
│ - tokio::join! → 并发等待(Promise.all) │
│ - tokio::select! → 竞争等待(Promise.race) │
│ - spawn_blocking → 处理阻塞操作 │
│ │
│ 同步原语: │
│ - Mutex/RwLock → 异步锁 │
│ - mpsc → 多生产者单消费者通道 │
│ - broadcast → 广播通道 │
│ - watch → 状态观察通道 │
│ - Semaphore → 限制并发数 │
│ │
│ 最佳实践: │
│ ✅ 不要在异步代码中阻塞 │
│ ✅ 短锁用 std::sync,跨 await 用 tokio::sync │
│ ✅ 正确处理超时和取消 │
│ ✅ 实现优雅关闭 │
│ │
└──────────────────────────────────────────────────────┘
下一章预告: 第二十五章我们将探索 Rust 的性能优化 —— 从编译优化到基准测试,从火焰图到零拷贝技术,让你的代码飞起来!