从 tokio::time::sleep 到 tokio::net::TcpStream:时间驱动与事件驱动的两种异步实现

上篇文章跟踪了 tokio::time::sleep 从 Future::poll 到哈希时间轮的完整链路。那篇文章的核心是一个问题:时间怎么做到异步等待的?本文要回答另一个问题:网络 I/O又是怎么做到异步等待的?它们共享同一个 runtime 主循环,但背后是两种完全不同的驱动模型。

引言:为什么需要第二篇?

上篇文章1中,我们看到了 tokio::time::sleep 的实现链条:

Future::poll → TimerEntry → Wheel(哈希时间轮) → Driver::park_timeout → epoll_wait

那条链路的核心架构是:用户态维护时间轮,只向内核注册一个「最早 deadline」的定时器。当线程从 epoll_wait 返回后,驱动去时间轮里收割所有到期 timer,批量唤醒对应的任务。

但这里有一个容易被忽略的细节:park_timeout 最终调用的 epoll_wait,本身就是 Linux 上最核心的 I/O 事件通知机制。时间驱动把 I/O 驱动的 park_timeout 当成了「闹钟」来用——它只是顺路借用了 epoll 的超时功能。真正的主角是I/O 驱动自己

当你写下下面这行代码时,背后发生的事情和 sleep 有着本质区别:

let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
stream.read(&mut buf).await?;

sleep 只关心”时间到了没”;TcpStream::read 关心的是”数据到了没”。前者在用户态就能判断(查时间轮),后者必须由内核通知——因为只有内核知道网卡上有没有新数据包到达。

这篇文章就从 TcpStream 注册到 I/O 驱动的完整流程讲起,拆开 ScheduledIoRegistrationPollEvented、I/O Driver 的事件循环,然后和上篇的 Timer 驱动做横向对比,看看它们在同一套 runtime 主循环里是如何共存并各自工作的

与上一篇的关系

上一篇 本文
tokio::time::sleep tokio::net::TcpStream
时间驱动(Timer Driver) I/O 驱动(I/O Driver)
哈希时间轮(用户态) epoll(内核态)
一个内核定时器管理所有 sleep 所有 fd 注册到 epoll
「时间到了」由内核定时器通知 「数据到了」由 epoll 事件通知
主动检查(poll time wheel) 被动等待(wait on epoll)

概念先行:时间驱动 vs 事件驱动

tokio::time::sleepTcpStream::read 虽然都返回 Future,都可以 .await,但它们的驱动模型有本质区别。

时间驱动的 sleep 是「到时即醒」:线程把 park timeout 设到最早的 deadline,内核在 deadline 到达时通过时钟中断唤醒线程。判断”是否到期”完全可以在用户态通过比较 deadline 和当前时间完成,不需要内核在事件发生时通知。

事件驱动的 I/O 是「有数即醒」:线程阻塞在 epoll_wait 上,内核在数据到达网卡时通过硬件中断 → 驱动 → epoll 事件通知唤醒线程。判断”是否可读”只有内核知道——因为数据包什么时候到达是不可预测的。

用一句话概括核心区别:

sleep 的等待是确定性的——我知道 5 秒后肯定到;I/O 的等待是不确定性的——我不知道数据什么时候来。

这对数据结构选择产生了深远的影响:

graph LR
    subgraph "时间驱动(sleep)"
        A1["用户态时间轮<br/>O(1) 插入/Wakeup"] -->|"最近 deadline"| B1["内核 hrtimer × 1"]
        B1 -->|"定时中断"| C1["线程醒来"]
        C1 -->|"批量收割"| A1
    end

    subgraph "事件驱动(I/O)"
        A2["ScheduledIo<br/>原子 readiness 位"] -->|"注册到"| B2["epoll<br/>内核管理所有 fd"]
        B2 -->|"可读/可写事件"| C2["线程醒来"]
        C2 -->|"更新 readiness"| A2
    end

有了这个直觉,接下来就可以拆开源码了。

一、Driver 堆栈:TimeDriver 在 IODriver 之上

在深入 TcpStream 之前,需要先理解 Tokio 运行时的 Driver 堆栈结构。

Tokio 运行时的驱动是分层的——内层是 I/O 驱动(epoll),外层是时间驱动2

// tokio/src/runtime/driver.rs: 44-50
pub(crate) struct Driver {
    inner: TimeDriver,
}

pub(crate) struct Handle {
    pub(crate) io: IoHandle,        // I/O 驱动句柄
    pub(crate) signal: SignalHandle, // 信号驱动句柄
    pub(crate) time: TimeHandle,    // 时间驱动句柄
    pub(crate) clock: Clock,
}

创建时,先创建 I/O 驱动,然后在其上包裹时间驱动2

// tokio/src/runtime/driver.rs: 108-117
pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Handle)> {
    let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io, cfg.nevents)?;
    let clock = create_clock(cfg.enable_pause_time, cfg.start_paused);
    let (time_driver, time_handle) =
        create_time_driver(cfg.enable_time, cfg.timer_flavor, io_stack, &clock);
    // ...
}

IoStack 本身又是一个小堆栈2

IoStack (if io enabled):
  ProcessDriver (子进程事件)
    └── SignalDriver (Unix 信号)
         └── IoDriver (mio::Poll)

当时间驱动需要 park 时,它把 park_timeout 委托给 IoStack,后者最终调用 mio::Poll::poll

// tokio/src/runtime/time/mod.rs: 255-260
fn park_internal(&mut self, rt_handle: &driver::Handle, limit: Option<Duration>) {
    // ...
    match next_wake {
        Some(when) => {
            // 有定时器在将来到期:带超时 park
            self.park_thread_timeout(rt_handle, duration);
        }
        None => {
            // 没有定时器:无限期 park(除非有 limit)
            if let Some(duration) = limit {
                self.park_thread_timeout(rt_handle, duration);
            } else {
                self.park.park(rt_handle);  // 无限期等待 I/O 事件
            }
        }
    }
}

换句话说,Tokio 的 worker 线程只有一个统一的 park 循环:时间驱动检查时间轮中的最近 deadline,如果时间轮不为空,就用 epoll_wait(timeout) 带超时地等待 I/O 事件;如果时间轮为空,就无限期等待 I/O 事件——I/O 事件或者 mio::Waker 的 unpark 会把线程叫醒。

这个设计的精妙之处在于:时间驱动和 I/O 驱动共享一次 epoll 系统调用。如果时间轮里有一个 5 秒后到期的 sleep,同时一个 TCP socket 在 3 秒时收到数据——线程会在 3 秒时被 epoll 叫醒,然后时间驱动顺便检查时间轮、收割到期定时器。不需要两套独立的等待机制。

sequenceDiagram
    participant TDriver as Time Driver
    participant IDriver as I/O Driver (epoll)
    participant Kernel as Linux Kernel
    participant Clock as 时钟中断

    TDriver->>IDriver: park_timeout(5000ms)
    IDriver->>Kernel: epoll_wait(timeout=5000ms)
    Note over Kernel: 等待 I/O 事件或超时
    Kernel-->>IDriver: 数据到达 → epoll 返回
    IDriver-->>TDriver: 醒来
    TDriver->>TDriver: process_at_time() → 收割到期定时器
    TDriver->>IDriver: 处理 I/O 事件

但这里的细节是:上述 seq 图是简化版。实际的 I/O Driver 的 turn() 首先被调用,它负责处理 epoll 事件并设置 ScheduledIo 的 readiness;然后才轮到 Time Driver 处理定时器。两者的协作是通过 driver::Handle 来协调的。

二、TcpStream 的创建:一条完整的注册链

现在来看 TcpStream::connect("127.0.0.1:8080") 背后到底发生了什么。

2.1 从 TcpStream 到 PollEvented

TcpStream 的结构非常简单——它只是一个 PollEvented 的包装3

// tokio/src/net/tcp/stream.rs: 72-74
pub struct TcpStream {
    io: PollEvented<mio::net::TcpStream>,
}

PollEvented 是 Tokio 的一个通用包装器:它接收一个实现了 mio::event::Source 的类型(如 mio::net::TcpStream),将其注册到运行时的 I/O 驱动上4

// tokio/src/io/poll_evented.rs: 89-94
pub(crate) struct PollEvented<E: Source> {
    io: Option<E>,
    registration: Registration,
}

创建 PollEvented 时,需要拿到当前 runtime 的调度器句柄(scheduler::Handle),然后用它来完成注册4

// tokio/src/io/poll_evented.rs: 130-139
pub(crate) fn new_with_interest_and_handle(
    mut io: E,
    interest: Interest,
    handle: scheduler::Handle,
) -> io::Result<Self> {
    let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?;
    Ok(Self { io: Some(io), registration })
}

TcpStream::new 只做了一件事:把 mio::net::TcpStream 交给 PollEvented::new3

// tokio/src/net/tcp/stream.rs: 160-163
pub(crate) fn new(connected: mio::net::TcpStream) -> io::Result<TcpStream> {
    let io = PollEvented::new(connected)?;
    Ok(TcpStream { io })
}

2.2 从 PollEvented 到 Registration

Registration 是更底层的抽象。它持有一个 Arc<ScheduledIo>,后者是 I/O 驱动中真正管理状态的结构5

// tokio/src/runtime/io/registration.rs: 64-72
pub(crate) struct Registration {
    handle: scheduler::Handle,
    shared: Arc<ScheduledIo>,
}

Registration::new_with_interest_and_handle 是整条注册链的关键入口5

// tokio/src/runtime/io/registration.rs: 91-98
pub(crate) fn new_with_interest_and_handle(
    io: &mut impl Source,
    interest: Interest,
    handle: scheduler::Handle,
) -> io::Result<Registration> {
    let shared = handle.driver().io().add_source(io, interest)?;
    Ok(Registration { handle, shared })
}

这里 handle.driver().io() 拿到 I/O Driver 的 Handle,然后调用 add_source

2.3 从 Registration 到 I/O Driver

Handle::add_source 是真正把 fd 注册到 epoll 的地方6

// tokio/src/runtime/io/driver.rs: 182-207
pub(super) fn add_source(
    &self,
    source: &mut impl mio::event::Source,
    interest: Interest,
) -> io::Result<Arc<ScheduledIo>> {
    // 1. 从 RegistrationSet 中分配一个新的 ScheduledIo
    let scheduled_io = self.registrations.allocate(&mut self.synced.lock())?;
    let token = scheduled_io.token();

    // 2. 通过 mio::Registry 将 source 注册到 epoll
    //    token 是 ScheduledIo 对象的指针值(作为 epoll 的标识)
    if let Err(e) = self.registry.register(source, token, interest.to_mio()) {
        // 注册失败:从 set 中移除分配的 ScheduledIo
        unsafe { self.registrations.remove(&mut self.synced.lock(), &scheduled_io) };
        return Err(e);
    }

    self.metrics.incr_fd_count();
    Ok(scheduled_io)
}

这里有一个关键设计:token 是 ScheduledIo 对象的地址mio::Token 是一个 usize,而 ScheduledIo 是一个 Arc 管理的堆对象。add_source 使用 PtrExposeDomainScheduledIo 的地址暴露为 mio::Token,这样 epoll 返回事件时,Tokio 可以直接通过 token 反向找到对应的 ScheduledIo6

// tokio/src/runtime/io/scheduled_io.rs: 268-270
pub(crate) fn token(&self) -> mio::Token {
    mio::Token(super::EXPOSE_IO.expose_provenance(self))
}

整条注册链可以用下图来总结:

graph TD
    A["TcpStream::connect(addr)"] --> B["mio::net::TcpStream::connect(addr)"]
    B --> C["TcpStream::new(sys)"]
    C --> D["PollEvented::new(connected)"]
    D --> E["Registration::new_with_interest_and_handle"]
    E --> F["Handle::add_source"]
    F --> G["RegistrationSet::allocate → Arc<ScheduledIo>"]
    F --> H["mio::Registry::register(source, token, interest)"]
    H --> I["epoll_ctl(EPOLL_CTL_ADD, fd, ...)"]

    style G fill:#e1f5fe
    style H fill:#e1f5fe
    style I fill:#c8e6c9

注册完成后,TcpStreamPollEvented 内部持有一个 RegistrationRegistration 内部持有一个 Arc<ScheduledIo>,而 ScheduledIo 的地址就是它在 epoll 中的 token。通过这个 token,epoll 事件可以反向定位到 ScheduledIo,运行时可快速处理就绪事件。

三、ScheduledIo:边沿触发的原子状态机

ScheduledIo 是 I/O 驱动的核心数据结构。如果说 Timer 驱动的核心是哈希时间轮和 TimerShared,那 I/O 驱动的核心就是 ScheduledIo 和它的原子状态7

// tokio/src/runtime/io/scheduled_io.rs: 59-61
pub(crate) struct ScheduledIo {
    linked_list_pointers: UnsafeCell<linked_list::Pointers<Self>>,
    readiness: AtomicUsize,     // 打包的 readiness 状态
    waiters: Mutex<Waiters>,    // 等待队列
}

3.1 打包的原子状态

readiness 字段是一个 AtomicUsize,但它承载了三个独立的信息,通过位打包(bit packing)放在一个原子变量里7:

// tokio/src/runtime/io/scheduled_io.rs: 224-230
// | shutdown | driver tick | readiness |
// |----------+-------------+-----------|
// |   1 bit  |   15 bits   |  16 bits  |

const READINESS: bit::Pack = bit::Pack::least_significant(16);
const TICK: bit::Pack = READINESS.then(15);
const SHUTDOWN: bit::Pack = TICK.then(1);

三个状态打包在一个 AtomicUsize 中,意味着对 readiness 的更新和检查都是原子操作,无需外部锁

3.2 和 TimerShared 的对比

上篇文章介绍了 TimerShared 及其 StateCell(用 AtomicU64 实现的原子状态机)。两者都是原子状态机,但解决的问题不同:

方面 TimerShared (sleep) ScheduledIo (I/O)
核心状态 到期时间、是否已触发 readiness、tick
驱动方式 时间轮到期触发 epoll 事件通知
更新方式 CAS 循环(set_expiration/mark_pending fetch_updateset_readiness
通知机制 fire()Waker::wake() wake(ready)Waker::wake()
并发角色 用户端(TimerEntry)+ 驱动端(Driver 用户端(Registration)+ 驱动端(Driver
数据结构 侵入式链表节点(用于时间轮 slot) 侵入式链表节点(用于 RegistrationSet)

3.3 边沿触发(ET)模型:为什么需要 tick

mio 在 Linux 上默认使用 epoll 的边沿触发(Edge-Triggered, ET)模式。ET 模式的特点是:只在状态从「不可用」变为「可用」时通知一次。这意味着:

  1. epoll 通知 readable → ScheduledIo 设置 READABLE 位
  2. 用户尝试 read() → 成功读到部分数据
  3. epoll 不会再次通知(因为状态没有从不可读变为可读)
  4. 用户可以继续 read() 直到返回 WouldBlock → 清除 READABLE 位 → 下次 epoll 通知再来

但这里有一个微妙的问题:如何防止清理旧事件时误清新事件? 这就是 tick 的作用。

sequenceDiagram
    participant EPoll as epoll
    participant SI as ScheduledIo
    participant Task as User Task

    EPoll->>SI: poll 返回 readable(tick=1)
    SI->>SI: set_readiness: 设置 READABLE + tick=2
    SI-->>Task: wake()
    Task->>SI: poll_read_ready()
    SI-->>Task: READABLE(tick=2)
    Task->>Task: read() → 成功
    Note over Task: 还没读完...
    EPoll->>SI: 新数据到达,再次通知(tick=3)
    SI->>SI: set_readiness: tick=3
    Task->>SI: clear_readiness(tick=2)  ← 旧 tick
    SI->>SI: tick 不匹配 → 无操作
    Task->>SI: poll_read_ready()
    SI-->>Task: READABLE(tick=3 时设置)
    Task->>Task: 继续读取

关键机制在 set_readiness7

// tokio/src/runtime/io/scheduled_io.rs: 277-298
pub(super) fn set_readiness(&self, tick_op: Tick, f: impl Fn(Ready) -> Ready) {
    let _ = self.readiness.fetch_update(AcqRel, Acquire, |curr| {
        const MAX_TICK: usize = TICK.max_value() + 1;
        let tick = TICK.unpack(curr);

        let new_tick = match tick_op {
            // 清理 readiness 时:如果 tick 不匹配,跳过此次操作
            Tick::Clear(t) if tick as u8 != t => return None,
            Tick::Clear(t) => t as usize,
            // 设置 readiness 时:递增 tick
            Tick::Set => tick.wrapping_add(1) % MAX_TICK,
        };
        let ready = Ready::from_usize(READINESS.unpack(curr));
        Some(TICK.pack(new_tick, f(ready).as_usize()))
    });
}

fetch_update 是一个乐观 CAS 循环:它尝试读取当前值、调用闭包生成新值、然后原子地替换。如果闭包返回 None(例如清理时的 tick 不匹配),则不进行任何更新。

这正是 ET 模型所需要的:每次 epoll 通知都会递增 tick,旧的事件清不到新 tick 的脏位。如果没有这个 tick 机制,一个慢任务可能在清理旧事件时无意中清掉了新到达的事件,导致数据静默丢失。

3.4 Waiters:等待队列

ScheduledIo 的第三个字段 waiters 管理了等待此 I/O 资源的任务7

// tokio/src/runtime/io/scheduled_io.rs: 70-78
#[derive(Debug, Default)]
struct Waiters {
    /// 所有等待者的侵入式链表
    list: WaitList,

    /// AsyncRead 专用 waker 槽
    reader: Option<Waker>,

    /// AsyncWrite 专用 waker 槽
    writer: Option<Waker>,
}

这里有两个并行的机制:

  1. 专用 waker 槽reader / writer):用于 poll_read_ready / poll_write_ready 快速路径——每个方向只有一个任务在等待,直接用 Option<Waker> 存,无需链表
  2. 通用等待链表:用于 readiness() async 方法——支持多个任务同时等待同一个 ScheduledIo 的不同 Interest

当 I/O 事件到达时,wake() 方法遍历这两者并唤醒对应的任务7

// tokio/src/runtime/io/scheduled_io.rs: 304-348
pub(super) fn wake(&self, ready: Ready) {
    let mut wakers = WakeList::new();
    let mut waiters = self.waiters.lock();

    // 检查 read 槽
    if ready.is_readable() {
        if let Some(waker) = waiters.reader.take() {
            wakers.push(waker);
        }
    }
    // 检查 write 槽
    if ready.is_writable() {
        if let Some(waker) = waiters.writer.take() {
            wakers.push(waker);
        }
    }

    // 遍历链表中的等待者
    'outer: loop {
        let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest));
        while wakers.can_push() {
            match iter.next() {
                Some(waiter) => {
                    // ... 收集 waker,标记已就绪
                }
                None => break 'outer,
            }
        }
        // 批次满了:释放锁后批量唤醒
        drop(waiters);
        wakers.wake_all();
        waiters = self.waiters.lock();
    }
    // ...
}

wake_all 在释放锁之后进行,避免持锁时调用外部代码导致死锁——这和上篇文章中 Time Driver 的 waker_list.wake_all() 是相同的模式。

四、事件循环:turn()

当一切就绪后,是谁来驱动 I/O 事件的处理的?答案是 I/O Driver 的 turn() 方法6

// tokio/src/runtime/io/driver.rs: 122-174
fn turn(&mut self, handle: &Handle, max_wait: Option<Duration>) {
    // 处理待释放的注册(被 drop 的 Registration)
    handle.release_pending_registrations();

    let events = &mut self.events;

    // 核心:阻塞在 epoll_wait 上
    match self.poll.poll(events, max_wait) {
        Ok(()) => {}
        Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
        // ...
    }

    // 处理所有返回的事件
    let mut ready_count = 0;
    for event in events.iter() {
        let token = event.token();

        if token == TOKEN_WAKEUP {
            // mio::Waker 的 unpark,无需处理
        } else if token == TOKEN_SIGNAL {
            self.signal_ready = true;
        } else {
            let ready = Ready::from_mio(event);
            // token 就是 ScheduledIo 的地址
            let ptr = super::EXPOSE_IO.from_exposed_addr(token.0);
            let io: &ScheduledIo = unsafe { &*ptr };

            // 更新 readiness 状态并唤醒等待任务
            io.set_readiness(Tick::Set, |curr| curr | ready);
            io.wake(ready);

            ready_count += 1;
        }
    }
    // ...
}

turn() 的工作流程清晰明了:

  1. 释放待处理的注册:清理之前被 drop 但尚未从 epoll 中移除的 Registration
  2. 阻塞等待self.poll.poll(events, max_wait) —— 对,这就是 epoll_wait
  3. 分发事件:遍历 epoll 返回的每个事件,通过 token 找到对应的 ScheduledIo,设置 readiness 位,唤醒等待的任务

当 I/O 驱动独立运行时(如 Time Driver 没有启用),max_waitNone 表示无限期等待;当 Time Driver 有定时器时,max_wait 为”到最早 deadline 的时间差”。

整体驱动循环

将 I/O 驱动和时间驱动放在一起看,Tokio 的 worker 线程主循环大致是:

sequenceDiagram
    participant Worker as Worker Thread
    participant Time as Time Driver
    participant IO as I/O Driver
    participant Sched as Scheduler

    loop 主循环
        Worker->>Time: park_internal()
        Time->>Time: 检查时间轮 → 下一个 deadline
        Time->>IO: park/park_timeout(duration)
        IO->>IO: epoll_wait(events, duration)
        Note over IO: 被 I/O 事件或超时唤醒
        IO-->>Time: 返回
        Time->>Time: process_at_time(now)
        Time->>Time: 收割到期 timer
        Time-->>Worker: 线程继续
        Worker->>Sched: 运行就绪任务
    end

这也就是在上一篇文章中看到的 park_internal 流程——只是现在我们把 park 的底层展开了,看到了 I/O 驱动这一层。

五、读写的完整路径:从 poll 到 buffer

理解了数据结构后,来看 TcpStream::read(&mut buf).await 的完整路径。当一个 PollEvented 包装的 socket 被 poll_read4

// tokio/src/io/poll_evented.rs: 169-194
pub(crate) unsafe fn poll_read<'a>(
    &'a self,
    cx: &mut Context<'_>,
    buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>>
where &'a E: io::Read + 'a,
{
    loop {
        // 1. 等待读就绪
        let evt = ready!(self.registration.poll_read_ready(cx))?;

        // 2. 尝试读取
        match self.io.as_ref().unwrap().read(b) {
            Ok(n) => {
                // 4. ET 优化:读了一部分但没读完,不清除 readiness
                //    (epoll 模式下可以继续读)
                #[cfg(not(mio_unsupported_force_poll_poll))]
                if 0 < n && n < len {
                    self.registration.clear_readiness(evt);
                }

                buf.advance(n);
                return Poll::Ready(Ok(()));
            }
            // 3. 读阻塞了(false positive):清除 readiness 后重试
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                self.registration.clear_readiness(evt);
            }
            Err(e) => return Poll::Ready(Err(e)),
        }
    }
}

这个模式比 Sleep::poll 复杂——因为 I/O 有假阳性(false positive)。epoll 返回 readable 后,fd 可能因为竞争等原因实际上不可读了。所以 I/O 的 poll 循环是:

poll_read_ready → 返回 Ready → 尝试 read
    ├─ 成功 → 返回数据
    └─ WouldBlock → clear_readiness → 重新 poll

Sleep::poll 是:

检查时间轮 → 到期了吗?
    ├─ 是 → 返回 Ready
    └─ 否 → 注册 waker → 返回 Pending

sleep 没有假阳性——deadline 到了就是到了。I/O 有假阳性——epoll 说 readable 了,但数据可能已经被别的线程读了,或者在监听 socket 上有新的 accept 提前消耗了事件。

Registration 还提供了一个更底层的 poll_io 方法——它把”等待就绪 → 尝试操作 → WouldBlock → 清理 → 重试”的循环封装在一个函数里5

// tokio/src/runtime/io/registration.rs: 162-175
fn poll_io<R>(
    &self,
    cx: &mut Context<'_>,
    direction: Direction,
    mut f: impl FnMut() -> io::Result<R>,
) -> Poll<io::Result<R>> {
    loop {
        let ev = ready!(self.poll_ready(cx, direction))?;
        match f() {
            Ok(ret) => return Poll::Ready(Ok(ret)),
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                self.clear_readiness(ev);
            }
            Err(e) => return Poll::Ready(Err(e)),
        }
    }
}

六、你还在读同一条线程吗:关于「既处理 I/O 又处理 Timer」

一个常见的困惑是:Time Driver 和 I/O Driver 是在同一条线程上运行的吗?不冲突吗?

答案是是的,同一条线程,不冲突。原因可以用一句话概括:当线程在 epoll_wait 上阻塞时,时间驱动作了 I/O 驱动的一层薄薄的外包装,它做的所有事就是设置 epoll_wait 的 timeout 参数

当 Time Driver 调用 park_timeout(duration) 时,它最终调用了 I/O Driver 的 park_timeout(duration)。I/O Driver 的 turn()duration 传给 epoll_wait。这也就是 sleep 文章中提到的那条链路。

当 I/O 事件在 duration 到达之前触发,线程提前醒来,Time Driver 在 park_internal 中返回后立即调用 process() 收割到期定时器。I/O 事件的处理和时间到期检测在同一个函数调用栈中完成,没有竞争、没有额外的上下文切换

graph TD
    subgraph "Worker 线程主循环"
        A["开始"] --> B["Time::park_internal()"]
        B --> C["计算 next_wake"]
        C --> D["IoStack::park_timeout(duration)"]
        D --> E["epoll_wait(timeout)"]
        E -->|"I/O 事件或超时"| F["Time::process_at_time()"]
        F --> G["收割到期 timer"]
        G --> H["Scheduler::run 处理就绪任务"]
        H --> B
    end

这种「一层包一层」的设计,使得 Time Driver 和 I/O Driver 完全共享同一个线程和同一次 epoll_wait 调用。没有线程切换、没有锁竞争(除了 ScheduledIo 内部的 Mutex)、没有额外的调度开销。

七、对比总览:时间驱动 vs 事件驱动

现在可以把两种表并列在一起看:

维度 时间驱动(sleep) 事件驱动(I/O)
核心数据结构 6 层哈希时间轮(Wheel 逐 fd 的原子状态机(ScheduledIo
状态管理 StateCell(AtomicU64:到期时间 + 标记位) AtomicUsize(16b readiness + 15b tick + 1b shutdown)
等待机制 比较 deadline vs now(用户态) 等待 epoll 通知(内核态)
唤醒方式 fire()Waker::wake() wake(ready)Waker::wake()
假阳性 无(时间到了就是到了) 有(epoll 可能虚报)
插入/注册 Wheel::insert() → O(1) mio::Registry::register() → 系统调用
单次检查 检查当前 slot 内的所有 entry 检查 epoll 返回的所有 event
超时/时间驱动 核心机制:靠 deadline 驱动 仅用于设置 epoll_wait timeout
内核交互 1 个定时器(hrtimer) 所有 fd 注册到 epoll
精确度 1ms 纳秒级(取决于内核)

核心差异的本质

两种驱动模型差异的根源在于同一个问题:「事件」发生的时机是否可控

时间是可控的。给定一个 deadline,Tokio 可以在用户态用时间轮 O(1) 地判断是否到期。它不需要内核在”到期”这个事件发生时主动通知——它只需要知道当前时间,然后比较即可。内核定时器只是用来让线程不要一直空转。

网络 I/O是不可控的。内核之外的客户端可能在任意时刻发送数据包。Tokio 必须靠内核来通知——因为只有内核驱动(网卡中断 → 协议栈)知道数据什么时候到达。epoll 就是这个”内核通知机制”的接口。

这种「可控 vs 不可控」的区别,导致了两套完全不同的数据结构选择:

graph LR
    subgraph "可控事件"
        A1["sleep(5s)"] --> A2["deadline = now + 5s"]
        A2 --> A3["插入时间轮(O(1))"]
        A3 --> A4["等待时:检查时间轮<br/>全部在用户态完成"]
        A4 --> A5["到期:fire()"]
    end

    subgraph "不可控事件"
        B1["TcpStream::read()"] --> B2["注册 fd 到 epoll"]
        B2 --> B3["等待时:epoll_wait<br/>内核挂起线程"]
        B3 --> B4["数据到达:epoll 通知"]
        B4 --> B5["设置 readiness + wake()"]
    end

八、另一种不可控事件:AsyncFd

tokio 的 I/O 驱动不只服务于网络 socket。AsyncFd 是一个通用的包装器,允许任何实现了 AsRawFd 的非阻塞文件描述符接入 Tokio 的 I/O 驱动8

// tokio/src/io/async_fd.rs: 248-255
pub struct AsyncFd<T: AsRawFd> {
    registration: Registration,
    inner: Option<T>,
}

它的创建流程和 TcpStream 本质相同8

// tokio/src/io/async_fd.rs: 406-414
pub(crate) fn try_new_with_handle_and_interest(
    inner: T,
    handle: scheduler::Handle,
    interest: Interest,
) -> Result<Self, AsyncFdTryNewError<T>> {
    let fd = inner.as_raw_fd();

    match Registration::new_with_interest_and_handle(
        &mut SourceFd(&fd), interest, handle,
    ) {
        Ok(registration) => Ok(AsyncFd { registration, inner: Some(inner) }),
        Err(cause) => Err(AsyncFdTryNewError { inner, cause }),
    }
}

区别在于:TcpStream 使用 mio::net::TcpStream(它实现了 mio::event::Source),而 AsyncFd 使用 mio::unix::SourceFd——一个简单的包装器,不修改 fd 的配置(因此要求 fd 已经设置为非阻塞模式)。

AsyncFd 的使用模式体现了 ET 模型的完整语义8

// 典型的 AsyncFd 使用模式
loop {
    let mut guard = async_fd.readable().await?;     // 等待可读
    match guard.try_io(|inner| inner.get_ref().read(&mut buf)) {
        Ok(result) => return result,                   // 成功读取
        Err(_would_block) => continue,                // 假阳性,重试
    }
}

这里 guard.try_io() 的语义是:如果 I/O 操作返回 WouldBlock,自动调用 clear_ready() 清除 readiness 位。这确保了下次数据到达时,epoll 会发出新的边沿触发通知。

总结

Tokio 的 I/O 驱动和 Timer 驱动虽然在同一个 worker 线程中运行、共享同一次 epoll_wait 调用,但它们的核心机制截然不同:

Timer 驱动是一个用户态的时间调度器。它用哈希时间轮 O(1) 地管理成千上万个定时器,只向内核注册一个「最近 deadline」的唤醒点。线程被 epoll_wait 的超时机制叫醒,然后批量收割到期定时器。

I/O 驱动是一个内核态事件的分发器。它通过 epoll 把 fd 的 read/write 事件从内核调度到用户态,用 ScheduledIo 的原子状态机追踪每个 fd 的就绪状态,用 tick 机制解决边沿触发的 stale event 问题。

两者在 Tokio 中的分层设计是:

应用层:tokio::time::sleep         tokio::net::TcpStream
                |                          |
运行时层:TimerEntry / Wheel          Registration / ScheduledIo
                |                          |
驱动层:  Time Driver                I/O Driver (mio)
                |                          |
内核层:  hrtimer × 1                epoll (所有 fd)

驱动层通过 IoStack 堆叠在一起:外层 Timer 检查时间轮确定 timeout,内层 I/O 执行 epoll_wait。两条线在同一个线程上优雅地交织,互不干扰。

最终,这两种异步机制的实现都可以回溯到 Rust async 的核心契约1返回 Poll::Pending 时注册 Waker,事件就绪时通过 Waker::wake() 通知调度器重新 pollsleep 用「时间到达」作为就绪条件,TcpStream 用「IO 就绪」作为就绪条件——只是事件源不同,契约完全一样。

References

  1. 上篇文章:《从 tokio::time::sleep 看异步 Timer 的实现:一次从 Future::poll 到哈希时间轮的源码之旅》,2026-05-03。详细分析了 tokio::time::sleepFuture::pollTimerEntryStateCellWheel 哈希时间轮的完整链路。以及更早的文章:《Rust async/await 的底层契约:从 Future::poll 到 Tokio 运行时》,2026-04-30,阐述了 Future::pollContextWaker 的核心协议。  2

  2. Tokio 源码,Driver 分层结构,tokio/src/runtime/driver.rs。定义了 DriverTimeDriver 包装 IoStack)、HandleIoHandle + SignalHandle + TimeHandle)、以及 create_io_stack / create_time_driver 的创建逻辑。  2 3

  3. Tokio 源码,TcpStream 结构定义,tokio/src/net/tcp/stream.rsTcpStream 只是 PollEvented<mio::net::TcpStream> 的一层包装,构建时调用 PollEvented::new(connected)。  2

  4. Tokio 源码,PollEvented 结构定义与 poll_read 方法,tokio/src/io/poll_evented.rsPollEvented 是通用 I/O 包装器,其 poll_read 方法展示了 ET 模式下的 read 循环(WouldBlockclear_readiness → 重试)以及针对 edge-triggered selector 的 read 部分结果优化。  2 3

  5. Tokio 源码,Registration 结构定义与 new_with_interest_and_handlepoll_io 等方法,tokio/src/runtime/io/registration.rsRegistration 是 I/O 资源与 reactor 的桥梁,封装了注册、就绪轮询和 WouldBlock 循环。  2 3

  6. Tokio 源码,I/O Driver 的 Driver::newHandle::add_sourceDriver::turntokio/src/runtime/io/driver.rsadd_sourcemio::Source 注册到 epoll,token 为 ScheduledIo 的指针地址;turn() 阻塞在 epoll_wait 上并分发事件。  2 3

  7. Tokio 源码,ScheduledIo 结构定义、set_readinesswake 方法,tokio/src/runtime/io/scheduled_io.rs。核心结构:readiness 为打包的 AtomicUsize(16b readiness + 15b tick + 1b shutdown),waitersMutex<Waiters> 管理专用槽和链表等待者。set_readiness 使用 fetch_update CAS 实现 tick 保护,wake 使用 WakeList 批量唤醒。  2 3 4 5

  8. Tokio 源码,AsyncFd 结构定义与使用模式,tokio/src/io/async_fd.rs。通用的 fd → async 包装器,通过 mio::unix::SourceFd 接入 I/O 驱动,提供 readable() / writable() / async_io() 等高级 API 和 poll_read_ready() / try_io() 等底层 API。  2 3

My Github Page: https://github.com/liweinan

B站视频: https://space.bilibili.com/21947620

Powered by Jekyll and Theme by solid

If you have any question want to ask or find bugs regarding with my blog posts, please report it here:
https://github.com/liweinan/liweinan.github.io/issues