从 tokio::time::sleep 到 tokio::net::TcpStream:时间驱动与事件驱动的两种异步实现
上篇文章跟踪了
tokio::time::sleep从 Future::poll 到哈希时间轮的完整链路。那篇文章的核心是一个问题:时间怎么做到异步等待的?本文要回答另一个问题:网络 I/O又是怎么做到异步等待的?它们共享同一个 runtime 主循环,但背后是两种完全不同的驱动模型。
引言:为什么需要第二篇?
上篇文章1中,我们看到了 tokio::time::sleep 的实现链条:
Future::poll → TimerEntry → Wheel(哈希时间轮) → Driver::park_timeout → epoll_wait
那条链路的核心架构是:用户态维护时间轮,只向内核注册一个「最早 deadline」的定时器。当线程从 epoll_wait 返回后,驱动去时间轮里收割所有到期 timer,批量唤醒对应的任务。
但这里有一个容易被忽略的细节:park_timeout 最终调用的 epoll_wait,本身就是 Linux 上最核心的 I/O 事件通知机制。时间驱动把 I/O 驱动的 park_timeout 当成了「闹钟」来用——它只是顺路借用了 epoll 的超时功能。真正的主角是I/O 驱动自己。
当你写下下面这行代码时,背后发生的事情和 sleep 有着本质区别:
let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
stream.read(&mut buf).await?;
sleep 只关心”时间到了没”;TcpStream::read 关心的是”数据到了没”。前者在用户态就能判断(查时间轮),后者必须由内核通知——因为只有内核知道网卡上有没有新数据包到达。
这篇文章就从 TcpStream 注册到 I/O 驱动的完整流程讲起,拆开 ScheduledIo、Registration、PollEvented、I/O Driver 的事件循环,然后和上篇的 Timer 驱动做横向对比,看看它们在同一套 runtime 主循环里是如何共存并各自工作的。
与上一篇的关系
| 上一篇 | 本文 |
|---|---|
tokio::time::sleep |
tokio::net::TcpStream |
| 时间驱动(Timer Driver) | I/O 驱动(I/O Driver) |
| 哈希时间轮(用户态) | epoll(内核态) |
| 一个内核定时器管理所有 sleep | 所有 fd 注册到 epoll |
| 「时间到了」由内核定时器通知 | 「数据到了」由 epoll 事件通知 |
| 主动检查(poll time wheel) | 被动等待(wait on epoll) |
概念先行:时间驱动 vs 事件驱动
tokio::time::sleep 和 TcpStream::read 虽然都返回 Future,都可以 .await,但它们的驱动模型有本质区别。
时间驱动的 sleep 是「到时即醒」:线程把 park timeout 设到最早的 deadline,内核在 deadline 到达时通过时钟中断唤醒线程。判断”是否到期”完全可以在用户态通过比较 deadline 和当前时间完成,不需要内核在事件发生时通知。
事件驱动的 I/O 是「有数即醒」:线程阻塞在 epoll_wait 上,内核在数据到达网卡时通过硬件中断 → 驱动 → epoll 事件通知唤醒线程。判断”是否可读”只有内核知道——因为数据包什么时候到达是不可预测的。
用一句话概括核心区别:
sleep 的等待是确定性的——我知道 5 秒后肯定到;I/O 的等待是不确定性的——我不知道数据什么时候来。
这对数据结构选择产生了深远的影响:
graph LR
subgraph "时间驱动(sleep)"
A1["用户态时间轮<br/>O(1) 插入/Wakeup"] -->|"最近 deadline"| B1["内核 hrtimer × 1"]
B1 -->|"定时中断"| C1["线程醒来"]
C1 -->|"批量收割"| A1
end
subgraph "事件驱动(I/O)"
A2["ScheduledIo<br/>原子 readiness 位"] -->|"注册到"| B2["epoll<br/>内核管理所有 fd"]
B2 -->|"可读/可写事件"| C2["线程醒来"]
C2 -->|"更新 readiness"| A2
end
有了这个直觉,接下来就可以拆开源码了。
一、Driver 堆栈:TimeDriver 在 IODriver 之上
在深入 TcpStream 之前,需要先理解 Tokio 运行时的 Driver 堆栈结构。
Tokio 运行时的驱动是分层的——内层是 I/O 驱动(epoll),外层是时间驱动2:
// tokio/src/runtime/driver.rs: 44-50
pub(crate) struct Driver {
inner: TimeDriver,
}
pub(crate) struct Handle {
pub(crate) io: IoHandle, // I/O 驱动句柄
pub(crate) signal: SignalHandle, // 信号驱动句柄
pub(crate) time: TimeHandle, // 时间驱动句柄
pub(crate) clock: Clock,
}
创建时,先创建 I/O 驱动,然后在其上包裹时间驱动2:
// tokio/src/runtime/driver.rs: 108-117
pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Handle)> {
let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io, cfg.nevents)?;
let clock = create_clock(cfg.enable_pause_time, cfg.start_paused);
let (time_driver, time_handle) =
create_time_driver(cfg.enable_time, cfg.timer_flavor, io_stack, &clock);
// ...
}
IoStack 本身又是一个小堆栈2:
IoStack (if io enabled):
ProcessDriver (子进程事件)
└── SignalDriver (Unix 信号)
└── IoDriver (mio::Poll)
当时间驱动需要 park 时,它把 park_timeout 委托给 IoStack,后者最终调用 mio::Poll::poll:
// tokio/src/runtime/time/mod.rs: 255-260
fn park_internal(&mut self, rt_handle: &driver::Handle, limit: Option<Duration>) {
// ...
match next_wake {
Some(when) => {
// 有定时器在将来到期:带超时 park
self.park_thread_timeout(rt_handle, duration);
}
None => {
// 没有定时器:无限期 park(除非有 limit)
if let Some(duration) = limit {
self.park_thread_timeout(rt_handle, duration);
} else {
self.park.park(rt_handle); // 无限期等待 I/O 事件
}
}
}
}
换句话说,Tokio 的 worker 线程只有一个统一的 park 循环:时间驱动检查时间轮中的最近 deadline,如果时间轮不为空,就用 epoll_wait(timeout) 带超时地等待 I/O 事件;如果时间轮为空,就无限期等待 I/O 事件——I/O 事件或者 mio::Waker 的 unpark 会把线程叫醒。
这个设计的精妙之处在于:时间驱动和 I/O 驱动共享一次 epoll 系统调用。如果时间轮里有一个 5 秒后到期的 sleep,同时一个 TCP socket 在 3 秒时收到数据——线程会在 3 秒时被 epoll 叫醒,然后时间驱动顺便检查时间轮、收割到期定时器。不需要两套独立的等待机制。
sequenceDiagram
participant TDriver as Time Driver
participant IDriver as I/O Driver (epoll)
participant Kernel as Linux Kernel
participant Clock as 时钟中断
TDriver->>IDriver: park_timeout(5000ms)
IDriver->>Kernel: epoll_wait(timeout=5000ms)
Note over Kernel: 等待 I/O 事件或超时
Kernel-->>IDriver: 数据到达 → epoll 返回
IDriver-->>TDriver: 醒来
TDriver->>TDriver: process_at_time() → 收割到期定时器
TDriver->>IDriver: 处理 I/O 事件
但这里的细节是:上述 seq 图是简化版。实际的 I/O Driver 的 turn() 首先被调用,它负责处理 epoll 事件并设置 ScheduledIo 的 readiness;然后才轮到 Time Driver 处理定时器。两者的协作是通过 driver::Handle 来协调的。
二、TcpStream 的创建:一条完整的注册链
现在来看 TcpStream::connect("127.0.0.1:8080") 背后到底发生了什么。
2.1 从 TcpStream 到 PollEvented
TcpStream 的结构非常简单——它只是一个 PollEvented 的包装3:
// tokio/src/net/tcp/stream.rs: 72-74
pub struct TcpStream {
io: PollEvented<mio::net::TcpStream>,
}
PollEvented 是 Tokio 的一个通用包装器:它接收一个实现了 mio::event::Source 的类型(如 mio::net::TcpStream),将其注册到运行时的 I/O 驱动上4:
// tokio/src/io/poll_evented.rs: 89-94
pub(crate) struct PollEvented<E: Source> {
io: Option<E>,
registration: Registration,
}
创建 PollEvented 时,需要拿到当前 runtime 的调度器句柄(scheduler::Handle),然后用它来完成注册4:
// tokio/src/io/poll_evented.rs: 130-139
pub(crate) fn new_with_interest_and_handle(
mut io: E,
interest: Interest,
handle: scheduler::Handle,
) -> io::Result<Self> {
let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?;
Ok(Self { io: Some(io), registration })
}
TcpStream::new 只做了一件事:把 mio::net::TcpStream 交给 PollEvented::new3:
// tokio/src/net/tcp/stream.rs: 160-163
pub(crate) fn new(connected: mio::net::TcpStream) -> io::Result<TcpStream> {
let io = PollEvented::new(connected)?;
Ok(TcpStream { io })
}
2.2 从 PollEvented 到 Registration
Registration 是更底层的抽象。它持有一个 Arc<ScheduledIo>,后者是 I/O 驱动中真正管理状态的结构5:
// tokio/src/runtime/io/registration.rs: 64-72
pub(crate) struct Registration {
handle: scheduler::Handle,
shared: Arc<ScheduledIo>,
}
Registration::new_with_interest_and_handle 是整条注册链的关键入口5:
// tokio/src/runtime/io/registration.rs: 91-98
pub(crate) fn new_with_interest_and_handle(
io: &mut impl Source,
interest: Interest,
handle: scheduler::Handle,
) -> io::Result<Registration> {
let shared = handle.driver().io().add_source(io, interest)?;
Ok(Registration { handle, shared })
}
这里 handle.driver().io() 拿到 I/O Driver 的 Handle,然后调用 add_source。
2.3 从 Registration 到 I/O Driver
Handle::add_source 是真正把 fd 注册到 epoll 的地方6:
// tokio/src/runtime/io/driver.rs: 182-207
pub(super) fn add_source(
&self,
source: &mut impl mio::event::Source,
interest: Interest,
) -> io::Result<Arc<ScheduledIo>> {
// 1. 从 RegistrationSet 中分配一个新的 ScheduledIo
let scheduled_io = self.registrations.allocate(&mut self.synced.lock())?;
let token = scheduled_io.token();
// 2. 通过 mio::Registry 将 source 注册到 epoll
// token 是 ScheduledIo 对象的指针值(作为 epoll 的标识)
if let Err(e) = self.registry.register(source, token, interest.to_mio()) {
// 注册失败:从 set 中移除分配的 ScheduledIo
unsafe { self.registrations.remove(&mut self.synced.lock(), &scheduled_io) };
return Err(e);
}
self.metrics.incr_fd_count();
Ok(scheduled_io)
}
这里有一个关键设计:token 是 ScheduledIo 对象的地址。mio::Token 是一个 usize,而 ScheduledIo 是一个 Arc 管理的堆对象。add_source 使用 PtrExposeDomain 把 ScheduledIo 的地址暴露为 mio::Token,这样 epoll 返回事件时,Tokio 可以直接通过 token 反向找到对应的 ScheduledIo6:
// tokio/src/runtime/io/scheduled_io.rs: 268-270
pub(crate) fn token(&self) -> mio::Token {
mio::Token(super::EXPOSE_IO.expose_provenance(self))
}
整条注册链可以用下图来总结:
graph TD
A["TcpStream::connect(addr)"] --> B["mio::net::TcpStream::connect(addr)"]
B --> C["TcpStream::new(sys)"]
C --> D["PollEvented::new(connected)"]
D --> E["Registration::new_with_interest_and_handle"]
E --> F["Handle::add_source"]
F --> G["RegistrationSet::allocate → Arc<ScheduledIo>"]
F --> H["mio::Registry::register(source, token, interest)"]
H --> I["epoll_ctl(EPOLL_CTL_ADD, fd, ...)"]
style G fill:#e1f5fe
style H fill:#e1f5fe
style I fill:#c8e6c9
注册完成后,TcpStream 的 PollEvented 内部持有一个 Registration,Registration 内部持有一个 Arc<ScheduledIo>,而 ScheduledIo 的地址就是它在 epoll 中的 token。通过这个 token,epoll 事件可以反向定位到 ScheduledIo,运行时可快速处理就绪事件。
三、ScheduledIo:边沿触发的原子状态机
ScheduledIo 是 I/O 驱动的核心数据结构。如果说 Timer 驱动的核心是哈希时间轮和 TimerShared,那 I/O 驱动的核心就是 ScheduledIo 和它的原子状态7:
// tokio/src/runtime/io/scheduled_io.rs: 59-61
pub(crate) struct ScheduledIo {
linked_list_pointers: UnsafeCell<linked_list::Pointers<Self>>,
readiness: AtomicUsize, // 打包的 readiness 状态
waiters: Mutex<Waiters>, // 等待队列
}
3.1 打包的原子状态
readiness 字段是一个 AtomicUsize,但它承载了三个独立的信息,通过位打包(bit packing)放在一个原子变量里7:
// tokio/src/runtime/io/scheduled_io.rs: 224-230
// | shutdown | driver tick | readiness |
// |----------+-------------+-----------|
// | 1 bit | 15 bits | 16 bits |
const READINESS: bit::Pack = bit::Pack::least_significant(16);
const TICK: bit::Pack = READINESS.then(15);
const SHUTDOWN: bit::Pack = TICK.then(1);
- readiness(16 位):当前就绪状态的位掩码(可读、可写、错误、挂起等)
- tick(15 位):I/O 驱动的 tick 计数器,每次
set_readiness递增,用于防止清理旧的就绪事件 - shutdown(1 位):I/O 驱动是否已关闭
三个状态打包在一个 AtomicUsize 中,意味着对 readiness 的更新和检查都是原子操作,无需外部锁。
3.2 和 TimerShared 的对比
上篇文章介绍了 TimerShared 及其 StateCell(用 AtomicU64 实现的原子状态机)。两者都是原子状态机,但解决的问题不同:
| 方面 | TimerShared (sleep) |
ScheduledIo (I/O) |
|---|---|---|
| 核心状态 | 到期时间、是否已触发 | readiness、tick |
| 驱动方式 | 时间轮到期触发 | epoll 事件通知 |
| 更新方式 | CAS 循环(set_expiration/mark_pending) |
fetch_update(set_readiness) |
| 通知机制 | fire() → Waker::wake() |
wake(ready) → Waker::wake() |
| 并发角色 | 用户端(TimerEntry)+ 驱动端(Driver) |
用户端(Registration)+ 驱动端(Driver) |
| 数据结构 | 侵入式链表节点(用于时间轮 slot) | 侵入式链表节点(用于 RegistrationSet) |
3.3 边沿触发(ET)模型:为什么需要 tick
mio 在 Linux 上默认使用 epoll 的边沿触发(Edge-Triggered, ET)模式。ET 模式的特点是:只在状态从「不可用」变为「可用」时通知一次。这意味着:
- epoll 通知 readable →
ScheduledIo设置 READABLE 位 - 用户尝试
read()→ 成功读到部分数据 - epoll 不会再次通知(因为状态没有从不可读变为可读)
- 用户可以继续
read()直到返回WouldBlock→ 清除 READABLE 位 → 下次 epoll 通知再来
但这里有一个微妙的问题:如何防止清理旧事件时误清新事件? 这就是 tick 的作用。
sequenceDiagram
participant EPoll as epoll
participant SI as ScheduledIo
participant Task as User Task
EPoll->>SI: poll 返回 readable(tick=1)
SI->>SI: set_readiness: 设置 READABLE + tick=2
SI-->>Task: wake()
Task->>SI: poll_read_ready()
SI-->>Task: READABLE(tick=2)
Task->>Task: read() → 成功
Note over Task: 还没读完...
EPoll->>SI: 新数据到达,再次通知(tick=3)
SI->>SI: set_readiness: tick=3
Task->>SI: clear_readiness(tick=2) ← 旧 tick
SI->>SI: tick 不匹配 → 无操作
Task->>SI: poll_read_ready()
SI-->>Task: READABLE(tick=3 时设置)
Task->>Task: 继续读取
关键机制在 set_readiness 中7:
// tokio/src/runtime/io/scheduled_io.rs: 277-298
pub(super) fn set_readiness(&self, tick_op: Tick, f: impl Fn(Ready) -> Ready) {
let _ = self.readiness.fetch_update(AcqRel, Acquire, |curr| {
const MAX_TICK: usize = TICK.max_value() + 1;
let tick = TICK.unpack(curr);
let new_tick = match tick_op {
// 清理 readiness 时:如果 tick 不匹配,跳过此次操作
Tick::Clear(t) if tick as u8 != t => return None,
Tick::Clear(t) => t as usize,
// 设置 readiness 时:递增 tick
Tick::Set => tick.wrapping_add(1) % MAX_TICK,
};
let ready = Ready::from_usize(READINESS.unpack(curr));
Some(TICK.pack(new_tick, f(ready).as_usize()))
});
}
fetch_update 是一个乐观 CAS 循环:它尝试读取当前值、调用闭包生成新值、然后原子地替换。如果闭包返回 None(例如清理时的 tick 不匹配),则不进行任何更新。
这正是 ET 模型所需要的:每次 epoll 通知都会递增 tick,旧的事件清不到新 tick 的脏位。如果没有这个 tick 机制,一个慢任务可能在清理旧事件时无意中清掉了新到达的事件,导致数据静默丢失。
3.4 Waiters:等待队列
ScheduledIo 的第三个字段 waiters 管理了等待此 I/O 资源的任务7:
// tokio/src/runtime/io/scheduled_io.rs: 70-78
#[derive(Debug, Default)]
struct Waiters {
/// 所有等待者的侵入式链表
list: WaitList,
/// AsyncRead 专用 waker 槽
reader: Option<Waker>,
/// AsyncWrite 专用 waker 槽
writer: Option<Waker>,
}
这里有两个并行的机制:
- 专用 waker 槽(
reader/writer):用于poll_read_ready/poll_write_ready快速路径——每个方向只有一个任务在等待,直接用Option<Waker>存,无需链表 - 通用等待链表:用于
readiness()async 方法——支持多个任务同时等待同一个ScheduledIo的不同 Interest
当 I/O 事件到达时,wake() 方法遍历这两者并唤醒对应的任务7:
// tokio/src/runtime/io/scheduled_io.rs: 304-348
pub(super) fn wake(&self, ready: Ready) {
let mut wakers = WakeList::new();
let mut waiters = self.waiters.lock();
// 检查 read 槽
if ready.is_readable() {
if let Some(waker) = waiters.reader.take() {
wakers.push(waker);
}
}
// 检查 write 槽
if ready.is_writable() {
if let Some(waker) = waiters.writer.take() {
wakers.push(waker);
}
}
// 遍历链表中的等待者
'outer: loop {
let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest));
while wakers.can_push() {
match iter.next() {
Some(waiter) => {
// ... 收集 waker,标记已就绪
}
None => break 'outer,
}
}
// 批次满了:释放锁后批量唤醒
drop(waiters);
wakers.wake_all();
waiters = self.waiters.lock();
}
// ...
}
wake_all 在释放锁之后进行,避免持锁时调用外部代码导致死锁——这和上篇文章中 Time Driver 的 waker_list.wake_all() 是相同的模式。
四、事件循环:turn()
当一切就绪后,是谁来驱动 I/O 事件的处理的?答案是 I/O Driver 的 turn() 方法6:
// tokio/src/runtime/io/driver.rs: 122-174
fn turn(&mut self, handle: &Handle, max_wait: Option<Duration>) {
// 处理待释放的注册(被 drop 的 Registration)
handle.release_pending_registrations();
let events = &mut self.events;
// 核心:阻塞在 epoll_wait 上
match self.poll.poll(events, max_wait) {
Ok(()) => {}
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
// ...
}
// 处理所有返回的事件
let mut ready_count = 0;
for event in events.iter() {
let token = event.token();
if token == TOKEN_WAKEUP {
// mio::Waker 的 unpark,无需处理
} else if token == TOKEN_SIGNAL {
self.signal_ready = true;
} else {
let ready = Ready::from_mio(event);
// token 就是 ScheduledIo 的地址
let ptr = super::EXPOSE_IO.from_exposed_addr(token.0);
let io: &ScheduledIo = unsafe { &*ptr };
// 更新 readiness 状态并唤醒等待任务
io.set_readiness(Tick::Set, |curr| curr | ready);
io.wake(ready);
ready_count += 1;
}
}
// ...
}
turn() 的工作流程清晰明了:
- 释放待处理的注册:清理之前被 drop 但尚未从 epoll 中移除的
Registration - 阻塞等待:
self.poll.poll(events, max_wait)—— 对,这就是epoll_wait - 分发事件:遍历 epoll 返回的每个事件,通过 token 找到对应的
ScheduledIo,设置 readiness 位,唤醒等待的任务
当 I/O 驱动独立运行时(如 Time Driver 没有启用),max_wait 为 None 表示无限期等待;当 Time Driver 有定时器时,max_wait 为”到最早 deadline 的时间差”。
整体驱动循环
将 I/O 驱动和时间驱动放在一起看,Tokio 的 worker 线程主循环大致是:
sequenceDiagram
participant Worker as Worker Thread
participant Time as Time Driver
participant IO as I/O Driver
participant Sched as Scheduler
loop 主循环
Worker->>Time: park_internal()
Time->>Time: 检查时间轮 → 下一个 deadline
Time->>IO: park/park_timeout(duration)
IO->>IO: epoll_wait(events, duration)
Note over IO: 被 I/O 事件或超时唤醒
IO-->>Time: 返回
Time->>Time: process_at_time(now)
Time->>Time: 收割到期 timer
Time-->>Worker: 线程继续
Worker->>Sched: 运行就绪任务
end
这也就是在上一篇文章中看到的 park_internal 流程——只是现在我们把 park 的底层展开了,看到了 I/O 驱动这一层。
五、读写的完整路径:从 poll 到 buffer
理解了数据结构后,来看 TcpStream::read(&mut buf).await 的完整路径。当一个 PollEvented 包装的 socket 被 poll_read 时4:
// tokio/src/io/poll_evented.rs: 169-194
pub(crate) unsafe fn poll_read<'a>(
&'a self,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>>
where &'a E: io::Read + 'a,
{
loop {
// 1. 等待读就绪
let evt = ready!(self.registration.poll_read_ready(cx))?;
// 2. 尝试读取
match self.io.as_ref().unwrap().read(b) {
Ok(n) => {
// 4. ET 优化:读了一部分但没读完,不清除 readiness
// (epoll 模式下可以继续读)
#[cfg(not(mio_unsupported_force_poll_poll))]
if 0 < n && n < len {
self.registration.clear_readiness(evt);
}
buf.advance(n);
return Poll::Ready(Ok(()));
}
// 3. 读阻塞了(false positive):清除 readiness 后重试
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
self.registration.clear_readiness(evt);
}
Err(e) => return Poll::Ready(Err(e)),
}
}
}
这个模式比 Sleep::poll 复杂——因为 I/O 有假阳性(false positive)。epoll 返回 readable 后,fd 可能因为竞争等原因实际上不可读了。所以 I/O 的 poll 循环是:
poll_read_ready → 返回 Ready → 尝试 read
├─ 成功 → 返回数据
└─ WouldBlock → clear_readiness → 重新 poll
而 Sleep::poll 是:
检查时间轮 → 到期了吗?
├─ 是 → 返回 Ready
└─ 否 → 注册 waker → 返回 Pending
sleep 没有假阳性——deadline 到了就是到了。I/O 有假阳性——epoll 说 readable 了,但数据可能已经被别的线程读了,或者在监听 socket 上有新的 accept 提前消耗了事件。
Registration 还提供了一个更底层的 poll_io 方法——它把”等待就绪 → 尝试操作 → WouldBlock → 清理 → 重试”的循环封装在一个函数里5:
// tokio/src/runtime/io/registration.rs: 162-175
fn poll_io<R>(
&self,
cx: &mut Context<'_>,
direction: Direction,
mut f: impl FnMut() -> io::Result<R>,
) -> Poll<io::Result<R>> {
loop {
let ev = ready!(self.poll_ready(cx, direction))?;
match f() {
Ok(ret) => return Poll::Ready(Ok(ret)),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.clear_readiness(ev);
}
Err(e) => return Poll::Ready(Err(e)),
}
}
}
六、你还在读同一条线程吗:关于「既处理 I/O 又处理 Timer」
一个常见的困惑是:Time Driver 和 I/O Driver 是在同一条线程上运行的吗?不冲突吗?
答案是是的,同一条线程,不冲突。原因可以用一句话概括:当线程在 epoll_wait 上阻塞时,时间驱动作了 I/O 驱动的一层薄薄的外包装,它做的所有事就是设置 epoll_wait 的 timeout 参数。
当 Time Driver 调用 park_timeout(duration) 时,它最终调用了 I/O Driver 的 park_timeout(duration)。I/O Driver 的 turn() 把 duration 传给 epoll_wait。这也就是 sleep 文章中提到的那条链路。
当 I/O 事件在 duration 到达之前触发,线程提前醒来,Time Driver 在 park_internal 中返回后立即调用 process() 收割到期定时器。I/O 事件的处理和时间到期检测在同一个函数调用栈中完成,没有竞争、没有额外的上下文切换。
graph TD
subgraph "Worker 线程主循环"
A["开始"] --> B["Time::park_internal()"]
B --> C["计算 next_wake"]
C --> D["IoStack::park_timeout(duration)"]
D --> E["epoll_wait(timeout)"]
E -->|"I/O 事件或超时"| F["Time::process_at_time()"]
F --> G["收割到期 timer"]
G --> H["Scheduler::run 处理就绪任务"]
H --> B
end
这种「一层包一层」的设计,使得 Time Driver 和 I/O Driver 完全共享同一个线程和同一次 epoll_wait 调用。没有线程切换、没有锁竞争(除了 ScheduledIo 内部的 Mutex)、没有额外的调度开销。
七、对比总览:时间驱动 vs 事件驱动
现在可以把两种表并列在一起看:
| 维度 | 时间驱动(sleep) | 事件驱动(I/O) |
|---|---|---|
| 核心数据结构 | 6 层哈希时间轮(Wheel) |
逐 fd 的原子状态机(ScheduledIo) |
| 状态管理 | StateCell(AtomicU64:到期时间 + 标记位) |
AtomicUsize(16b readiness + 15b tick + 1b shutdown) |
| 等待机制 | 比较 deadline vs now(用户态) | 等待 epoll 通知(内核态) |
| 唤醒方式 | fire() → Waker::wake() |
wake(ready) → Waker::wake() |
| 假阳性 | 无(时间到了就是到了) | 有(epoll 可能虚报) |
| 插入/注册 | Wheel::insert() → O(1) |
mio::Registry::register() → 系统调用 |
| 单次检查 | 检查当前 slot 内的所有 entry | 检查 epoll 返回的所有 event |
| 超时/时间驱动 | 核心机制:靠 deadline 驱动 | 仅用于设置 epoll_wait timeout |
| 内核交互 | 1 个定时器(hrtimer) | 所有 fd 注册到 epoll |
| 精确度 | 1ms | 纳秒级(取决于内核) |
核心差异的本质
两种驱动模型差异的根源在于同一个问题:「事件」发生的时机是否可控。
时间是可控的。给定一个 deadline,Tokio 可以在用户态用时间轮 O(1) 地判断是否到期。它不需要内核在”到期”这个事件发生时主动通知——它只需要知道当前时间,然后比较即可。内核定时器只是用来让线程不要一直空转。
网络 I/O是不可控的。内核之外的客户端可能在任意时刻发送数据包。Tokio 必须靠内核来通知——因为只有内核驱动(网卡中断 → 协议栈)知道数据什么时候到达。epoll 就是这个”内核通知机制”的接口。
这种「可控 vs 不可控」的区别,导致了两套完全不同的数据结构选择:
- 可控的时间 → 用户态哈希时间轮 → O(1) 插入,批量降级,线程只在必要时被唤醒
- 不可控的 I/O → epoll + per-fd 原子状态机 → 内核事件驱动,用户态只做快速的位操作和 waker 分发
graph LR
subgraph "可控事件"
A1["sleep(5s)"] --> A2["deadline = now + 5s"]
A2 --> A3["插入时间轮(O(1))"]
A3 --> A4["等待时:检查时间轮<br/>全部在用户态完成"]
A4 --> A5["到期:fire()"]
end
subgraph "不可控事件"
B1["TcpStream::read()"] --> B2["注册 fd 到 epoll"]
B2 --> B3["等待时:epoll_wait<br/>内核挂起线程"]
B3 --> B4["数据到达:epoll 通知"]
B4 --> B5["设置 readiness + wake()"]
end
八、另一种不可控事件:AsyncFd
tokio 的 I/O 驱动不只服务于网络 socket。AsyncFd 是一个通用的包装器,允许任何实现了 AsRawFd 的非阻塞文件描述符接入 Tokio 的 I/O 驱动8:
// tokio/src/io/async_fd.rs: 248-255
pub struct AsyncFd<T: AsRawFd> {
registration: Registration,
inner: Option<T>,
}
它的创建流程和 TcpStream 本质相同8:
// tokio/src/io/async_fd.rs: 406-414
pub(crate) fn try_new_with_handle_and_interest(
inner: T,
handle: scheduler::Handle,
interest: Interest,
) -> Result<Self, AsyncFdTryNewError<T>> {
let fd = inner.as_raw_fd();
match Registration::new_with_interest_and_handle(
&mut SourceFd(&fd), interest, handle,
) {
Ok(registration) => Ok(AsyncFd { registration, inner: Some(inner) }),
Err(cause) => Err(AsyncFdTryNewError { inner, cause }),
}
}
区别在于:TcpStream 使用 mio::net::TcpStream(它实现了 mio::event::Source),而 AsyncFd 使用 mio::unix::SourceFd——一个简单的包装器,不修改 fd 的配置(因此要求 fd 已经设置为非阻塞模式)。
AsyncFd 的使用模式体现了 ET 模型的完整语义8:
// 典型的 AsyncFd 使用模式
loop {
let mut guard = async_fd.readable().await?; // 等待可读
match guard.try_io(|inner| inner.get_ref().read(&mut buf)) {
Ok(result) => return result, // 成功读取
Err(_would_block) => continue, // 假阳性,重试
}
}
这里 guard.try_io() 的语义是:如果 I/O 操作返回 WouldBlock,自动调用 clear_ready() 清除 readiness 位。这确保了下次数据到达时,epoll 会发出新的边沿触发通知。
总结
Tokio 的 I/O 驱动和 Timer 驱动虽然在同一个 worker 线程中运行、共享同一次 epoll_wait 调用,但它们的核心机制截然不同:
Timer 驱动是一个用户态的时间调度器。它用哈希时间轮 O(1) 地管理成千上万个定时器,只向内核注册一个「最近 deadline」的唤醒点。线程被 epoll_wait 的超时机制叫醒,然后批量收割到期定时器。
I/O 驱动是一个内核态事件的分发器。它通过 epoll 把 fd 的 read/write 事件从内核调度到用户态,用 ScheduledIo 的原子状态机追踪每个 fd 的就绪状态,用 tick 机制解决边沿触发的 stale event 问题。
两者在 Tokio 中的分层设计是:
应用层:tokio::time::sleep tokio::net::TcpStream
| |
运行时层:TimerEntry / Wheel Registration / ScheduledIo
| |
驱动层: Time Driver I/O Driver (mio)
| |
内核层: hrtimer × 1 epoll (所有 fd)
驱动层通过 IoStack 堆叠在一起:外层 Timer 检查时间轮确定 timeout,内层 I/O 执行 epoll_wait。两条线在同一个线程上优雅地交织,互不干扰。
最终,这两种异步机制的实现都可以回溯到 Rust async 的核心契约1:返回 Poll::Pending 时注册 Waker,事件就绪时通过 Waker::wake() 通知调度器重新 poll。sleep 用「时间到达」作为就绪条件,TcpStream 用「IO 就绪」作为就绪条件——只是事件源不同,契约完全一样。
References
-
上篇文章:《从 tokio::time::sleep 看异步 Timer 的实现:一次从 Future::poll 到哈希时间轮的源码之旅》,2026-05-03。详细分析了
tokio::time::sleep从Future::poll、TimerEntry、StateCell到Wheel哈希时间轮的完整链路。以及更早的文章:《Rust async/await 的底层契约:从 Future::poll 到 Tokio 运行时》,2026-04-30,阐述了Future::poll、Context、Waker的核心协议。 ↩ ↩2 -
Tokio 源码,Driver 分层结构,
tokio/src/runtime/driver.rs。定义了Driver(TimeDriver包装IoStack)、Handle(IoHandle+SignalHandle+TimeHandle)、以及create_io_stack/create_time_driver的创建逻辑。 ↩ ↩2 ↩3 -
Tokio 源码,
TcpStream结构定义,tokio/src/net/tcp/stream.rs。TcpStream只是PollEvented<mio::net::TcpStream>的一层包装,构建时调用PollEvented::new(connected)。 ↩ ↩2 -
Tokio 源码,
PollEvented结构定义与poll_read方法,tokio/src/io/poll_evented.rs。PollEvented是通用 I/O 包装器,其poll_read方法展示了 ET 模式下的 read 循环(WouldBlock→clear_readiness→ 重试)以及针对 edge-triggered selector 的read部分结果优化。 ↩ ↩2 ↩3 -
Tokio 源码,
Registration结构定义与new_with_interest_and_handle、poll_io等方法,tokio/src/runtime/io/registration.rs。Registration是 I/O 资源与 reactor 的桥梁,封装了注册、就绪轮询和 WouldBlock 循环。 ↩ ↩2 ↩3 -
Tokio 源码,I/O Driver 的
Driver::new、Handle::add_source、Driver::turn,tokio/src/runtime/io/driver.rs。add_source将mio::Source注册到 epoll,token 为ScheduledIo的指针地址;turn()阻塞在epoll_wait上并分发事件。 ↩ ↩2 ↩3 -
Tokio 源码,
ScheduledIo结构定义、set_readiness、wake方法,tokio/src/runtime/io/scheduled_io.rs。核心结构:readiness为打包的 AtomicUsize(16b readiness + 15b tick + 1b shutdown),waiters为Mutex<Waiters>管理专用槽和链表等待者。set_readiness使用fetch_updateCAS 实现 tick 保护,wake使用WakeList批量唤醒。 ↩ ↩2 ↩3 ↩4 ↩5 -
Tokio 源码,
AsyncFd结构定义与使用模式,tokio/src/io/async_fd.rs。通用的 fd → async 包装器,通过mio::unix::SourceFd接入 I/O 驱动,提供readable()/writable()/async_io()等高级 API 和poll_read_ready()/try_io()等底层 API。 ↩ ↩2 ↩3