从 tokio::time::sleep 到 tokio::net::TcpStream:时间驱动与事件驱动的两种异步实现
上篇文章跟踪了
tokio::time::sleep从 Future::poll 到哈希时间轮的完整链路。那篇文章的核心是一个问题:时间怎么做到异步等待的?本文要回答另一个问题:网络 I/O又是怎么做到异步等待的?它们共享同一个 runtime 主循环,但背后是两种完全不同的驱动模型。
引言:为什么需要第二篇?
上篇文章1中,我们看到了 tokio::time::sleep 的实现链条:
Future::poll → TimerEntry → Wheel(哈希时间轮) → Driver::park_timeout → epoll_wait
那条链路的核心架构是:用户态维护时间轮,只向内核注册一个「最早 deadline」的定时器。当线程从 epoll_wait 返回后,驱动去时间轮里收割所有到期 timer,批量唤醒对应的任务。
但这里有一个容易被忽略的细节:park_timeout 最终调用的 epoll_wait,本身就是 Linux 上最核心的 I/O 事件通知机制。时间驱动把 I/O 驱动的 park_timeout 当成了「闹钟」来用——它只是顺路借用了 epoll 的超时功能。真正的主角是I/O 驱动自己。
当你写下下面这行代码时,背后发生的事情和 sleep 有着本质区别:
let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
stream.read(&mut buf).await?;
sleep 只关心”时间到了没”;TcpStream::read 关心的是”数据到了没”。前者在用户态就能判断(查时间轮),后者必须由内核通知——因为只有内核知道网卡上有没有新数据包到达。
这篇文章就从 TcpStream 注册到 I/O 驱动的完整流程讲起,拆开 ScheduledIo、Registration、PollEvented、I/O Driver 的事件循环,然后和上篇的 Timer 驱动做横向对比,看看它们在同一套 runtime 主循环里是如何共存并各自工作的。
先看一段代码
和上一篇文章一样,先从一个可运行的程序建立直觉。下面是一个简单的 TCP echo 服务,用 Tokio 同时处理多个连接:
// tokio_echo.rs
use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
loop {
// 等待新连接(异步)
let (mut socket, addr) = listener.accept().await?;
// 每个连接 spawn 一个独立的任务
tokio::spawn(async move {
let mut buf = [0; 1024];
// 在一个连接上循环读写(异步)
loop {
match socket.read(&mut buf).await {
Ok(0) => break, // 连接关闭
Ok(n) => {
// 把读到的内容原样写回去
let _ = socket.write_all(&buf[..n]).await;
}
Err(_) => break,
}
}
println!("连接 {} 关闭", addr);
});
}
}
用一条线程跑上面的服务,它就可以同时接受成千上万个连接。每个连接上的 accept、read、write 都是异步的——调用时不会阻塞线程,线程会去服务其他连接。
对比一下同步阻塞版本的多线程实现:
// sync_echo.rs(多线程阻塞版)
use std::net::TcpListener;
use std::thread;
fn main() -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind("127.0.0.1:8080")?;
loop {
let (mut socket, addr) = listener.accept()?;
// 每个连接启动一条新线程
thread::spawn(move || {
let mut buf = [0; 1024];
loop {
let n = socket.read(&mut buf).unwrap();
if n == 0 { break; }
socket.write_all(&buf[..n]).unwrap();
}
println!("连接 {} 关闭", addr);
});
}
}
同步版本也能处理多连接,但代价是每条连接一条 OS 线程。每个 thread::spawn 都涉及内核态的线程创建、栈分配(默认 2MB)、以及调度器的上下文切换。几千个连接就需要几千条线程——OS 调度器很快就不堪重负。
而 Tokio 版本却用一条线程做到了同步版本需要几千条线程才能做到的事。秘密就藏在这三个 .await 背后:它们不会阻塞线程,只会把任务挂起,把线程还给调度器去服务其他连接。
| 方面 | 同步多线程版 | Tokio 单线程版 |
|---|---|---|
| 并发模型 | 每个连接一条 OS 线程 | 单线程管理海量连接 |
| 资源开销 | 每条线程 2MB 栈 + OS 调度 | 每个任务几十字节的 Future 状态 |
| 上下文切换 | OS 内核态切换 | 用户态 async 切换 |
| 极限连接数 | 几千条线程后难以扩展 | 单线程可处理数十万连接 |
| IO 操作 | read/write 阻塞线程 |
.await 挂起任务不占线程 |
再看客户端怎么写。用 tokio 连接服务器并发送一条消息:
// tokio_client.rs
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
// 发送数据
stream.write_all(b"hello from tokio!").await?;
// 读取回显
let mut buf = [0; 1024];
let n = stream.read(&mut buf).await?;
println!("收到回显:{:?}", &buf[..n]);
Ok(())
}
服务端和客户端都用 tokio,双方都不占线程——两个 .await 已经把线程还给了各自的调度器。
完整的实验场景可以这样跑:
# 终端 1:启动 echo 服务器
$ cargo run --example tokio_echo
# 终端 2:启动客户端,看到回显
$ cargo run --example tokio_client
收到回显:"hello from tokio!"
接下来的内容就是拆开这个「秘密」——看看 .await 背后 Tokio 的 I/O 驱动是怎么利用 epoll 实现非阻塞 I/O 的。
与上一篇的关系
| 上一篇 | 本文 |
|---|---|
tokio::time::sleep |
tokio::net::TcpStream |
| 时间驱动(Timer Driver) | I/O 驱动(I/O Driver) |
| 哈希时间轮(用户态) | epoll(内核态) |
| 一个内核定时器管理所有 sleep | 所有 fd 注册到 epoll |
| 「时间到了」由内核定时器通知 | 「数据到了」由 epoll 事件通知 |
| 主动检查(poll time wheel) | 被动等待(wait on epoll) |
概念先行:时间驱动 vs 事件驱动
tokio::time::sleep 和 TcpStream::read 虽然都返回 Future,都可以 .await,但它们的驱动模型有本质区别。
时间驱动的 sleep 是「到时即醒」:线程把 park timeout 设到最早的 deadline,内核在 deadline 到达时通过时钟中断唤醒线程。判断”是否到期”完全可以在用户态通过比较 deadline 和当前时间完成,不需要内核在事件发生时通知。
事件驱动的 I/O 是「有数即醒」:线程阻塞在 epoll_wait 上,内核在数据到达网卡时通过硬件中断 → 驱动 → epoll 事件通知唤醒线程。判断”是否可读”只有内核知道——因为数据包什么时候到达是不可预测的。
用一句话概括核心区别:
sleep 的等待是确定性的——我知道 5 秒后肯定到;I/O 的等待是不确定性的——我不知道数据什么时候来。
这对数据结构选择产生了深远的影响:
graph LR
subgraph "时间驱动(sleep)"
A1["用户态时间轮<br/>O(1) 插入/Wakeup"] -->|"最近 deadline"| B1["内核 hrtimer × 1"]
B1 -->|"定时中断"| C1["线程醒来"]
C1 -->|"批量收割"| A1
end
subgraph "事件驱动(I/O)"
A2["ScheduledIo<br/>原子 readiness 位"] -->|"注册到"| B2["epoll<br/>内核管理所有 fd"]
B2 -->|"可读/可写事件"| C2["线程醒来"]
C2 -->|"更新 readiness"| A2
end
有了这个直觉,接下来就可以拆开源码了。
一、Driver 堆栈:TimeDriver 在 IODriver 之上
在深入 TcpStream 之前,需要先理解 Tokio 运行时的 Driver 堆栈结构。
Tokio 运行时的驱动是分层的——内层是 I/O 驱动(epoll),外层是时间驱动2:
// tokio/src/runtime/driver.rs: 44-50
pub(crate) struct Driver {
inner: TimeDriver,
}
pub(crate) struct Handle {
pub(crate) io: IoHandle, // I/O 驱动句柄
pub(crate) signal: SignalHandle, // 信号驱动句柄
pub(crate) time: TimeHandle, // 时间驱动句柄
pub(crate) clock: Clock,
}
这是 Worker 线程唯一能看到的 Driver struct——runtime::driver::Driver 是整个 runtime 对外暴露的统一 park 入口。Worker 的主循环永远只调这个 struct 的 park(handle) 和 park_timeout(handle, duration),从不关心内部嵌套了多少层。
用一句话概括:Driver 就是 runtime 的 sleep/wait 机制——线程通过它知道该睡多久、被谁叫醒、醒来后该干什么。
创建时,先创建 I/O 驱动,然后在其上包裹时间驱动2:
不过要注意,TimeDriver 的名字是历史遗留——它实际是一个条件编译 enum。当 time feature 关闭时,TimeDriver 被直接退化为 IoStack,Driver { inner: TimeDriver } 变成了 Driver { inner: IoStack },Timer 层完全不存在:
// runtime/driver.rs
cfg_not_time! {
type TimeDriver = IoStack; // ← 没有 time 时,inner 直接是 IoStack
}
所以三种 feature 组合下的实际形态是:
| time | io | Driver inner 的实际类型 |
|---|---|---|
| 开启 | 开启 | TimeDriver::Enabled { driver: time::Driver { park: IoStack(...) } } |
| 关闭 | 开启 | IoStack::Enabled(...)(TimeDriver 退化为此) |
| 关闭 | 关闭 | IoStack::Disabled(ParkThread)(连 I/O 都没有时退化到 Condvar) |
不论哪种配置,Worker 线程式始终只调 driver.park(handle)——runtime::driver::Driver 是唯一入口,内部嵌套是私有的实现细节。
// tokio/src/runtime/driver.rs: 47-58
pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Handle)> {
let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io, cfg.nevents)?;
let clock = create_clock(cfg.enable_pause_time, cfg.start_paused);
let (time_driver, time_handle) =
create_time_driver(cfg.enable_time, cfg.timer_flavor, io_stack, &clock);
// ...
}
IoStack 本身又是一个小堆栈2:
IoStack (if io enabled):
ProcessDriver (子进程事件)
└── SignalDriver (Unix 信号)
└── IoDriver (mio::Poll)
当时间驱动需要 park 时,它把 park_timeout 委托给 IoStack,后者最终调用 mio::Poll::poll:
// tokio/src/runtime/time/mod.rs: 255-260
fn park_internal(&mut self, rt_handle: &driver::Handle, limit: Option<Duration>) {
// ...
match next_wake {
Some(when) => {
// 有定时器在将来到期:带超时 park
self.park_thread_timeout(rt_handle, duration);
}
None => {
// 没有定时器:无限期 park(除非有 limit)
if let Some(duration) = limit {
self.park_thread_timeout(rt_handle, duration);
} else {
self.park.park(rt_handle); // 无限期等待 I/O 事件
}
}
}
}
换句话说,Tokio 的 worker 线程只有一个统一的 park 循环:时间驱动检查时间轮中的最近 deadline,如果时间轮不为空,就用 epoll_wait(timeout) 带超时地等待 I/O 事件;如果时间轮为空,就无限期等待 I/O 事件——I/O 事件或者 mio::Waker 的 unpark 会把线程叫醒。
这个设计的精妙之处在于:时间驱动和 I/O 驱动共享一次 epoll 系统调用。如果时间轮里有一个 5 秒后到期的 sleep,同时一个 TCP socket 在 3 秒时收到数据——线程会在 3 秒时被 epoll 叫醒,然后时间驱动顺便检查时间轮、收割到期定时器。不需要两套独立的等待机制。
sequenceDiagram
participant TDriver as Time Driver
participant IDriver as I/O Driver (epoll)
participant Kernel as Linux Kernel
participant Clock as 时钟中断
TDriver->>IDriver: park_timeout(5000ms)
IDriver->>Kernel: epoll_wait(timeout=5000ms)
Note over Kernel: 等待 I/O 事件或超时
Kernel-->>IDriver: 数据到达 → epoll 返回
IDriver-->>TDriver: 醒来
TDriver->>TDriver: process_at_time() → 收割到期定时器
TDriver->>IDriver: 处理 I/O 事件
但这里的细节是:上述 seq 图是简化版。实际的 I/O Driver 的 turn() 首先被调用,它负责处理 epoll 事件并设置 ScheduledIo 的 readiness;然后才轮到 Time Driver 处理定时器。两者的协作是通过 driver::Handle 来协调的。
1.1 再往下看:mio 封装了什么?
至此,IoStack 的堆叠关系已经清楚。但再往下追问一步——IoDriver (mio::Poll) 这个最底层又是什么?
答案是 mio::Poll 是跨平台 I/O 多路复用的统一接口,它在编译时根据目标操作系统选择后端实现3:
// mio/src/sys/mod.rs: 42-56
#[cfg(any(unix, target_os = "hermit", ...))]
cfg_os_poll! {
mod unix;
pub use self::unix::*; // unix → 再根据 OS 选 epoll / kqueue / poll
}
#[cfg(windows)]
cfg_os_poll! {
mod windows;
pub use self::windows::*; // Windows IOCP
}
在 Unix 下,mio 的选择器文件分别对应三个级别的 I/O 多路复用机制:
| 操作系统 | mio 选择器 | 内核接口 | 核心文件 |
|---|---|---|---|
| Linux | epoll | epoll_create / epoll_ctl / epoll_wait |
mio/src/sys/unix/selector/epoll.rs |
| macOS / iOS / FreeBSD | kqueue | kqueue / kevent |
mio/src/sys/unix/selector/kqueue.rs |
| 其他 Unix 回退 | poll | poll |
mio/src/sys/unix/selector/poll.rs |
| Windows | IOCP | CreateIoCompletionPort / GetQueuedCompletionStatus |
mio/src/sys/windows/ |
| WASI (p1) | wasip1 | poll_oneoff |
mio/src/sys/wasip1/ |
epoll.rs 中的 Selector 直接封装了 epoll 系统调用4:
// mio/src/sys/unix/selector/epoll.rs: 32-66
impl Selector {
pub fn new() -> io::Result<Selector> {
// epoll_create1(EPOLL_CLOEXEC) 系统调用
let ep = unsafe {
OwnedFd::from_raw_fd(syscall!(epoll_create1(libc::EPOLL_CLOEXEC))?)
};
Ok(Selector { ep, .. })
}
pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
// epoll_wait 系统调用
syscall!(epoll_wait(
self.ep.as_raw_fd(),
events.as_mut_ptr(),
events.capacity() as i32,
timeout,
))
.map(|n| unsafe { events.set_len(n as usize) })
}
}
kqueue.rs 则是对另一套系统调用的封装5:
// mio/src/sys/unix/selector/kqueue.rs: 96-139
impl Selector {
pub fn new() -> io::Result<Selector> {
// kqueue 系统调用
let kq = unsafe { OwnedFd::from_raw_fd(syscall!(kqueue())?) };
Ok(Selector { kq, .. })
}
pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
// kevent 系统调用
syscall!(kevent(
self.kq.as_raw_fd(),
ptr::null(), // 无 changelist(只读不写)
0,
events.as_mut_ptr().cast(),
events.capacity() as Count,
timeout_ref,
))
.map(|n| unsafe { events.set_len(n as usize) })
}
}
mio 的 cfg 在 Cargo.toml 中通过 feature os-poll 触发条件编译6:Linux 上只编译 epoll.rs,macOS/FreeBSD 上只编译 kqueue.rs,Windows 上编译 IOCP。Tokio 的 I/O Driver 从不关心底层——它只调用 mio::Poll::poll,剩下的由 mio 按平台转发:
// tokio/src/runtime/io/driver.rs: 122-140 — Tokio 只看这一层
fn turn(&mut self, handle: &Handle, max_wait: Option<Duration>) {
match self.poll.poll(events, max_wait) { // ← 不管下面是 epoll 还是 kqueue
Ok(()) => {}
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
Err(e) => panic!("..."),
}
for event in events.iter() {
// 通过 token 反查 ScheduledIo,设置 readiness,唤醒任务
}
}
这个抽象的意义在于:Tokio 的整个 I/O 驱动——ScheduledIo、Registration、PollEvented——全部是跨平台的。同一份代码编译后,在 Linux 上走 epoll,在 macOS 上走 kqueue,在 Windows 上走 IOCP。平台的差异被 mio 完全封装在 src/sys/unix/selector/ 下的三个文件中。
回到最初的问题:driver 层,是不是就是 tokio + mio 封装不同操作系统的 NIO 库细节的层面(比如 epoll 和 kqueue)?
是的,但可以更精确地分为两层:
- Tokio I/O Driver(
runtime::io::Driver):持有mio::Poll,管理ScheduledIo的原子 readiness 状态机和 waker 分发 - mio(操作系统接口层):编译时选择 epoll / kqueue / IOCP,把原生事件统一为
mio::Event
完整层叠关系可以用一张图来总结:
graph TD
A["TcpStream::read().await<br/>应用层"] --> B["Registration + PollEvented + ScheduledIo<br/>注册层"]
B --> C["runtime::io::Driver (持有 mio::Poll)<br/>Tokio 驱动层"]
C --> D["mio::Poll → Registry::register / poll<br/>mio 抽象层"]
D --> E["cfg!(target_os) 选择 epoll/kqueue/poll/IOCP<br/>平台选择层(编译时)"]
E --> F["epoll_wait / kevent / poll / GetQueuedCompletionStatus<br/>内核接口层"]
F --> G["网卡中断 → 协议栈 → socket buffer → 就绪通知<br/>物理层"]
1.2 Driver 堆叠结构:Timer 和 I/O 如何共享同一次 epoll_wait
前面的层叠图描绘了完整的竖线链路。但横着看,Driver 实际运行时是一层套一层套一层套一层——Tokio 有五个 Driver structs 和两个 enum/alias 层,它们的定义位置和嵌套关系如下:
| Driver 结构 | 定义文件:行 | 角色 | 持有 |
|---|---|---|---|
runtime::driver::Driver |
runtime/driver.rs:16 |
最外层,暴露给 Worker | inner: TimeDriver |
TimeDriver(enum) |
runtime/driver.rs:290 |
上方 Driver 的 inner,可选启用 | Enabled { driver: time::Driver } / Disabled(IoStack) |
runtime::time::Driver |
runtime/time/mod.rs:90 |
Timer Driver | park: IoStack |
IoStack(enum) |
runtime/driver.rs:138 |
park 栈桥 | Enabled(ProcessDriver) / Disabled(ParkThread) |
runtime::process::Driver |
runtime/process.rs:13 |
Process Driver(子进程清理) | park: SignalDriver |
runtime::signal::Driver |
runtime/signal/mod.rs:19 |
Signal Driver(Unix 信号) | io: io::Driver |
runtime::io::Driver |
runtime/io/driver.rs:25 |
I/O Driver,最底层 | poll: mio::Poll, events: mio::Events |
五个 Driver struct 的定义文件按创建顺序是:
runtime::driver::Driver::new()
→ create_io_stack() → runtime::io::Driver (runtime/io/driver.rs:25)
→ create_signal_driver(io_drv) → runtime::signal::Driver (runtime/signal/mod.rs:19)
→ create_process_driver(sig) → runtime::process::Driver (runtime/process.rs:13)
→ create_time_driver(io_stack) → runtime::time::Driver (runtime/time/mod.rs:90)
→ Driver { inner: TimeDriver } → runtime::driver::Driver (runtime/driver.rs:16)
Type alias 的条件编译退化:
| alias | 启用时 → | 关闭时退化为 |
|---|---|---|
IoDriver |
runtime::io::Driver(runtime/driver.rs:135) |
—(无 IO 时无 alias) |
SignalDriver |
runtime::signal::Driver(runtime/driver.rs:244) |
IoDriver(runtime/driver.rs:258) |
ProcessDriver |
runtime::process::Driver(runtime/driver.rs:269) |
SignalDriver(runtime/driver.rs:278) |
一个 Runtime 有多少个 Driver 实例?
builder.rs 中创建 Driver 的入口只有一行:
// tokio/src/runtime/builder.rs: 1672 / 1860
let (driver, driver_handle) = driver::Driver::new(cfg)?;
无论 current_thread 还是 multi_thread 调度器,一个 Runtime 只调一次。所以默认全部启用时,每种 struct 恰好一个实例,按值嵌套(struct 字段直接持有所属 struct,不是 Arc 也不是 Box):
runtime::driver::Driver × 1 ← 最外层
runtime::time::Driver × 1 ← Timer 层
runtime::process::Driver × 1 ← 子进程清理层 (Linux)
runtime::signal::Driver × 1 ← Unix 信号层 (Unix)
runtime::io::Driver × 1 ← 最底层,持有 mio::Poll
嵌套链是用字段按值 struct A { b: B } 串起来的,不是指针。
创建过程:从内到外逐层包裹
driver::Driver::new(cfg) 内部按从内到外的顺序依次创建每个 struct,每个 ::new() 接收下一层的 struct 作为参数,按值取走所有权:
// tokio/src/runtime/driver.rs: 47-58
pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Handle)> {
// 第一步:从内到外包
let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io, cfg.nevents)?;
let clock = create_clock(cfg.enable_pause_time, cfg.start_paused);
let (time_driver, time_handle) =
create_time_driver(cfg.enable_time, cfg.timer_flavor, io_stack, &clock);
Ok((
Self { inner: time_driver }, // 最外层
// ...
))
}
三层帮助函数展开如下。
create_io_stack()(runtime/driver.rs:149-170):
fn create_io_stack(enabled: bool, nevents: usize) -> ... {
let (io_driver, io_handle) = io::Driver::new(nevents)?; // ⑤ io::Driver
let (signal_driver, signal_handle) =
create_signal_driver(io_driver, &io_handle)?; // ④ signal::Driver 拿走了 io::Driver
let process_driver = create_process_driver(signal_driver); // ③ process::Driver 拿走了 signal::Driver
(IoStack::Enabled(process_driver), ...) // IoStack 拿走了 process::Driver
}
create_signal_driver()(runtime/driver.rs:247-253):
fn create_signal_driver(io_driver: IoDriver, io_handle: &io::Handle)
-> io::Result<(SignalDriver, SignalHandle)>
{
let driver = crate::runtime::signal::Driver::new(io_driver, io_handle)?;
// io::Driver 被按值移入 ↑
let handle = driver.handle();
Ok((driver, Some(handle)))
}
create_process_driver()(runtime/driver.rs:271-275):
fn create_process_driver(signal_driver: SignalDriver) -> ProcessDriver {
ProcessDriver::new(signal_driver)
// SignalDriver 被按值移入 ↑
}
create_time_driver()(runtime/driver.rs:305-323):
fn create_time_driver(enable, timer_flavor, io_stack: IoStack, clock) -> (TimeDriver, TimeHandle) {
let (driver, handle) = time::Driver::new(io_stack, clock);
// IoStack 被按值移入 ↑
(TimeDriver::Enabled { driver }, Some(handle))
}
最终 Driver { inner: TimeDriver } 拿到最外层。所以这 5 个 struct 并不是”各自创建后组装”,而是一个接一个被吞进下一层:io::Driver 被 signal::Driver 吞掉,signal::Driver 被 process::Driver 吞掉,process::Driver 被 IoStack 吞掉,IoStack 被 time::Driver 吞掉,time::Driver 被 TimeDriver 吞掉,TimeDriver 被 driver::Driver 吞掉。最后只有 driver::Driver 这一个 struct 活着暴露给 Worker。
五个 struct 全部在堆栈上从一个 driver::Driver::new() 调用中构建完成。多线程下,整个嵌套体被放进 TryLock<Driver>,所有 worker 共享,只有执行 park_timeout 的线程才能排他地拿走 Driver。
包含关系与 park 调用链映射
上面的嵌套链是静态包含关系——编译时就确定的 struct 字段层级。而 park 调用链是运行时方法调用路径——从 Driver::park() 开始一直委派到 io::Driver::turn()。两者是平行映射的。
每层的委派模式不完全一致——time::Driver 在前面先做自己的事然后委派,process/signal 在后面先委派再处理自己的事:
静态包含关系 运行时 park 调用链
───────────────── ────────────────────
driver::Driver Driver::park(handle)
└── inner: TimeDriver(enum) → TimeDriver::park(handle)
└── TimeDriver::Enabled → time::Driver::park_internal(handle)
└── park: IoStack(enum) → IoStack::park_timeout(handle, dur)
└── IoStack::Enabled → ProcessDriver::park_timeout(handle, dur)
└── process::Driver → process::Driver::park_timeout(handle, dur)
└── park: SignalDriver → SignalDriver::park_timeout(handle, dur)
└── io: io::Driver → io::Driver::park_timeout(handle, dur)
→ io::Driver::turn()
→ self.poll.poll() (epoll_wait)
每一层的 park/park_timeout 方法签名完全一致:fn(&mut self, handle: &driver::Handle) 或带 duration: Duration。但注意没有 trait——每个 impl 块是独立写的,只是碰巧签名相同。
但委派的时机不同——各层是前序、后序混合:
time::Driver::park_internal: 前序 → 先查时间轮(自己事),再调 IoStack
process::Driver::park: 后序 → 先调 signal::Driver,再 reap_orphans
signal::Driver::park: 后序 → 先调 io::Driver,再 process() 处理信号
io::Driver::park: 终点 → 没有委派,直接 turn() → epoll_wait
time::Driver 需要先知道 timeout(查时间轮),然后才决定是否委派,所以是前序。process/signal 没有需要提前得到的信息,委派回来后顺便干点自己的活,所以是后序。
例外还有最外层 runtime::driver::Driver——它不做自己的事,self.inner.park(handle) 纯委派。
这就是整个 Driver 堆叠的静态结构和动态路径的完整映射。
每层的职责
从外到内,每层在 park 循环中做的事情:
| 层级 | 在 park 调用链中做的事 | 在返回路径中做的事 |
|---|---|---|
driver::Driver |
纯壳子:直接委派给 TimeDriver,Worker 只认这一个 struct |
无 |
TimeDriver (enum) |
开关层:判断 timer 是否启用。启用则委派给 time::Driver,关闭则委派给 IoStack |
无 |
time::Driver |
查时间轮:next_expiration_time() 算最早 deadline。到期则 park_timeout(0s),未到期则设 timeout |
收割到期 timer:process_at_time() → fire() → wake() |
IoStack (enum) |
I/O 开关:启用则走 epoll 栈(process::Driver),关闭则退化为 ParkThread::park(Condvar::wait) |
无 |
process::Driver |
委派给 SignalDriver |
清理用户子进程:GlobalOrphanQueue::reap_orphans()——回收用户通过 tokio::process::Command 启动的子进程退出后的僵尸进程表项。runtime 本身不启动任何额外进程 |
signal::Driver |
委派给 io::Driver |
处理信号:读 self-pipe,globals().broadcast() |
io::Driver |
唯一真正干活:turn() 调 self.poll.poll(events, max_wait) 执行 epoll_wait / kevent |
分发 I/O 事件:遍历 events,设 ScheduledIo readiness,wake() 任务 |
调用委派方向是从外到内,每层先委派完再处理自己的逻辑:
park 方向: 外 → TimeDriver → time::Driver → IoStack → process → signal → io::Driver::turn()
↓
epoll_wait
返回方向: 外 ← TimeDriver ← time::Driver ← IoStack ← process ← signal ← io::Driver::turn()
↓ ↓ ↓
process_at_time reap process
收割到期 timer orphans 信号
epoll_wait 返回后,事件处理的顺序是固定的:I/O 事件最先 → 信号处理 → 子进程清理 → 到期 timer 最后。这不是显式配置的优先级,而是嵌套结构带来的自然结果——最内层(io::Driver) 最先被调用也最先返回处理,最外层(time::Driver)最后处理。嵌套越深,优先级越高。
这个模型可以类比为线性链的后序遍历(post-order):子节点处理完 → 父节点处理。Driver 嵌套链是一个线性调用树(每个节点只有一个子节点),park 方向是前序(先查时间轮,再进子节点),返回方向是后序(先分发 I/O 事件,再处理信号,最后收割 timer)。
条件编译可以缩短链:无 process feature 时 process::Driver 退化为 SignalDriver,无 signal 时 SignalDriver 退化为 IoDriver。但剩余层的顺序和职责不变。
但链不是无限可缩的——最外层和最内层是固定的:
runtime::driver::Driver ← 必须存在,Worker 只看这一个入口
└── TimeDriver ← 必须存在,enum 天生存在
├── time::Driver ← 可关(关 time 时 → IoStack)
└── IoStack ← 必须存在,enum 天生存在
├── process ← 可关(退化 → SignalDriver)
├── signal ← 可关(退化 → io::Driver)
└── io::Driver ← 可关(关 io 时 → ParkThread)
└── ParkThread ← 最终兜底(Condvar::wait)
所以真正永远无法跳过的是三层 struct/enum:
| 层级 | 为什么不可缩 | 最简形态 |
|---|---|---|
runtime::driver::Driver |
Worker 线程调它的 park(handle) |
Driver { inner: TimeDriver } |
TimeDriver |
enum 天生存在;time 关闭时退化为 IoStack 别名 |
TimeDriver = IoStack |
IoStack |
enum 天生存在;io 关闭时退化为 ParkThread |
IoStack::Disabled(ParkThread) |
链必须从 runtime::driver::Driver 走到某个能阻塞线程的兜底机制。io::Driver(带 epoll_wait)是最常见的兜底,但不是唯一的——ParkThread(Condvar::wait)才是永远存在的最后防线。关掉所有可选层后,runtime 退化为一个可以用 tokio::spawn 做纯计算任务调度器,没有 I/O、没有 Timer、没有 Process、没有 Signal。
全部默认启用时的完整堆叠可以用下面的关系图表示。左侧是 Driver 嵌套(实线 --> = 按值包含),右侧是 Handle 共享(虚线 o-- = Arc 聚合):
graph LR
subgraph "Driver 嵌套(按值包含)"
D["runtime::driver::Driver"] -->|"inner"| TD["TimeDriver (enum)"]
TD -->|"Enabled.driver"| T["runtime::time::Driver"]
T -->|"park"| IS["IoStack (enum)"]
IS -->|"Enabled"| PD["ProcessDriver =<br/>process::Driver"]
PD -->|"park"| SD["signal::Driver"]
SD -->|"io"| ID["runtime::io::Driver<br/>{ poll: mio::Poll }"]
end
subgraph "Handle 聚合(Arc 共享)"
H["driver::Handle"] -->|"io"| IOH["IoHandle (enum)"]
IOH -.->|"Enabled (Arc)"| IOH2["io::Handle<br/>{ registry, waker }"]
end
- 左侧实线
-->:按值组合(struct A { b: B }),每层包含下一层的完整 struct 实例 IoStack和TimeDriver的Enabled/Disabled分支在条件编译时可跳过对应层级- 右侧虚线
o--:Arc 聚合,io::Handle被所有 worker 线程通过Arc共享,用于add_source()和unpark(),不走 Driver 的 park 栈
除 io::Handle 通过 Arc 聚合共享外,time::Handle 也是 Option<time::Handle>(Arc<InnerState>),所有 worker 线程都通过 Arc<driver::Handle> 共享同一组 Handle。
关键认识(两个层次——外部统一入口 vs 内部各搞各的):
外部统一入口:Worker 线程只看
runtime::driver::Driver这一个 struct,永远只调它的park/park_timeout,不关心内部嵌套。inner: TimeDriver根据 feature 配置退化为IoStack或ParkThread,暴露给 Worker 的 API 不变。内部各搞各的:链上的五个 Driver struct(
driver/time/process/signal/io)没有共享的trait。每个impl Driver { park() }是孤立定义的,只是碰巧有相同的签名。调用链靠硬编码的字段委派维系,编译器不检查签名一致性,也没有Box<dyn Park>的动态替换能力。
具体 struct 关系:
IoDriver不是独立 struct,它是runtime::io::Driver的类型别名,定义在runtime/driver.rs:92:pub(crate) type IoDriver = crate::runtime::io::Driver;TimeDriver不是独立 Driver,它是runtime/driver.rs内部的 enum,用来表达 timer 是否启用、启用哪个 flavorProcessDriver/SignalDriver也是 type alias,它们的存在是为了在条件编译下保持类型统一——如果 process/signal 被关掉,它们就退化为下一层的别名,不增加嵌套深度runtime::io::Driver是最底层,直接持有mio::PollHandle中的io: IoHandle是独立于 park 栈的——它通过Arc<runtime::io::Handle>直接 unpark I/O Driver,不走 Timer Driver
park 调用链:Driver::park → TimeDriver::park → time::Driver::park_internal → IoStack::park_timeout → ProcessDriver::park → ... → io::Driver::turn → mio::Poll::poll。
这条结构链决定了两种使用场景的走法完全不同:
场景 A:只有 I/O,没有 timer(TcpStream 纯读)
Worker 线程主循环
→ park_internal()
→ 查 Wheel → next_wake = None(时间轮是空的)
→ IoStack::park_timeout(None) ← 无 timer,传 None
→ I/O Driver::turn()
→ epoll_wait(events, -1) ← 无限期等 I/O
→ [数据到达,网卡中断]
→ epoll_wait 返回
→ 遍历 events → ScheduledIo::set_readiness()
→ waker.wake() ← 唤醒 TcpStream 的任务
→ process_at_time(now) ← 时间轮为空,no-op
→ 返回到调度器,执行就绪任务
场景 B:只有 timer,没有 I/O(纯 sleep)
Worker 线程主循环
→ park_internal()
→ 查 Wheel → next_wake = Some(10000)
→ duration = 10000 - now = 5000ms
→ IoStack::park_timeout(5000ms)
→ I/O Driver::turn()
→ epoll_wait(events, 5000) ← 带 5s 超时
→ [超时,hrtimer 到期]
→ epoll_wait 返回(无 I/O 事件)
→ process_at_time(now=10000) ← 收割到期 timer
→ fire() → waker.wake() ← 唤醒 Sleep 的任务
场景 C:既有 I/O 又有 timer(常见混合场景)
Worker 线程主循环
→ park_internal()
→ 查 Wheel → next_wake = Some(10000)
→ IoStack::park_timeout(5000ms)
→ I/O Driver::turn()
→ epoll_wait(events, 5000)
→ [2s 时,TcpStream 数据到达]
→ epoll_wait 提前返回 ← I/O 事件先到
→ 遍历 events → ScheduledIo::set_readiness()
→ waker.wake() ← 唤醒 TcpStream 的任务
→ process_at_time(now=7000) ← elapsed 只走到 7000,不是 10000
→ 没有 timer 到期(deadline 10000 > 7000)
→ 返回调度器,继续处理
精妙之处在场景 C:epoll_wait 的 OR 语义让两个驱动共享一次系统调用——TcpStream 数据先到就提前回来,Timer 顺便看一眼谁到期了;hrtimer 先到就准时处理 timer。不需要两套独立的等待线程或两套 epoll 实例。
sequenceDiagram
participant Worker as Worker 线程
participant TD as Time Driver
participant IOD as I/O Driver
participant EP as epoll_wait
note over Worker: 场景 C:5s timer + 1 个 TcpStream
Worker->>TD: park_internal()
TD->>TD: next_wake = 10000, duration = 5000
TD->>IOD: park_timeout(5000ms)
IOD->>EP: epoll_wait(events, 5000)
note over EP: OR 等待:I/O 事件 or 5000ms 超时
EP-->>IOD: [2s] TcpStream 可读
IOD->>IOD: 处理 events → wake TcpStream
IOD-->>TD: turn() 返回
TD->>TD: process_at_time(now=7000)
note over TD: deadline 10000 > 7000 → 无 timer 到期
TD-->>Worker: 回到调度器
这就是 Tokio 的 Driver { inner: TimeDriver } 设计的核心:外层 Timer 决定等多久,内层 I/O 执行等待,醒来后各管各的数据。I/O Driver 不在 inner 字段里,它在 TimeDriver 的 park 字段里层层嵌套地藏着。
1.3 底层底座:所有 Driver 共享同一个 runtime::io::Driver
堆叠链中的每一层都有自己的 impl Driver { park() },但它们最终都有一个共同的落脚点——runtime::io::Driver(宿主 mio::Poll)。不管是 Timer、Signal 还是 Process,park 链的归宿都一样:
runtime::time::Driver::park_internal
→ IoStack::park_timeout
→ ProcessDriver::park_timeout
→ SignalDriver::park_timeout
→ io::Driver::park_timeout ← runtime::io::Driver
→ self.poll.poll(events, timeout)
runtime::io::Driver 的 struct 定义直接暴露了这一点7:
// tokio/src/runtime/io/driver.rs: 25-37
pub(crate) struct Driver {
/// True when an event with the signal token is received
signal_ready: bool,
/// Reuse the `mio::Events` value across calls to poll.
events: mio::Events,
/// The system event queue.
poll: mio::Poll,
}
它的 park_timeout 和 turn() 是实际执行 epoll_wait 的地方7:
// tokio/src/runtime/io/driver.rs: 138-154
pub(crate) fn park_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) {
let handle = rt_handle.io();
self.turn(handle, Some(duration));
}
fn turn(&mut self, handle: &Handle, max_wait: Option<Duration>) {
handle.release_pending_registrations();
let events = &mut self.events;
// 阻塞等待:这行是真正的 epoll_wait
match self.poll.poll(events, max_wait) { // ← mio::Poll::poll → epoll_wait
Ok(()) => {}
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
Err(e) => panic!("{e:?}"),
}
// 分发事件给 ScheduledIo
for event in events.iter() {
let ready = Ready::from_mio(event);
let ptr = super::EXPOSE_IO.from_exposed_addr(event.token().0);
let io: &ScheduledIo = unsafe { &*ptr };
io.set_readiness(Tick::Set, |curr| curr | ready);
io.wake(ready);
}
}
这些代码是 park 链最内层的一次调用。但多线程下,所有 worker 共享同一个 runtime::io::Driver——通过 TryLock<Driver> 做排他访问8:
// tokio/src/runtime/scheduler/multi_thread/park.rs: 48-65
struct Shared {
/// Shared driver. Only one thread at a time can use this
driver: TryLock<Driver>,
}
// park_timeout 时尝试拿锁:
pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) -> HadDriver {
if let Some(mut driver) = self.inner.shared.driver.try_lock() {
// 拿到锁 → 走 Driver::park_timeout → epoll_wait
self.inner.park_driver(&mut driver, handle, Some(duration))
} else if !duration.is_zero() {
// 没拿到锁 → 退化为 condvar 等待
self.inner.park_condvar(Some(duration));
HadDriver::No
}
}
这就是为什么一个 Runtime 五个 Driver struct 但只一个 io::Driver(一个 mio::Poll / 一个 epoll fd):epoll fd 不能被多个线程同时 epoll_wait,所以多线程下通过 TryLock 串行化访问。拿不到锁的线程退化为 condvar 等待,等拿到锁的线程执行完 epoll_wait 后唤醒它。
TcpStream 不走 park 链。它不是通过 park() 去用 mio::Poll 的,而是直接通过 Handle::add_source() 把 fd 注册到同一个 mio::Poll 实例上:7
// tokio/src/runtime/io/driver.rs: 182-207 (Handle::add_source)
pub(super) fn add_source(&self, source: &mut impl mio::event::Source, interest: Interest)
-> io::Result<Arc<ScheduledIo>>
{
let scheduled_io = self.registrations.allocate(&mut self.synced.lock())?;
let token = scheduled_io.token();
self.registry.register(source, token, interest.to_mio())?;
// ↑ 通过 mio::Registry::register → epoll_ctl(EPOLL_CTL_ADD)
Ok(scheduled_io)
}
而 park 链是通过 self.poll.poll(events, timeout)(即 epoll_wait)来消费事件。注册(epoll_ctl)和等待(epoll_wait)都在同一个 mio::Poll 上完成:
同一个 mio::Poll 实例
┌──────────────────────┐
│ │
epoll_ctl ADD ◄──┤ runtime::io::Driver ├──► epoll_wait
(TcpStream 注册) │ { poll } │ (park 链等待)
│ │
└──────────────────────┘
▲
┌────────────┼────────────┐
│ │ │
Timer park Signal park Process park
epoll_wait epoll_wait epoll_wait
(timeout=5s) (timeout=-1) (timeout=-1)
换个角度看,五个 Driver struct 的关系是:
| Driver | 作用 | 是否调用 epoll_wait | 是否注册 fd |
|---|---|---|---|
driver::Driver |
最外层调度入口 | 否,委派给 TimeDriver | 否 |
time::Driver |
查时间轮 + 设 timeout | 否,委派给 IoStack | 否 |
process::Driver |
子进程清理 | 否,委派给 SignalDriver | 否 |
signal::Driver |
信号处理 | 否,委派给 io::Driver | 是,注册一个 UnixStream |
io::Driver |
真正的 epoll 封装 | 是 | 是,所有 fd |
runtime::io::Driver 是最底层——它是唯一真正执行 epoll 系统调用、唯一管理 epoll 注册表的那一层。上面四层 Driver 全部通过 io::Driver 间接完成等待,没有自己的 epoll 实例。
这也就意味着:即使一个 Worker 线程没有 Timer(time feature 关闭)、没有 Signal、没有 Process,它依然需要一个 runtime::io::Driver 来运行 mio::Poll::poll()——否则连基本的 TcpStream 等待都没法做。 io::Driver 是唯一的硬性依赖,其他 Driver 都是可选的 wrapper。
1.4 Driver 分层设计要点总结
将前文的分析浓缩为以下七条:
-
Tokio 内部共有五层 Drivers:
runtime::driver::Driver、runtime::time::Driver、runtime::process::Driver、runtime::signal::Driver、runtime::io::Driver,以及两个辅助 enumTimeDriver和IoStack。 -
没有共享的 trait:这些 Drivers 虽然都提供
park/park_timeout/shutdown方法,接口签名完全一致,但不是通过impl SomeTrait for Driver实现的——每个impl块是独立定义的,只有人工约定的签名一致性。 -
五层构成一条线性调用链:最外层
runtime::driver::Driver→TimeDriver→time::Driver→IoStack→process::Driver→signal::Driver→io::Driver(最底层)。链的长度可通过条件编译缩短,但顺序不变。 -
每一层调用下一层的
park方法:通过self.next.park(handle)或self.next.park_timeout(handle, duration)委派,最终到达io::Driver::turn()→self.poll.poll()(epoll_wait/kevent)。 -
前序调用与后序调用并存:
time::Driver是先查时间轮再委派(前序),process::Driver和signal::Driver是先委派再处理自己的逻辑(后序)。 -
条件编译可缩短链:通过 Cargo features(
net、time、process、signal)和 Builder 选项(.enable_io()、.enable_time())控制每层的启用。关掉所有可选层后,最少剩Driver→TimeDriver→IoStack::Disabled(ParkThread)。 -
每层全局唯一实例:一个 Runtime 恰好一套 Driver 实例,按值嵌套在
runtime::driver::Driver这一个 struct 中(不是全局 static 变量,是Runtime的字段)。多线程下所有 worker 共享同一个TryLock<Driver>。
二、TcpStream 的创建:一条完整的注册链
现在来看 TcpStream::connect("127.0.0.1:8080") 背后到底发生了什么。
2.1 从 TcpStream 到 PollEvented
TcpStream 的结构非常简单——它只是一个 PollEvented 的包装9:
// tokio/src/net/tcp/stream.rs: 72-74
pub struct TcpStream {
io: PollEvented<mio::net::TcpStream>,
}
PollEvented 是 Tokio 的一个通用包装器:它接收一个实现了 mio::event::Source 的类型(如 mio::net::TcpStream),将其注册到运行时的 I/O 驱动上10:
// tokio/src/io/poll_evented.rs: 89-94
pub(crate) struct PollEvented<E: Source> {
io: Option<E>,
registration: Registration,
}
创建 PollEvented 时,需要拿到当前 runtime 的调度器句柄(scheduler::Handle),然后用它来完成注册10:
// tokio/src/io/poll_evented.rs: 130-139
pub(crate) fn new_with_interest_and_handle(
mut io: E,
interest: Interest,
handle: scheduler::Handle,
) -> io::Result<Self> {
let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?;
Ok(Self { io: Some(io), registration })
}
TcpStream::new 只做了一件事:把 mio::net::TcpStream 交给 PollEvented::new9:
// tokio/src/net/tcp/stream.rs: 160-163
pub(crate) fn new(connected: mio::net::TcpStream) -> io::Result<TcpStream> {
let io = PollEvented::new(connected)?;
Ok(TcpStream { io })
}
2.2 从 PollEvented 到 Registration
Registration 是更底层的抽象。它持有一个 Arc<ScheduledIo>,后者是 I/O 驱动中真正管理状态的结构11:
// tokio/src/runtime/io/registration.rs: 64-72
pub(crate) struct Registration {
handle: scheduler::Handle,
shared: Arc<ScheduledIo>,
}
Registration::new_with_interest_and_handle 是整条注册链的关键入口11:
// tokio/src/runtime/io/registration.rs: 91-98
pub(crate) fn new_with_interest_and_handle(
io: &mut impl Source,
interest: Interest,
handle: scheduler::Handle,
) -> io::Result<Registration> {
let shared = handle.driver().io().add_source(io, interest)?;
Ok(Registration { handle, shared })
}
这里 handle.driver().io() 拿到 I/O Driver 的 Handle,然后调用 add_source。
2.3 从 Registration 到 I/O Driver
Handle::add_source 是真正把 fd 注册到 epoll 的地方12:
// tokio/src/runtime/io/driver.rs: 182-207
pub(super) fn add_source(
&self,
source: &mut impl mio::event::Source,
interest: Interest,
) -> io::Result<Arc<ScheduledIo>> {
// 1. 从 RegistrationSet 中分配一个新的 ScheduledIo
let scheduled_io = self.registrations.allocate(&mut self.synced.lock())?;
let token = scheduled_io.token();
// 2. 通过 mio::Registry 将 source 注册到 epoll
// token 是 ScheduledIo 对象的指针值(作为 epoll 的标识)
if let Err(e) = self.registry.register(source, token, interest.to_mio()) {
// 注册失败:从 set 中移除分配的 ScheduledIo
unsafe { self.registrations.remove(&mut self.synced.lock(), &scheduled_io) };
return Err(e);
}
self.metrics.incr_fd_count();
Ok(scheduled_io)
}
这里有一个关键设计:token 是 ScheduledIo 对象的地址。mio::Token 是一个 usize,而 ScheduledIo 是一个 Arc 管理的堆对象。add_source 使用 PtrExposeDomain 把 ScheduledIo 的地址暴露为 mio::Token,这样 epoll 返回事件时,Tokio 可以直接通过 token 反向找到对应的 ScheduledIo12:
// tokio/src/runtime/io/scheduled_io.rs: 268-270
pub(crate) fn token(&self) -> mio::Token {
mio::Token(super::EXPOSE_IO.expose_provenance(self))
}
整条注册链可以用下图来总结:
graph TD
A["TcpStream::connect(addr)"] --> B["mio::net::TcpStream::connect(addr)"]
B --> C["TcpStream::new(sys)"]
C --> D["PollEvented::new(connected)"]
D --> E["Registration::new_with_interest_and_handle"]
E --> F["Handle::add_source"]
F --> G["RegistrationSet::allocate → Arc<ScheduledIo>"]
F --> H["mio::Registry::register(source, token, interest)"]
H --> I["epoll_ctl(EPOLL_CTL_ADD, fd, ...)"]
style G fill:#e1f5fe
style H fill:#e1f5fe
style I fill:#c8e6c9
注册完成后,TcpStream 的 PollEvented 内部持有一个 Registration,Registration 内部持有一个 Arc<ScheduledIo>,而 ScheduledIo 的地址就是它在 epoll 中的 token。通过这个 token,epoll 事件可以反向定位到 ScheduledIo,运行时可快速处理就绪事件。
三、ScheduledIo:边沿触发的原子状态机
ScheduledIo 是 I/O 驱动的核心数据结构。如果说 Timer 驱动的核心是哈希时间轮和 TimerShared,那 I/O 驱动的核心就是 ScheduledIo 和它的原子状态13:
// tokio/src/runtime/io/scheduled_io.rs: 59-61
pub(crate) struct ScheduledIo {
linked_list_pointers: UnsafeCell<linked_list::Pointers<Self>>,
readiness: AtomicUsize, // 打包的 readiness 状态
waiters: Mutex<Waiters>, // 等待队列
}
3.1 打包的原子状态
readiness 字段是一个 AtomicUsize,但它承载了三个独立的信息,通过位打包(bit packing)放在一个原子变量里13:
// tokio/src/runtime/io/scheduled_io.rs: 224-230
// | shutdown | driver tick | readiness |
// |----------+-------------+-----------|
// | 1 bit | 15 bits | 16 bits |
const READINESS: bit::Pack = bit::Pack::least_significant(16);
const TICK: bit::Pack = READINESS.then(15);
const SHUTDOWN: bit::Pack = TICK.then(1);
- readiness(16 位):当前就绪状态的位掩码(可读、可写、错误、挂起等)
- tick(15 位):I/O 驱动的 tick 计数器,每次
set_readiness递增,用于防止清理旧的就绪事件 - shutdown(1 位):I/O 驱动是否已关闭
三个状态打包在一个 AtomicUsize 中,意味着对 readiness 的更新和检查都是原子操作,无需外部锁。
3.2 和 TimerShared 的对比
上篇文章介绍了 TimerShared 及其 StateCell(用 AtomicU64 实现的原子状态机)。两者都是原子状态机,但解决的问题不同:
| 方面 | TimerShared (sleep) |
ScheduledIo (I/O) |
|---|---|---|
| 核心状态 | 到期时间、是否已触发 | readiness、tick |
| 驱动方式 | 时间轮到期触发 | epoll 事件通知 |
| 更新方式 | CAS 循环(set_expiration/mark_pending) |
fetch_update(set_readiness) |
| 通知机制 | fire() → Waker::wake() |
wake(ready) → Waker::wake() |
| 并发角色 | 用户端(TimerEntry)+ 驱动端(Driver) |
用户端(Registration)+ 驱动端(Driver) |
| 数据结构 | 侵入式链表节点(用于时间轮 slot) | 侵入式链表节点(用于 RegistrationSet) |
3.3 边沿触发(ET)模型:为什么需要 tick
mio 在 Linux 上默认使用 epoll 的边沿触发(Edge-Triggered, ET)模式。ET 模式的特点是:只在状态从「不可用」变为「可用」时通知一次。这意味着:
- epoll 通知 readable →
ScheduledIo设置 READABLE 位 - 用户尝试
read()→ 成功读到部分数据 - epoll 不会再次通知(因为状态没有从不可读变为可读)
- 用户可以继续
read()直到返回WouldBlock→ 清除 READABLE 位 → 下次 epoll 通知再来
但这里有一个微妙的问题:如何防止清理旧事件时误清新事件? 这就是 tick 的作用。
sequenceDiagram
participant EPoll as epoll
participant SI as ScheduledIo
participant Task as User Task
EPoll->>SI: poll 返回 readable(tick=1)
SI->>SI: set_readiness: 设置 READABLE + tick=2
SI-->>Task: wake()
Task->>SI: poll_read_ready()
SI-->>Task: READABLE(tick=2)
Task->>Task: read() → 成功
Note over Task: 还没读完...
EPoll->>SI: 新数据到达,再次通知(tick=3)
SI->>SI: set_readiness: tick=3
Task->>SI: clear_readiness(tick=2) ← 旧 tick
SI->>SI: tick 不匹配 → 无操作
Task->>SI: poll_read_ready()
SI-->>Task: READABLE(tick=3 时设置)
Task->>Task: 继续读取
关键机制在 set_readiness 中13:
// tokio/src/runtime/io/scheduled_io.rs: 277-298
pub(super) fn set_readiness(&self, tick_op: Tick, f: impl Fn(Ready) -> Ready) {
let _ = self.readiness.fetch_update(AcqRel, Acquire, |curr| {
const MAX_TICK: usize = TICK.max_value() + 1;
let tick = TICK.unpack(curr);
let new_tick = match tick_op {
// 清理 readiness 时:如果 tick 不匹配,跳过此次操作
Tick::Clear(t) if tick as u8 != t => return None,
Tick::Clear(t) => t as usize,
// 设置 readiness 时:递增 tick
Tick::Set => tick.wrapping_add(1) % MAX_TICK,
};
let ready = Ready::from_usize(READINESS.unpack(curr));
Some(TICK.pack(new_tick, f(ready).as_usize()))
});
}
fetch_update 是一个乐观 CAS 循环:它尝试读取当前值、调用闭包生成新值、然后原子地替换。如果闭包返回 None(例如清理时的 tick 不匹配),则不进行任何更新。
这正是 ET 模型所需要的:每次 epoll 通知都会递增 tick,旧的事件清不到新 tick 的脏位。如果没有这个 tick 机制,一个慢任务可能在清理旧事件时无意中清掉了新到达的事件,导致数据静默丢失。
3.4 Waiters:等待队列
ScheduledIo 的第三个字段 waiters 管理了等待此 I/O 资源的任务13:
// tokio/src/runtime/io/scheduled_io.rs: 70-78
#[derive(Debug, Default)]
struct Waiters {
/// 所有等待者的侵入式链表
list: WaitList,
/// AsyncRead 专用 waker 槽
reader: Option<Waker>,
/// AsyncWrite 专用 waker 槽
writer: Option<Waker>,
}
这里有两个并行的机制:
- 专用 waker 槽(
reader/writer):用于poll_read_ready/poll_write_ready快速路径——每个方向只有一个任务在等待,直接用Option<Waker>存,无需链表 - 通用等待链表:用于
readiness()async 方法——支持多个任务同时等待同一个ScheduledIo的不同 Interest
当 I/O 事件到达时,wake() 方法遍历这两者并唤醒对应的任务13:
// tokio/src/runtime/io/scheduled_io.rs: 304-348
pub(super) fn wake(&self, ready: Ready) {
let mut wakers = WakeList::new();
let mut waiters = self.waiters.lock();
// 检查 read 槽
if ready.is_readable() {
if let Some(waker) = waiters.reader.take() {
wakers.push(waker);
}
}
// 检查 write 槽
if ready.is_writable() {
if let Some(waker) = waiters.writer.take() {
wakers.push(waker);
}
}
// 遍历链表中的等待者
'outer: loop {
let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest));
while wakers.can_push() {
match iter.next() {
Some(waiter) => {
// ... 收集 waker,标记已就绪
}
None => break 'outer,
}
}
// 批次满了:释放锁后批量唤醒
drop(waiters);
wakers.wake_all();
waiters = self.waiters.lock();
}
// ...
}
wake_all 在释放锁之后进行,避免持锁时调用外部代码导致死锁——这和上篇文章中 Time Driver 的 waker_list.wake_all() 是相同的模式。
四、事件循环:turn()
当一切就绪后,是谁来驱动 I/O 事件的处理的?答案是 I/O Driver 的 turn() 方法12:
// tokio/src/runtime/io/driver.rs: 122-174
fn turn(&mut self, handle: &Handle, max_wait: Option<Duration>) {
// 处理待释放的注册(被 drop 的 Registration)
handle.release_pending_registrations();
let events = &mut self.events;
// 核心:阻塞在 epoll_wait 上
match self.poll.poll(events, max_wait) {
Ok(()) => {}
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
// ...
}
// 处理所有返回的事件
let mut ready_count = 0;
for event in events.iter() {
let token = event.token();
if token == TOKEN_WAKEUP {
// mio::Waker 的 unpark,无需处理
} else if token == TOKEN_SIGNAL {
self.signal_ready = true;
} else {
let ready = Ready::from_mio(event);
// token 就是 ScheduledIo 的地址
let ptr = super::EXPOSE_IO.from_exposed_addr(token.0);
let io: &ScheduledIo = unsafe { &*ptr };
// 更新 readiness 状态并唤醒等待任务
io.set_readiness(Tick::Set, |curr| curr | ready);
io.wake(ready);
ready_count += 1;
}
}
// ...
}
turn() 的工作流程清晰明了:
- 释放待处理的注册:清理之前被 drop 但尚未从 epoll 中移除的
Registration - 阻塞等待:
self.poll.poll(events, max_wait)—— 对,这就是epoll_wait - 分发事件:遍历 epoll 返回的每个事件,通过 token 找到对应的
ScheduledIo,设置 readiness 位,唤醒等待的任务
当 I/O 驱动独立运行时(如 Time Driver 没有启用),max_wait 为 None 表示无限期等待;当 Time Driver 有定时器时,max_wait 为”到最早 deadline 的时间差”。
4.1 epoll 不保存数据
一个容易产生的误解是认为 epoll 返回事件时”带着数据一起回来了”。实际上,epoll 只通知”fd 可读了”,它不保存也不传递任何用户数据。数据从到达网卡到被用户程序读取,经历了完整的链路:
客户端发送数据包
→ 网卡硬件中断
→ 内核网络协议栈(IP/TCP 层)处理
→ 数据放入 socket 的接收缓冲区(内核内存)
→ epoll 检测到 fd 状态从不可读变为可读
→ epoll_wait 返回「fd X 可读」
→ Worker 线程醒来(turn() 返回)
→ 设置 ScheduledIo readiness 位
→ Waker::wake()
→ 任务重新入队,被调度执行
→ 任务调用 read(fd, buf)
→ read 系统调用把数据从内核 socket buffer 拷贝到用户缓冲区
epoll 的角色是门铃——它通知你”有东西到了,快来拿”,但它不负责把东西送来。真正的数据一直安静地躺在内核的 socket 接收缓冲区里,等用户任务被调度后通过 read() 系统调用拷贝到用户态。
这也是为什么 PollEvented::poll_read 的循环会在 read() 返回 WouldBlock 时清除 readiness 然后重试——因为如果假阳性发生了(数据被别的线程读走了或者已被处理),再次 read() 会告诉你 socket buffer 是空的,你需要清除 readiness 让 epoll 下次有新数据到达时重新通知你。
sequenceDiagram
participant NIC as 网卡硬件
participant Kernel as 内核协议栈
participant Buf as Socket Buffer
participant EPoll as epoll
participant Task as 用户任务
NIC->>Kernel: 数据包到达(硬件中断)
Kernel->>Buf: TCP 重组 → 数据放入 socket buffer
Buf->>EPoll: fd 状态变更(不可读 → 可读)
EPoll->>EPoll: epoll_wait 返回
Note over EPoll: epoll 只告诉你"fd X 可读"<br/>但不携带用户数据
EPoll-->>Task: Waker::wake()
Task->>Task: 任务被调度
Task->>Buf: read(fd, buf)
Buf-->>Task: 数据拷贝到用户缓冲区
对比一下 sleep 的”门铃”:
Timer Driver: epoll_wait(timeout) → 超时返回
→ 时间轮的定时器"只告诉你时间到了"
→ 没有数据要拷贝,直接 fire() → waker.wake()
I/O Driver: epoll_wait(events, timeout) → 事件返回
→ epoll 的 fd 事件"只告诉你 fd 可读了"
→ 数据还在内核 socket buffer 里
→ 设置 readiness → waker.wake()
→ 任务醒来后需要 read() 把数据拷贝到用户态
在这一点上,两种驱动的”通知”机制在语义上是类似的——都只是信号通知,而不是数据传输。区别仅在于:时间通知直接触发任务的唤醒(时间到了就够了),而 I/O 通知后任务还需要多一步 read() 系统调用来搬运数据。
整体驱动循环
将 I/O 驱动和时间驱动放在一起看,Tokio 的 worker 线程主循环大致是:
sequenceDiagram
participant Worker as Worker Thread
participant Time as Time Driver
participant IO as I/O Driver
participant Sched as Scheduler
loop 主循环
Worker->>Time: park_internal()
Time->>Time: 检查时间轮 → 下一个 deadline
Time->>IO: park/park_timeout(duration)
IO->>IO: epoll_wait(events, duration)
Note over IO: 被 I/O 事件或超时唤醒
IO-->>Time: 返回
Time->>Time: process_at_time(now)
Time->>Time: 收割到期 timer
Time-->>Worker: 线程继续
Worker->>Sched: 运行就绪任务
end
这也就是在上一篇文章中看到的 park_internal 流程——只是现在我们把 park 的底层展开了,看到了 I/O 驱动这一层。
4.2 epoll 是 blocking 的,non-blocking 在哪?
到这里你可能有一个疑惑:既然 turn() 里的 epoll_wait 会阻塞线程,那 non-blocking I/O 到底体现在哪里?
答案是:阻塞在线程层次,非阻塞在任务层次。这是多路复用(multiplexing)的精髓。
对比两种模式就一目了然:
同步阻塞模式:一条线程 = 一个连接
Thread A ── read(socket1) ── 阻塞等数据 ── 继续
Thread B ── read(socket2) ── 阻塞等数据 ── 继续
→ N 个连接需要 N 条线程,OS 调度开销大
epoll 多路复用:一条线程 = 所有连接
Worker ── epoll_wait([socket1, socket2, ...]) ── 阻塞等任意 fd 就绪 ──
├─ socket1 可读 → read(socket1) → 处理 → 回到 epoll_wait
└─ socket2 可读 → read(socket2) → 处理 → 回到 epoll_wait
→ N 个连接只需要少量线程,zero-cost 任务切换
epoll_wait 的阻塞正是 tokio 需要的——它不是在阻塞等一个特定的 socket,而是在阻塞等所有注册过的 fd 中任何一个就绪。一条线程的通话时间被分给成千上万个 fd 共享,没有空转、没有忙等。
但这里还有第二层:单个 socket 的 read() 本身也是非阻塞的。tokio 的 mio::net::TcpStream 在注册之前就已经被设为 O_NONBLOCK 了。所以 read() 永远不会真正卡住线程:
read(nonblocking_fd, buf) → 要么返回数据
→ 要么返回 EAGAIN / EWOULDBLOCK
→ 绝不阻塞等待
如果 read() 返回 WouldBlock,tokio 不会傻等——任务返回 Poll::Pending,线程立刻还给 epoll_wait 去服务其他 fd。等到下一次内核通知 “fd 可读” 时,epoll 会再次唤醒线程,再尝试读取。
sequenceDiagram
participant Task1 as 任务 A (fd1)
participant Task2 as 任务 B (fd2)
participant EPoll as epoll
participant Worker as Worker 线程
Worker->>EPoll: epoll_wait([fd1, fd2])
Note over Worker: 线程阻塞等待
EPoll-->>Worker: fd1 可读
Worker->>Task1: poll fd1
Task1->>Task1: read(fd1, buf) → 成功
Task1-->>Worker: Ready
Worker->>EPoll: 再次 epoll_wait
Note over Worker: 线程阻塞等待
EPoll-->>Worker: fd2 可读
Worker->>Task2: poll fd2
Task2->>Task2: read(fd2, buf) → WouldBlock! (假阳性)
Task2->>Task1: clear_readiness + Pending
Task2-->>Worker: Pending
Worker->>EPoll: 再次 epoll_wait
注意到上面 seq 图中 WouldBlock 场景:任务 B 被唤醒后 read() 返回 WouldBlock,它没有阻塞线程,而是返回 Pending 让线程继续回到 epoll_wait。
常问追问:epoll_wait 阻塞线程,那 .await 会 blocking 吗?
把这个问题拆开看,一个 .await 的完整生命周期是:
Future::poll→ 数据没到 → 注册 Waker → 返回Poll::Pending- 调度器把这个任务从运行队列移到等待队列
- 调度器拿出下一个就绪任务运行
- 如果没有任何任务就绪,线程才进入
epoll_wait
所以答案很清楚:.await 本身从不导致线程阻塞。.await 返回 Poll::Pending 之后就回到了调度器,线程继续处理其他就绪任务。只有所有任务都挂起后,线程才去 epoll_wait 等事件——这时的等待是多路复用的高效等待:一条线程替所有挂起的任务盯着所有 fd,任何一个 fd 就绪,线程就醒来。
反过来说,如果 .await 真的会 blocking,那下面的代码就不可能工作:
// 伪代码:两个 .await 背靠背
let data1 = socket1.read(&mut buf1).await?;
let data2 = socket2.read(&mut buf2).await?;
如果第一个 .await 阻塞线程,第二个 .await 永远没机会执行——但事实上它们都能正确执行。秘密就在于:第一个 .await 返回 Pending 后,线程回到调度器,调度器看到第二个任务就绪。即使第二个 .await 也返回 Pending,线程还可以去服务其他等待的 fd。阻塞在 epoll_wait 上的是线程,但任务早就从线程上被摘下来了。
所以完整回答是两层含义的叠加:
epoll_wait的阻塞是线程层次的阻塞——它是在「等任何一个 fd 就绪」,不是「等一个特定的 fd」- 单个 fd 的
read()永不阻塞线程——因为 fd 被设为O_NONBLOCK,数据没准备好就立即返回WouldBlock,不会卡住线程
这两层合在一起就是 tokio 异步 I/O 的引擎:一条线程开着 epoll_wait 同时等所有连接,每个连接上的 read/write 都不会卡住线程,遇到 WouldBlock 就把任务挂起,线程回去继续等。
这也顺便回答了上篇文章 sleep 和 I/O 的一个本质差异:sleep 的 epoll_wait(timeout) 在超时返回后,时间轮的”到期检测”是纯粹的 CPU 计算(比较 tick),不需要任何系统调用;而 I/O 的 epoll_wait(events) 返回后,任务还需要调用 read() 这个系统调用来真正获取数据。sleep 的用户态时间轮避免了额外的内核交互,而 I/O 的每一次数据读取都是一次系统调用。
4.3 一个值得思考的对比:io_uring 在哪?
到这里你可能还有一个问题:Linux 上有 io_uring,它可以完全异步地提交 I/O 操作并从 completion queue 中收割结果,tokio 为什么不用它来替代 epoll?
这是一个很好的问题。答案是:mio 是 readiness-based(就绪态)抽象,而 io_uring 是 completion-based(完成态)抽象,两者的编程模型从根本上不兼容。
Readiness-based vs Completion-based
Readiness-based 模型(epoll / kqueue)的工作方式是:
epoll_wait → 返回「fd X 可读」→ 用户调用 read(fd, buf) → 数据从内核拷贝到用户态
epoll 只告诉你”可以读了”,实际的读操作仍然由用户发起(read() 系统调用),且必须是同步的。这正是为什么上一节强调 O_NONBLOCK——因为 read() 虽然不阻塞线程,但它毕竟是同步完成的。
Completion-based 模型(io_uring)的工作方式是:
用户提交 read(fd, buf) 到 SQ(Submission Queue)
→ io_uring_enter 通知内核干活
→ 内核直接 DMA 把数据放入 buf
→ 完成后在 CQ(Completion Queue)放入一条完成记录
→ 用户从 CQ 收割结果
io_uring 把「等待就绪 + 读数据」合并成一步:你提交一个读请求,内核在数据到达时直接 DMA 到你的 buffer,然后通知你。你连 read() 系统调用都不需要再做了。
这两种模型在驱动层面的架构差异是根本性的:
graph LR
subgraph "Readiness-based (epoll / mio)"
A1["epoll_wait: 就绪通知"] --> A2["用户 read(fd, buf)"]
A2 --> A3["read 系统调用<br/>同步拷贝数据"]
end
subgraph "Completion-based (io_uring)"
B1["用户提交 read op 到 SQ"] --> B2["内核异步执行 I/O"]
B2 --> B3["CQ 完成通知"]
end
为什么 tokio 的默认 I/O 驱动用 mio(epoll)?
mio 是一个跨平台的 I/O 抽象层:Linux 上它用 epoll,macOS/FreeBSD 上用 kqueue,Windows 上用 IOCP。它定义的编程模型是 Source + Poll + Events——你把 fd 作为一个 Source 注册到 Poll 上,然后阻塞等待事件。
Tokio 的整个 I/O 驱动——ScheduledIo、Registration、PollEvented——全是围绕这个模型构建的:
ScheduledIo用原子位追踪 fd 的 readiness 状态Registration处理「等待就绪 → 尝试读写 → WouldBlock → 清除 readiness → 再等待」的循环PollEvented在 ET 模式下用 tick 保护 stale event
这套架构在 io_uring 面前完全不适用。io_uring 没有「fd readiness」的概念——它只有「这个 op 完成了」。你不能用 ScheduledIo 的 readiness 位来表达”异步读操作完成”。你需要一个完全不同的状态机来追踪每个 submitted op 的生命周期。
Tokio 实际是怎么支持 io_uring 的?
Tokio 确实支持 io_uring,但它被严格限定在文件系统操作上,且需要显式启用14:
// tokio/src/runtime/io/driver.rs: 56-70 (条件编译)
// 需要: tokio_unstable + feature = "io-uring" + feature = "fs" + Linux
pub(crate) uring_context: Mutex<UringContext>,
pub(crate) uring_probe: OnceCell<Option<io_uring::Probe>>,
启用后,tokio 采用混合架构:
- 网络 I/O(socket):走默认的 mio/epoll 路径——epoll + readiness 模型
- 文件 I/O:走 io_uring 路径——用户提交
read/write/open/stat等 ops 到 SQ,内核异步完成,结果在 CQ 中收割
这个混合体现在 turn() 方法里:处理完 epoll 事件后,立即 dispatch io_uring 的 completions14:
// tokio/src/runtime/io/driver.rs: 227-236
// 处理完 epoll 事件后,dispatch io_uring 的 completion
#[cfg(all(
tokio_unstable,
feature = "io-uring",
feature = "rt",
feature = "fs",
target_os = "linux",
))]
{
let mut guard = handle.get_uring().lock();
let ctx = &mut *guard;
ctx.dispatch_completions();
}
io_uring 的 eventfd 甚至也是一个普通的 fd,注册到 epoll 上的14:
// tokio/src/runtime/io/driver/uring.rs: 180-184
impl Handle {
fn add_uring_source(&self, uringfd: RawFd) -> io::Result<()> {
let mut source = SourceFd(&uringfd);
self.registry
.register(&mut source, TOKEN_WAKEUP, Interest::READABLE.to_mio())
}
}
也就是说,io_uring 的 eventfd 被注册到 epoll 上,completion 到达时 epoll 通知驱动,然后驱动在 turn() 中 dispatch 这些 completion——io_uring 被挂在 epoll 的事件循环上。
从运行时的视角看,三种事件的驱动关系是:
epoll_wait 返回
├─ socket fd 就绪 → 设置 ScheduledIo readiness → 唤醒等待的任务
├─ io_uring eventfd 可读 → dispatch io_uring completions → 唤醒等待的任务
├─ TOKEN_WAKEUP → 外部 unpark
└─ TOKEN_SIGNAL → 信号处理
为什么不直接用 io_uring 替代 epoll 处理所有 socket I/O?
io_uring 的 socket 支持虽然已经存在,但用 io_uring 做网络 I/O 是否比 epoll 更好,直到今天(2026 年)仍然是一个开放问题15。原因有三:
-
延迟开销:io_uring 提交一个网络操作至少需要一次
io_uring_enter系统调用(或者通过 SQ 轮询来避免),而 epoll 模式下一次epoll_wait可以返回多个就绪的 fd,然后逐个read()。对于高吞吐但低延迟的网络场景,epoll 的批处理模式仍然有竞争力。 -
epoll 的”够用”:对于网络 I/O,
epoll_wait+ 非阻塞read()的模式已经足够好了。O_NONBLOCK确保了read()不会阻塞线程,epoll 的多路复用确保了单线程可以管理海量连接。io_uring 在网络上的优势主要在于减少系统调用次数和降低延迟,但在 epoll 已经做到几乎最优的场景下,替换的动力不足。 -
跨平台问题:io_uring 是 Linux-only(5.1+)。Tokio 的 runtime 需要跨平台一致性,把核心的网络 I/O 基础设施绑定在 io_uring 上会破坏跨平台的架构统一性。
所以 Tokio 选择了一条务实的路径:网络 I/O 用 epoll(mio),文件 I/O 用 io_uring(可选)。不是技术上的”不能”,而是架构上的”不值得为了替换而替换”。
五、读写的完整路径:从 poll 到 buffer
理解了数据结构后,来看 TcpStream::read(&mut buf).await 的完整路径。当一个 PollEvented 包装的 socket 被 poll_read 时10:
// tokio/src/io/poll_evented.rs: 169-194
pub(crate) unsafe fn poll_read<'a>(
&'a self,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>>
where &'a E: io::Read + 'a,
{
loop {
// 1. 等待读就绪
let evt = ready!(self.registration.poll_read_ready(cx))?;
// 2. 尝试读取
match self.io.as_ref().unwrap().read(b) {
Ok(n) => {
// 4. ET 优化:读了一部分但没读完,不清除 readiness
// (epoll 模式下可以继续读)
#[cfg(not(mio_unsupported_force_poll_poll))]
if 0 < n && n < len {
self.registration.clear_readiness(evt);
}
buf.advance(n);
return Poll::Ready(Ok(()));
}
// 3. 读阻塞了(false positive):清除 readiness 后重试
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
self.registration.clear_readiness(evt);
}
Err(e) => return Poll::Ready(Err(e)),
}
}
}
这个模式比 Sleep::poll 复杂——因为 I/O 有假阳性(false positive)。epoll 返回 readable 后,fd 可能因为竞争等原因实际上不可读了。所以 I/O 的 poll 循环是:
poll_read_ready → 返回 Ready → 尝试 read
├─ 成功 → 返回数据
└─ WouldBlock → clear_readiness → 重新 poll
而 Sleep::poll 是:
检查时间轮 → 到期了吗?
├─ 是 → 返回 Ready
└─ 否 → 注册 waker → 返回 Pending
sleep 没有假阳性——deadline 到了就是到了。I/O 有假阳性——epoll 说 readable 了,但数据可能已经被别的线程读了,或者在监听 socket 上有新的 accept 提前消耗了事件。
Registration 还提供了一个更底层的 poll_io 方法——它把”等待就绪 → 尝试操作 → WouldBlock → 清理 → 重试”的循环封装在一个函数里11:
// tokio/src/runtime/io/registration.rs: 162-175
fn poll_io<R>(
&self,
cx: &mut Context<'_>,
direction: Direction,
mut f: impl FnMut() -> io::Result<R>,
) -> Poll<io::Result<R>> {
loop {
let ev = ready!(self.poll_ready(cx, direction))?;
match f() {
Ok(ret) => return Poll::Ready(Ok(ret)),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.clear_readiness(ev);
}
Err(e) => return Poll::Ready(Err(e)),
}
}
}
六、你还在读同一条线程吗:关于「既处理 I/O 又处理 Timer」
一个常见的困惑是:Time Driver 和 I/O Driver 是在同一条线程上运行的吗?不冲突吗?
答案是是的,同一条线程,不冲突。原因可以用一句话概括:当线程在 epoll_wait 上阻塞时,时间驱动作了 I/O 驱动的一层薄薄的外包装,它做的所有事就是设置 epoll_wait 的 timeout 参数。
当 Time Driver 调用 park_timeout(duration) 时,它最终调用了 I/O Driver 的 park_timeout(duration)。I/O Driver 的 turn() 把 duration 传给 epoll_wait。这也就是 sleep 文章中提到的那条链路。
当 I/O 事件在 duration 到达之前触发,线程提前醒来,Time Driver 在 park_internal 中返回后立即调用 process() 收割到期定时器。I/O 事件的处理和时间到期检测在同一个函数调用栈中完成,没有竞争、没有额外的上下文切换。
graph TD
subgraph "Worker 线程主循环"
A["开始"] --> B["Time::park_internal()"]
B --> C["计算 next_wake"]
C --> D["IoStack::park_timeout(duration)"]
D --> E["epoll_wait(timeout)"]
E -->|"I/O 事件或超时"| F["Time::process_at_time()"]
F --> G["收割到期 timer"]
G --> H["Scheduler::run 处理就绪任务"]
H --> B
end
这种「一层包一层」的设计,使得 Time Driver 和 I/O Driver 完全共享同一个线程和同一次 epoll_wait 调用。没有线程切换、没有锁竞争(除了 ScheduledIo 内部的 Mutex)、没有额外的调度开销。
七、对比总览:时间驱动 vs 事件驱动
现在可以把两种表并列在一起看:
| 维度 | 时间驱动(sleep) | 事件驱动(I/O) |
|---|---|---|
| 核心数据结构 | 6 层哈希时间轮(Wheel) |
逐 fd 的原子状态机(ScheduledIo) |
| 状态管理 | StateCell(AtomicU64:到期时间 + 标记位) |
AtomicUsize(16b readiness + 15b tick + 1b shutdown) |
| 等待机制 | 比较 deadline vs now(用户态) | 等待 epoll 通知(内核态) |
| 唤醒方式 | fire() → Waker::wake() |
wake(ready) → Waker::wake() |
| 假阳性 | 无(时间到了就是到了) | 有(epoll 可能虚报) |
| 插入/注册 | Wheel::insert() → O(1) |
mio::Registry::register() → 系统调用 |
| 单次检查 | 检查当前 slot 内的所有 entry | 检查 epoll 返回的所有 event |
| 超时/时间驱动 | 核心机制:靠 deadline 驱动 | 仅用于设置 epoll_wait timeout |
| 内核交互 | 1 个定时器(hrtimer) | 所有 fd 注册到 epoll |
| 精确度 | 1ms | 纳秒级(取决于内核) |
核心差异的本质
两种驱动模型差异的根源在于同一个问题:「事件」发生的时机是否可控。
时间是可控的。给定一个 deadline,Tokio 可以在用户态用时间轮 O(1) 地判断是否到期。它不需要内核在”到期”这个事件发生时主动通知——它只需要知道当前时间,然后比较即可。内核定时器只是用来让线程不要一直空转。
网络 I/O是不可控的。内核之外的客户端可能在任意时刻发送数据包。Tokio 必须靠内核来通知——因为只有内核驱动(网卡中断 → 协议栈)知道数据什么时候到达。epoll 就是这个”内核通知机制”的接口。
这种「可控 vs 不可控」的区别,导致了两套完全不同的数据结构选择:
- 可控的时间 → 用户态哈希时间轮 → O(1) 插入,批量降级,线程只在必要时被唤醒
- 不可控的 I/O → epoll + per-fd 原子状态机 → 内核事件驱动,用户态只做快速的位操作和 waker 分发
graph LR
subgraph "可控事件"
A1["sleep(5s)"] --> A2["deadline = now + 5s"]
A2 --> A3["插入时间轮(O(1))"]
A3 --> A4["等待时:检查时间轮<br/>全部在用户态完成"]
A4 --> A5["到期:fire()"]
end
subgraph "不可控事件"
B1["TcpStream::read()"] --> B2["注册 fd 到 epoll"]
B2 --> B3["等待时:epoll_wait<br/>内核挂起线程"]
B3 --> B4["数据到达:epoll 通知"]
B4 --> B5["设置 readiness + wake()"]
end
八、另一种不可控事件:AsyncFd
tokio 的 I/O 驱动不只服务于网络 socket。AsyncFd 是一个通用的包装器,允许任何实现了 AsRawFd 的非阻塞文件描述符接入 Tokio 的 I/O 驱动16:
// tokio/src/io/async_fd.rs: 248-255
pub struct AsyncFd<T: AsRawFd> {
registration: Registration,
inner: Option<T>,
}
它的创建流程和 TcpStream 本质相同16:
// tokio/src/io/async_fd.rs: 406-414
pub(crate) fn try_new_with_handle_and_interest(
inner: T,
handle: scheduler::Handle,
interest: Interest,
) -> Result<Self, AsyncFdTryNewError<T>> {
let fd = inner.as_raw_fd();
match Registration::new_with_interest_and_handle(
&mut SourceFd(&fd), interest, handle,
) {
Ok(registration) => Ok(AsyncFd { registration, inner: Some(inner) }),
Err(cause) => Err(AsyncFdTryNewError { inner, cause }),
}
}
区别在于:TcpStream 使用 mio::net::TcpStream(它实现了 mio::event::Source),而 AsyncFd 使用 mio::unix::SourceFd——一个简单的包装器,不修改 fd 的配置(因此要求 fd 已经设置为非阻塞模式)。
AsyncFd 的使用模式体现了 ET 模型的完整语义16:
// 典型的 AsyncFd 使用模式
loop {
let mut guard = async_fd.readable().await?; // 等待可读
match guard.try_io(|inner| inner.get_ref().read(&mut buf)) {
Ok(result) => return result, // 成功读取
Err(_would_block) => continue, // 假阳性,重试
}
}
这里 guard.try_io() 的语义是:如果 I/O 操作返回 WouldBlock,自动调用 clear_ready() 清除 readiness 位。这确保了下次数据到达时,epoll 会发出新的边沿触发通知。
总结
Tokio 的 I/O 驱动和 Timer 驱动虽然在同一个 worker 线程中运行、共享同一次 epoll_wait 调用,但它们的核心机制截然不同:
Timer 驱动是一个用户态的时间调度器。它用哈希时间轮 O(1) 地管理成千上万个定时器,只向内核注册一个「最近 deadline」的唤醒点。线程被 epoll_wait 的超时机制叫醒,然后批量收割到期定时器。
I/O 驱动是一个内核态事件的分发器。它通过 epoll 把 fd 的 read/write 事件从内核调度到用户态,用 ScheduledIo 的原子状态机追踪每个 fd 的就绪状态,用 tick 机制解决边沿触发的 stale event 问题。
两者在 Tokio 中的分层设计是:
应用层:tokio::time::sleep tokio::net::TcpStream
| |
运行时层:TimerEntry / Wheel Registration / ScheduledIo
| |
驱动层: Time Driver I/O Driver (mio)
| |
内核层: hrtimer × 1 epoll (所有 fd)
驱动层通过 IoStack 堆叠在一起:外层 Timer 检查时间轮确定 timeout,内层 I/O 执行 epoll_wait。两条线在同一个线程上优雅地交织,互不干扰。
最终,这两种异步机制的实现都可以回溯到 Rust async 的核心契约1:返回 Poll::Pending 时注册 Waker,事件就绪时通过 Waker::wake() 通知调度器重新 poll。sleep 用「时间到达」作为就绪条件,TcpStream 用「IO 就绪」作为就绪条件——只是事件源不同,契约完全一样。
九、Feature 与 Builder 如何控制 Driver 启用
前面提到 time::Driver、process::Driver、signal::Driver、io::Driver 都可以通过条件编译或 Builder 选项跳过。这里把实际控制机制拆开看。
9.1 Cargo feature — 编译时是否包含
四个 feature 决定对应层的代码是否存在:
# Cargo.toml (tokio 的 default features)
tokio = { version = "1", features = ["rt", "net", "time", "process", "signal"] }
cfg_io_driver! 宏检查 feature net(编译时才展开):
// tokio/src/runtime/driver.rs: 134
cfg_io_driver! { // 仅在 feature = "net" 时编译
pub(crate) type IoDriver = crate::runtime::io::Driver;
}
cfg_not_io_driver! { // feature = "net" 关闭时
pub(crate) struct IoStack(ParkThread);
fn create_io_stack(..) -> ... {
let park_thread = ParkThread::new(); // 没有 io::Driver,纯 Condvar
(IoStack(park_thread), ..)
}
}
cfg_time! / cfg_not_time! 控制 TimeDriver enum 是否存在:
// tokio/src/runtime/driver.rs: 284
cfg_time! { // feature = "time" → TimeDriver 是 enum
pub(crate) enum TimeDriver {
Enabled { driver: crate::runtime::time::Driver },
EnabledAlt(IoStack),
Disabled(IoStack),
}
}
cfg_not_time! { // 没有 time feature → TimeDriver 是 IoStack 别名
type TimeDriver = IoStack;
}
同理 signal 和 process 的退化:
// tokio/src/runtime/driver.rs: 244
cfg_signal_internal_and_unix! { // feature = "signal" + Unix
type SignalDriver = crate::runtime::signal::Driver;
}
cfg_not_signal_internal! { // 无 signal 时退化为 IoDriver
type SignalDriver = IoDriver;
}
// tokio/src/runtime/driver.rs: 269
cfg_process_driver! { // feature = "process"
type ProcessDriver = crate::runtime::process::Driver;
}
cfg_not_process_driver! { // 无 process 时退化为 SignalDriver
type ProcessDriver = SignalDriver;
}
编译时未启用的 feature,相关代码在最终二进制中完全不存在。
9.2 Builder 选项 — 运行时启用或跳过
编译已包含的代码,可以通过 Builder 决定是否激活:
// tokio/src/runtime/builder.rs: 1672 / 1860
let (driver, driver_handle) = driver::Driver::new(cfg)?;
cfg 来自 builder 的设置17:
// tokio/src/runtime/builder.rs: 1668-1675
fn get_cfg(&self) -> driver::Cfg {
driver::Cfg {
enable_io: self.enable_io, // ← .enable_io()
enable_time: self.enable_time, // ← .enable_time()
nevents: self.nevents,
timer_flavor: self.timer_flavor,
}
}
driver::Driver::new() 在运行时判断这两个值:
// tokio/src/runtime/driver.rs: 47-58
pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Handle)> {
let (io_stack, io_handle, signal_handle) =
create_io_stack(cfg.enable_io, cfg.nevents)?; // ← 是否创建 io::Driver
let (time_driver, time_handle) =
create_time_driver(cfg.enable_time, .., io_stack, &clock); // ← 是否创建 time::Driver
Ok((Self { inner: time_driver }, ..))
}
create_io_stack 内部根据 enabled 选择真正的 I/O 层或 ParkThread:
// tokio/src/runtime/driver.rs: 149-170
fn create_io_stack(enabled: bool, nevents: usize) -> .. {
if enabled {
let (io_driver, io_handle) = io::Driver::new(nevents)?;
(IoStack::Enabled(process_driver), IoHandle::Enabled(io_handle), ..)
} else {
let park_thread = ParkThread::new();
(IoStack::Disabled(park_thread), IoHandle::Disabled(unpark_thread), ..)
}
}
9.3 两层控制的关系
| Cargo feature | Builder 选项 | 代码状态 | 运行时结果 |
|---|---|---|---|
net + .enable_io() |
— | 代码存在 | IoStack::Enabled |
net + 无 .enable_io() |
— | 代码存在 | IoStack::Disabled |
无 net |
— | 代码不存在 | 编译报错 |
time + .enable_time() |
— | 代码存在 | TimeDriver::Enabled |
time + 无 .enable_time() |
— | 代码存在 | TimeDriver::Disabled |
无 time |
— | 代码不存在 | TimeDriver = IoStack |
rt feature 也必须启用(默认开启)。最终最简的 runtime 只包含 runtime::driver::Driver + IoStack::Disabled(ParkThread),可以用 tokio::spawn 做纯计算任务调度,完全不需要任何 I/O 或 Timer 能力。
References
-
上篇文章:《从 tokio::time::sleep 看异步 Timer 的实现:一次从 Future::poll 到哈希时间轮的源码之旅》,2026-05-03。详细分析了
tokio::time::sleep从Future::poll、TimerEntry、StateCell到Wheel哈希时间轮的完整链路。以及更早的文章:《Rust async/await 的底层契约:从 Future::poll 到 Tokio 运行时》,2026-04-30,阐述了Future::poll、Context、Waker的核心协议。 ↩ ↩2 -
Tokio 源码,Driver 分层结构,
tokio/src/runtime/driver.rs。定义了Driver(TimeDriver包装IoStack)、Handle(IoHandle+SignalHandle+TimeHandle)、以及create_io_stack/create_time_driver的创建逻辑。 ↩ ↩2 ↩3 -
mio 源码,
cfg编译时选择操作系统后端,mio/src/sys/mod.rs。cfg_os_poll!宏控制模块编译:unix 下进入unix/子模块(进一步选择 epoll / kqueue / poll),windows 下进入windows/模块(IOCP),wasi 下进入wasip1/模块。 ↩ -
mio 源码,epoll selector 实现,
mio/src/sys/unix/selector/epoll.rs。Selector::new()通过epoll_create1创建 epoll fd,Selector::select()通过epoll_wait获取就绪事件,Selector::register()通过epoll_ctl(EPOLL_CTL_ADD)注册 fd。默认使用EPOLLET边沿触发模式。 ↩ -
mio 源码,kqueue selector 实现,
mio/src/sys/unix/selector/kqueue.rs。Selector::new()通过kqueue()创建 kqueue fd,Selector::select()通过kevent获取事件,Selector::register()使用EVFILT_READ/EVFILT_WRITE+EV_CLEAR边沿触发标志。 ↩ -
mio Cargo.toml feature 配置,
mio/Cargo.toml。os-pollfeature 启用cfg_os_poll!条件编译;os-extfeature 启用cfg_any_os_ext!额外扩展接口。 ↩ -
Tokio 源码,
runtime::io::Driver结构定义和turn/park_timeout/add_source方法,tokio/src/runtime/io/driver.rs。Driver持有poll: mio::Poll;turn()方法调用self.poll.poll(events, max_wait)执行epoll_wait;add_source()通过self.registry.register(source, token, interest)执行epoll_ctl(ADD)。注册和等待在同一个 mio::Poll 实例上完成。 ↩ ↩2 ↩3 -
Tokio 源码,多线程 Parker 的
TryLock<Driver>共享机制,tokio/src/runtime/scheduler/multi_thread/park.rs。Shared::driver是TryLock<Driver>,一次只有一个线程可以拿到锁执行epoll_wait,其他线程退化为 condvar 等待。 ↩ -
Tokio 源码,
TcpStream结构定义,tokio/src/net/tcp/stream.rs。TcpStream只是PollEvented<mio::net::TcpStream>的一层包装,构建时调用PollEvented::new(connected)。 ↩ ↩2 -
Tokio 源码,
PollEvented结构定义与poll_read方法,tokio/src/io/poll_evented.rs。PollEvented是通用 I/O 包装器,其poll_read方法展示了 ET 模式下的 read 循环(WouldBlock→clear_readiness→ 重试)以及针对 edge-triggered selector 的read部分结果优化。 ↩ ↩2 ↩3 -
Tokio 源码,
Registration结构定义与new_with_interest_and_handle、poll_io等方法,tokio/src/runtime/io/registration.rs。Registration是 I/O 资源与 reactor 的桥梁,封装了注册、就绪轮询和 WouldBlock 循环。 ↩ ↩2 ↩3 -
Tokio 源码,I/O Driver 的
Driver::new、Handle::add_source、Driver::turn,tokio/src/runtime/io/driver.rs。add_source将mio::Source注册到 epoll,token 为ScheduledIo的指针地址;turn()阻塞在epoll_wait上并分发事件。 ↩ ↩2 ↩3 -
Tokio 源码,
ScheduledIo结构定义、set_readiness、wake方法,tokio/src/runtime/io/scheduled_io.rs。核心结构:readiness为打包的 AtomicUsize(16b readiness + 15b tick + 1b shutdown),waiters为Mutex<Waiters>管理专用槽和链表等待者。set_readiness使用fetch_updateCAS 实现 tick 保护,wake使用WakeList批量唤醒。 ↩ ↩2 ↩3 ↩4 ↩5 -
Tokio 源码,io_uring 集成代码,
tokio/src/runtime/io/driver/uring.rs。UringContext管理 io_uring 实例和 op 生命周期,dispatch_completions()在turn()方法中被调用,io_uring 的 eventfd 通过SourceFd注册到 epoll 上。 ↩ ↩2 ↩3 -
《io_uring and networking: 2024》,LWN.net,https://lwn.net/Articles/961273/。讨论了 io_uring 在网络 I/O 场景下的现状,包括性能对比和与 epoll 的取舍。 ↩
-
Tokio 源码,
AsyncFd结构定义与使用模式,tokio/src/io/async_fd.rs。通用的 fd → async 包装器,通过mio::unix::SourceFd接入 I/O 驱动,提供readable()/writable()/async_io()等高级 API 和poll_read_ready()/try_io()等底层 API。 ↩ ↩2 ↩3 -
Tokio 源码,
Builder::get_cfg和Cfg定义,tokio/src/runtime/builder.rs。Cfg三个关键字段:enable_io(来自.enable_io())、enable_time(来自.enable_time())、nevents(来自.max_io_events_per_tick())。 ↩