从 tokio::time::sleep 到 tokio::net::TcpStream:时间驱动与事件驱动的两种异步实现

上篇文章跟踪了 tokio::time::sleep 从 Future::poll 到哈希时间轮的完整链路。那篇文章的核心是一个问题:时间怎么做到异步等待的?本文要回答另一个问题:网络 I/O又是怎么做到异步等待的?它们共享同一个 runtime 主循环,但背后是两种完全不同的驱动模型。

引言:为什么需要第二篇?

上篇文章1中,我们看到了 tokio::time::sleep 的实现链条:

Future::poll → TimerEntry → Wheel(哈希时间轮) → Driver::park_timeout → epoll_wait

那条链路的核心架构是:用户态维护时间轮,只向内核注册一个「最早 deadline」的定时器。当线程从 epoll_wait 返回后,驱动去时间轮里收割所有到期 timer,批量唤醒对应的任务。

但这里有一个容易被忽略的细节:park_timeout 最终调用的 epoll_wait,本身就是 Linux 上最核心的 I/O 事件通知机制。时间驱动把 I/O 驱动的 park_timeout 当成了「闹钟」来用——它只是顺路借用了 epoll 的超时功能。真正的主角是I/O 驱动自己

当你写下下面这行代码时,背后发生的事情和 sleep 有着本质区别:

let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
stream.read(&mut buf).await?;

sleep 只关心”时间到了没”;TcpStream::read 关心的是”数据到了没”。前者在用户态就能判断(查时间轮),后者必须由内核通知——因为只有内核知道网卡上有没有新数据包到达。

这篇文章就从 TcpStream 注册到 I/O 驱动的完整流程讲起,拆开 ScheduledIoRegistrationPollEvented、I/O Driver 的事件循环,然后和上篇的 Timer 驱动做横向对比,看看它们在同一套 runtime 主循环里是如何共存并各自工作的

先看一段代码

和上一篇文章一样,先从一个可运行的程序建立直觉。下面是一个简单的 TCP echo 服务,用 Tokio 同时处理多个连接:

// tokio_echo.rs
use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;

    loop {
        // 等待新连接(异步)
        let (mut socket, addr) = listener.accept().await?;

        // 每个连接 spawn 一个独立的任务
        tokio::spawn(async move {
            let mut buf = [0; 1024];

            // 在一个连接上循环读写(异步)
            loop {
                match socket.read(&mut buf).await {
                    Ok(0) => break,            // 连接关闭
                    Ok(n) => {
                        // 把读到的内容原样写回去
                        let _ = socket.write_all(&buf[..n]).await;
                    }
                    Err(_) => break,
                }
            }

            println!("连接 {} 关闭", addr);
        });
    }
}

用一条线程跑上面的服务,它就可以同时接受成千上万个连接。每个连接上的 acceptreadwrite 都是异步的——调用时不会阻塞线程,线程会去服务其他连接。

对比一下同步阻塞版本的多线程实现:

// sync_echo.rs(多线程阻塞版)
use std::net::TcpListener;
use std::thread;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:8080")?;

    loop {
        let (mut socket, addr) = listener.accept()?;

        // 每个连接启动一条新线程
        thread::spawn(move || {
            let mut buf = [0; 1024];

            loop {
                let n = socket.read(&mut buf).unwrap();
                if n == 0 { break; }
                socket.write_all(&buf[..n]).unwrap();
            }

            println!("连接 {} 关闭", addr);
        });
    }
}

同步版本也能处理多连接,但代价是每条连接一条 OS 线程。每个 thread::spawn 都涉及内核态的线程创建、栈分配(默认 2MB)、以及调度器的上下文切换。几千个连接就需要几千条线程——OS 调度器很快就不堪重负。

而 Tokio 版本却用一条线程做到了同步版本需要几千条线程才能做到的事。秘密就藏在这三个 .await 背后:它们不会阻塞线程,只会把任务挂起,把线程还给调度器去服务其他连接。

方面 同步多线程版 Tokio 单线程版
并发模型 每个连接一条 OS 线程 单线程管理海量连接
资源开销 每条线程 2MB 栈 + OS 调度 每个任务几十字节的 Future 状态
上下文切换 OS 内核态切换 用户态 async 切换
极限连接数 几千条线程后难以扩展 单线程可处理数十万连接
IO 操作 read/write 阻塞线程 .await 挂起任务不占线程

再看客户端怎么写。用 tokio 连接服务器并发送一条消息:

// tokio_client.rs
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut stream = TcpStream::connect("127.0.0.1:8080").await?;

    // 发送数据
    stream.write_all(b"hello from tokio!").await?;

    // 读取回显
    let mut buf = [0; 1024];
    let n = stream.read(&mut buf).await?;
    println!("收到回显:{:?}", &buf[..n]);

    Ok(())
}

服务端和客户端都用 tokio,双方都不占线程——两个 .await 已经把线程还给了各自的调度器。

完整的实验场景可以这样跑:

# 终端 1:启动 echo 服务器
$ cargo run --example tokio_echo

# 终端 2:启动客户端,看到回显
$ cargo run --example tokio_client
收到回显:"hello from tokio!"

接下来的内容就是拆开这个「秘密」——看看 .await 背后 Tokio 的 I/O 驱动是怎么利用 epoll 实现非阻塞 I/O 的。

与上一篇的关系

上一篇 本文
tokio::time::sleep tokio::net::TcpStream
时间驱动(Timer Driver) I/O 驱动(I/O Driver)
哈希时间轮(用户态) epoll(内核态)
一个内核定时器管理所有 sleep 所有 fd 注册到 epoll
「时间到了」由内核定时器通知 「数据到了」由 epoll 事件通知
主动检查(poll time wheel) 被动等待(wait on epoll)

概念先行:时间驱动 vs 事件驱动

tokio::time::sleepTcpStream::read 虽然都返回 Future,都可以 .await,但它们的驱动模型有本质区别。

时间驱动的 sleep 是「到时即醒」:线程把 park timeout 设到最早的 deadline,内核在 deadline 到达时通过时钟中断唤醒线程。判断”是否到期”完全可以在用户态通过比较 deadline 和当前时间完成,不需要内核在事件发生时通知。

事件驱动的 I/O 是「有数即醒」:线程阻塞在 epoll_wait 上,内核在数据到达网卡时通过硬件中断 → 驱动 → epoll 事件通知唤醒线程。判断”是否可读”只有内核知道——因为数据包什么时候到达是不可预测的。

用一句话概括核心区别:

sleep 的等待是确定性的——我知道 5 秒后肯定到;I/O 的等待是不确定性的——我不知道数据什么时候来。

这对数据结构选择产生了深远的影响:

graph LR
    subgraph "时间驱动(sleep)"
        A1["用户态时间轮<br/>O(1) 插入/Wakeup"] -->|"最近 deadline"| B1["内核 hrtimer × 1"]
        B1 -->|"定时中断"| C1["线程醒来"]
        C1 -->|"批量收割"| A1
    end

    subgraph "事件驱动(I/O)"
        A2["ScheduledIo<br/>原子 readiness 位"] -->|"注册到"| B2["epoll<br/>内核管理所有 fd"]
        B2 -->|"可读/可写事件"| C2["线程醒来"]
        C2 -->|"更新 readiness"| A2
    end

有了这个直觉,接下来就可以拆开源码了。

一、Driver 堆栈:TimeDriver 在 IODriver 之上

在深入 TcpStream 之前,需要先理解 Tokio 运行时的 Driver 堆栈结构。

Tokio 运行时的驱动是分层的——内层是 I/O 驱动(epoll),外层是时间驱动2

// tokio/src/runtime/driver.rs: 44-50
pub(crate) struct Driver {
    inner: TimeDriver,
}

pub(crate) struct Handle {
    pub(crate) io: IoHandle,        // I/O 驱动句柄
    pub(crate) signal: SignalHandle, // 信号驱动句柄
    pub(crate) time: TimeHandle,    // 时间驱动句柄
    pub(crate) clock: Clock,
}

这是 Worker 线程唯一能看到的 Driver struct——runtime::driver::Driver 是整个 runtime 对外暴露的统一 park 入口。Worker 的主循环永远只调这个 struct 的 park(handle)park_timeout(handle, duration),从不关心内部嵌套了多少层。

用一句话概括:Driver 就是 runtime 的 sleep/wait 机制——线程通过它知道该睡多久、被谁叫醒、醒来后该干什么。

创建时,先创建 I/O 驱动,然后在其上包裹时间驱动2

不过要注意,TimeDriver 的名字是历史遗留——它实际是一个条件编译 enum。当 time feature 关闭时,TimeDriver 被直接退化为 IoStackDriver { inner: TimeDriver } 变成了 Driver { inner: IoStack },Timer 层完全不存在:

// runtime/driver.rs
cfg_not_time! {
    type TimeDriver = IoStack;  // ← 没有 time 时,inner 直接是 IoStack
}

所以三种 feature 组合下的实际形态是:

time io Driver inner 的实际类型
开启 开启 TimeDriver::Enabled { driver: time::Driver { park: IoStack(...) } }
关闭 开启 IoStack::Enabled(...)TimeDriver 退化为此)
关闭 关闭 IoStack::Disabled(ParkThread)(连 I/O 都没有时退化到 Condvar

不论哪种配置,Worker 线程式始终只调 driver.park(handle)——runtime::driver::Driver 是唯一入口,内部嵌套是私有的实现细节

// tokio/src/runtime/driver.rs: 47-58
pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Handle)> {
    let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io, cfg.nevents)?;
    let clock = create_clock(cfg.enable_pause_time, cfg.start_paused);
    let (time_driver, time_handle) =
        create_time_driver(cfg.enable_time, cfg.timer_flavor, io_stack, &clock);
    // ...
}

IoStack 本身又是一个小堆栈2

IoStack (if io enabled):
  ProcessDriver (子进程事件)
    └── SignalDriver (Unix 信号)
         └── IoDriver (mio::Poll)

当时间驱动需要 park 时,它把 park_timeout 委托给 IoStack,后者最终调用 mio::Poll::poll

// tokio/src/runtime/time/mod.rs: 255-260
fn park_internal(&mut self, rt_handle: &driver::Handle, limit: Option<Duration>) {
    // ...
    match next_wake {
        Some(when) => {
            // 有定时器在将来到期:带超时 park
            self.park_thread_timeout(rt_handle, duration);
        }
        None => {
            // 没有定时器:无限期 park(除非有 limit)
            if let Some(duration) = limit {
                self.park_thread_timeout(rt_handle, duration);
            } else {
                self.park.park(rt_handle);  // 无限期等待 I/O 事件
            }
        }
    }
}

换句话说,Tokio 的 worker 线程只有一个统一的 park 循环:时间驱动检查时间轮中的最近 deadline,如果时间轮不为空,就用 epoll_wait(timeout) 带超时地等待 I/O 事件;如果时间轮为空,就无限期等待 I/O 事件——I/O 事件或者 mio::Waker 的 unpark 会把线程叫醒。

这个设计的精妙之处在于:时间驱动和 I/O 驱动共享一次 epoll 系统调用。如果时间轮里有一个 5 秒后到期的 sleep,同时一个 TCP socket 在 3 秒时收到数据——线程会在 3 秒时被 epoll 叫醒,然后时间驱动顺便检查时间轮、收割到期定时器。不需要两套独立的等待机制。

sequenceDiagram
    participant TDriver as Time Driver
    participant IDriver as I/O Driver (epoll)
    participant Kernel as Linux Kernel
    participant Clock as 时钟中断

    TDriver->>IDriver: park_timeout(5000ms)
    IDriver->>Kernel: epoll_wait(timeout=5000ms)
    Note over Kernel: 等待 I/O 事件或超时
    Kernel-->>IDriver: 数据到达 → epoll 返回
    IDriver-->>TDriver: 醒来
    TDriver->>TDriver: process_at_time() → 收割到期定时器
    TDriver->>IDriver: 处理 I/O 事件

但这里的细节是:上述 seq 图是简化版。实际的 I/O Driver 的 turn() 首先被调用,它负责处理 epoll 事件并设置 ScheduledIo 的 readiness;然后才轮到 Time Driver 处理定时器。两者的协作是通过 driver::Handle 来协调的。

1.1 再往下看:mio 封装了什么?

至此,IoStack 的堆叠关系已经清楚。但再往下追问一步——IoDriver (mio::Poll) 这个最底层又是什么?

答案是 mio::Poll 是跨平台 I/O 多路复用的统一接口,它在编译时根据目标操作系统选择后端实现3

// mio/src/sys/mod.rs: 42-56
#[cfg(any(unix, target_os = "hermit", ...))]
cfg_os_poll! {
    mod unix;
    pub use self::unix::*;   // unix → 再根据 OS 选 epoll / kqueue / poll
}

#[cfg(windows)]
cfg_os_poll! {
    mod windows;
    pub use self::windows::*; // Windows IOCP
}

在 Unix 下,mio 的选择器文件分别对应三个级别的 I/O 多路复用机制:

操作系统 mio 选择器 内核接口 核心文件
Linux epoll epoll_create / epoll_ctl / epoll_wait mio/src/sys/unix/selector/epoll.rs
macOS / iOS / FreeBSD kqueue kqueue / kevent mio/src/sys/unix/selector/kqueue.rs
其他 Unix 回退 poll poll mio/src/sys/unix/selector/poll.rs
Windows IOCP CreateIoCompletionPort / GetQueuedCompletionStatus mio/src/sys/windows/
WASI (p1) wasip1 poll_oneoff mio/src/sys/wasip1/

epoll.rs 中的 Selector 直接封装了 epoll 系统调用4

// mio/src/sys/unix/selector/epoll.rs: 32-66
impl Selector {
    pub fn new() -> io::Result<Selector> {
        // epoll_create1(EPOLL_CLOEXEC) 系统调用
        let ep = unsafe {
            OwnedFd::from_raw_fd(syscall!(epoll_create1(libc::EPOLL_CLOEXEC))?)
        };
        Ok(Selector { ep, .. })
    }

    pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
        // epoll_wait 系统调用
        syscall!(epoll_wait(
            self.ep.as_raw_fd(),
            events.as_mut_ptr(),
            events.capacity() as i32,
            timeout,
        ))
        .map(|n| unsafe { events.set_len(n as usize) })
    }
}

kqueue.rs 则是对另一套系统调用的封装5

// mio/src/sys/unix/selector/kqueue.rs: 96-139
impl Selector {
    pub fn new() -> io::Result<Selector> {
        // kqueue 系统调用
        let kq = unsafe { OwnedFd::from_raw_fd(syscall!(kqueue())?) };
        Ok(Selector { kq, .. })
    }

    pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
        // kevent 系统调用
        syscall!(kevent(
            self.kq.as_raw_fd(),
            ptr::null(),     // 无 changelist(只读不写)
            0,
            events.as_mut_ptr().cast(),
            events.capacity() as Count,
            timeout_ref,
        ))
        .map(|n| unsafe { events.set_len(n as usize) })
    }
}

mio 的 cfgCargo.toml 中通过 feature os-poll 触发条件编译6:Linux 上只编译 epoll.rs,macOS/FreeBSD 上只编译 kqueue.rs,Windows 上编译 IOCP。Tokio 的 I/O Driver 从不关心底层——它只调用 mio::Poll::poll,剩下的由 mio 按平台转发:

// tokio/src/runtime/io/driver.rs: 122-140 — Tokio 只看这一层
fn turn(&mut self, handle: &Handle, max_wait: Option<Duration>) {
    match self.poll.poll(events, max_wait) {  // ← 不管下面是 epoll 还是 kqueue
        Ok(()) => {}
        Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
        Err(e) => panic!("..."),
    }
    for event in events.iter() {
        // 通过 token 反查 ScheduledIo,设置 readiness,唤醒任务
    }
}

这个抽象的意义在于:Tokio 的整个 I/O 驱动——ScheduledIoRegistrationPollEvented——全部是跨平台的。同一份代码编译后,在 Linux 上走 epoll,在 macOS 上走 kqueue,在 Windows 上走 IOCP。平台的差异被 mio 完全封装在 src/sys/unix/selector/ 下的三个文件中。

回到最初的问题:driver 层,是不是就是 tokio + mio 封装不同操作系统的 NIO 库细节的层面(比如 epoll 和 kqueue)?

是的,但可以更精确地分为两层:

完整层叠关系可以用一张图来总结:

graph TD
    A["TcpStream::read().await<br/>应用层"] --> B["Registration + PollEvented + ScheduledIo<br/>注册层"]
    B --> C["runtime::io::Driver (持有 mio::Poll)<br/>Tokio 驱动层"]
    C --> D["mio::Poll → Registry::register / poll<br/>mio 抽象层"]
    D --> E["cfg!(target_os) 选择 epoll/kqueue/poll/IOCP<br/>平台选择层(编译时)"]
    E --> F["epoll_wait / kevent / poll / GetQueuedCompletionStatus<br/>内核接口层"]
    F --> G["网卡中断 → 协议栈 → socket buffer → 就绪通知<br/>物理层"]

1.2 Driver 堆叠结构:Timer 和 I/O 如何共享同一次 epoll_wait

前面的层叠图描绘了完整的竖线链路。但横着看,Driver 实际运行时是一层套一层套一层套一层——Tokio 有五个 Driver structs 和两个 enum/alias 层,它们的定义位置和嵌套关系如下:

Driver 结构 定义文件:行 角色 持有
runtime::driver::Driver runtime/driver.rs:16 最外层,暴露给 Worker inner: TimeDriver
TimeDriver(enum) runtime/driver.rs:290 上方 Driver 的 inner,可选启用 Enabled { driver: time::Driver } / Disabled(IoStack)
runtime::time::Driver runtime/time/mod.rs:90 Timer Driver park: IoStack
IoStack(enum) runtime/driver.rs:138 park 栈桥 Enabled(ProcessDriver) / Disabled(ParkThread)
runtime::process::Driver runtime/process.rs:13 Process Driver(子进程清理) park: SignalDriver
runtime::signal::Driver runtime/signal/mod.rs:19 Signal Driver(Unix 信号) io: io::Driver
runtime::io::Driver runtime/io/driver.rs:25 I/O Driver,最底层 poll: mio::Poll, events: mio::Events

五个 Driver struct 的定义文件按创建顺序是:

runtime::driver::Driver::new()
  → create_io_stack()               → runtime::io::Driver          (runtime/io/driver.rs:25)
    → create_signal_driver(io_drv)   → runtime::signal::Driver      (runtime/signal/mod.rs:19)
      → create_process_driver(sig)   → runtime::process::Driver     (runtime/process.rs:13)
  → create_time_driver(io_stack)     → runtime::time::Driver        (runtime/time/mod.rs:90)
  → Driver { inner: TimeDriver }    → runtime::driver::Driver       (runtime/driver.rs:16)

Type alias 的条件编译退化:

alias 启用时 → 关闭时退化为
IoDriver runtime::io::Driverruntime/driver.rs:135 —(无 IO 时无 alias)
SignalDriver runtime::signal::Driverruntime/driver.rs:244 IoDriverruntime/driver.rs:258
ProcessDriver runtime::process::Driverruntime/driver.rs:269 SignalDriverruntime/driver.rs:278

一个 Runtime 有多少个 Driver 实例?

builder.rs 中创建 Driver 的入口只有一行:

// tokio/src/runtime/builder.rs: 1672 / 1860
let (driver, driver_handle) = driver::Driver::new(cfg)?;

无论 current_thread 还是 multi_thread 调度器,一个 Runtime 只调一次。所以默认全部启用时,每种 struct 恰好一个实例,按值嵌套struct 字段直接持有所属 struct,不是 Arc 也不是 Box):

runtime::driver::Driver   × 1   ← 最外层
runtime::time::Driver     × 1   ← Timer 层
runtime::process::Driver  × 1   ← 子进程清理层 (Linux)
runtime::signal::Driver   × 1   ← Unix 信号层 (Unix)
runtime::io::Driver       × 1   ← 最底层,持有 mio::Poll

嵌套链是用字段按值 struct A { b: B } 串起来的,不是指针。

创建过程:从内到外逐层包裹

driver::Driver::new(cfg) 内部按从内到外的顺序依次创建每个 struct,每个 ::new() 接收下一层的 struct 作为参数,按值取走所有权:

// tokio/src/runtime/driver.rs: 47-58
pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Handle)> {
    // 第一步:从内到外包
    let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io, cfg.nevents)?;
    let clock = create_clock(cfg.enable_pause_time, cfg.start_paused);
    let (time_driver, time_handle) =
        create_time_driver(cfg.enable_time, cfg.timer_flavor, io_stack, &clock);

    Ok((
        Self { inner: time_driver },   // 最外层
        // ...
    ))
}

三层帮助函数展开如下。

create_io_stack()runtime/driver.rs:149-170):

fn create_io_stack(enabled: bool, nevents: usize) -> ... {
    let (io_driver, io_handle) = io::Driver::new(nevents)?;        // ⑤ io::Driver

    let (signal_driver, signal_handle) =
        create_signal_driver(io_driver, &io_handle)?;               // ④ signal::Driver 拿走了 io::Driver

    let process_driver = create_process_driver(signal_driver);      // ③ process::Driver 拿走了 signal::Driver

    (IoStack::Enabled(process_driver), ...)                         // IoStack 拿走了 process::Driver
}

create_signal_driver()runtime/driver.rs:247-253):

fn create_signal_driver(io_driver: IoDriver, io_handle: &io::Handle)
    -> io::Result<(SignalDriver, SignalHandle)>
{
    let driver = crate::runtime::signal::Driver::new(io_driver, io_handle)?;
    //                        io::Driver 被按值移入 ↑
    let handle = driver.handle();
    Ok((driver, Some(handle)))
}

create_process_driver()runtime/driver.rs:271-275):

fn create_process_driver(signal_driver: SignalDriver) -> ProcessDriver {
    ProcessDriver::new(signal_driver)
    //         SignalDriver 被按值移入 ↑
}

create_time_driver()runtime/driver.rs:305-323):

fn create_time_driver(enable, timer_flavor, io_stack: IoStack, clock) -> (TimeDriver, TimeHandle) {
    let (driver, handle) = time::Driver::new(io_stack, clock);
    //                       IoStack 被按值移入 ↑
    (TimeDriver::Enabled { driver }, Some(handle))
}

最终 Driver { inner: TimeDriver } 拿到最外层。所以这 5 个 struct 并不是”各自创建后组装”,而是一个接一个被吞进下一层:io::Driversignal::Driver 吞掉,signal::Driverprocess::Driver 吞掉,process::DriverIoStack 吞掉,IoStacktime::Driver 吞掉,time::DriverTimeDriver 吞掉,TimeDriverdriver::Driver 吞掉。最后只有 driver::Driver 这一个 struct 活着暴露给 Worker。

五个 struct 全部在堆栈上从一个 driver::Driver::new() 调用中构建完成。多线程下,整个嵌套体被放进 TryLock<Driver>,所有 worker 共享,只有执行 park_timeout 的线程才能排他地拿走 Driver。

包含关系与 park 调用链映射

上面的嵌套链是静态包含关系——编译时就确定的 struct 字段层级。而 park 调用链是运行时方法调用路径——从 Driver::park() 开始一直委派到 io::Driver::turn()。两者是平行映射的。

每层的委派模式不完全一致——time::Driver 在前面先做自己的事然后委派,process/signal 在后面先委派再处理自己的事:

静态包含关系                      运行时 park 调用链
─────────────────                ────────────────────
driver::Driver                    Driver::park(handle)
  └── inner: TimeDriver(enum)       → TimeDriver::park(handle)
        └── TimeDriver::Enabled        → time::Driver::park_internal(handle)
              └── park: IoStack(enum)      → IoStack::park_timeout(handle, dur)
                    └── IoStack::Enabled      → ProcessDriver::park_timeout(handle, dur)
                          └── process::Driver     → process::Driver::park_timeout(handle, dur)
                                └── park: SignalDriver  → SignalDriver::park_timeout(handle, dur)
                                      └── io: io::Driver      → io::Driver::park_timeout(handle, dur)
                                                                  → io::Driver::turn()
                                                                    → self.poll.poll() (epoll_wait)

每一层的 park/park_timeout 方法签名完全一致:fn(&mut self, handle: &driver::Handle) 或带 duration: Duration。但注意没有 trait——每个 impl 块是独立写的,只是碰巧签名相同。

但委派的时机不同——各层是前序、后序混合

time::Driver::park_internal:     前序 → 先查时间轮(自己事),再调 IoStack
process::Driver::park:           后序 → 先调 signal::Driver,再 reap_orphans
signal::Driver::park:            后序 → 先调 io::Driver,再 process() 处理信号
io::Driver::park:                终点 → 没有委派,直接 turn() → epoll_wait

time::Driver 需要先知道 timeout(查时间轮),然后才决定是否委派,所以是前序。process/signal 没有需要提前得到的信息,委派回来后顺便干点自己的活,所以是后序。

例外还有最外层 runtime::driver::Driver——它不做自己的事,self.inner.park(handle) 纯委派。

这就是整个 Driver 堆叠的静态结构和动态路径的完整映射。

每层的职责

从外到内,每层在 park 循环中做的事情:

层级 在 park 调用链中做的事 在返回路径中做的事
driver::Driver 纯壳子:直接委派给 TimeDriver,Worker 只认这一个 struct
TimeDriver (enum) 开关层:判断 timer 是否启用。启用则委派给 time::Driver,关闭则委派给 IoStack
time::Driver 查时间轮next_expiration_time() 算最早 deadline。到期则 park_timeout(0s),未到期则设 timeout 收割到期 timerprocess_at_time()fire()wake()
IoStack (enum) I/O 开关:启用则走 epoll 栈(process::Driver),关闭则退化为 ParkThread::parkCondvar::wait
process::Driver 委派给 SignalDriver 清理用户子进程GlobalOrphanQueue::reap_orphans()——回收用户通过 tokio::process::Command 启动的子进程退出后的僵尸进程表项。runtime 本身不启动任何额外进程
signal::Driver 委派给 io::Driver 处理信号:读 self-pipe,globals().broadcast()
io::Driver 唯一真正干活turn()self.poll.poll(events, max_wait) 执行 epoll_wait / kevent 分发 I/O 事件:遍历 events,设 ScheduledIo readiness,wake() 任务

调用委派方向是从外到内,每层先委派完再处理自己的逻辑:

park 方向:   外 → TimeDriver → time::Driver → IoStack → process → signal → io::Driver::turn()
                                                                              ↓
                                                                        epoll_wait
返回方向:   外 ← TimeDriver ← time::Driver ← IoStack ← process ← signal ← io::Driver::turn()
                                      ↓                  ↓         ↓
                                process_at_time     reap      process
                                收割到期 timer      orphans   信号

epoll_wait 返回后,事件处理的顺序是固定的:I/O 事件最先 → 信号处理 → 子进程清理 → 到期 timer 最后。这不是显式配置的优先级,而是嵌套结构带来的自然结果——最内层(io::Driver) 最先被调用也最先返回处理,最外层(time::Driver)最后处理。嵌套越深,优先级越高。

这个模型可以类比为线性链的后序遍历(post-order):子节点处理完 → 父节点处理。Driver 嵌套链是一个线性调用树(每个节点只有一个子节点),park 方向是前序(先查时间轮,再进子节点),返回方向是后序(先分发 I/O 事件,再处理信号,最后收割 timer)。

条件编译可以缩短链:无 process feature 时 process::Driver 退化为 SignalDriver,无 signal 时 SignalDriver 退化为 IoDriver。但剩余层的顺序和职责不变。

但链不是无限可缩的——最外层和最内层是固定的:

runtime::driver::Driver    ← 必须存在,Worker 只看这一个入口
    └── TimeDriver          ← 必须存在,enum 天生存在
        ├── time::Driver    ← 可关(关 time 时 → IoStack)
        └── IoStack         ← 必须存在,enum 天生存在
            ├── process     ← 可关(退化 → SignalDriver)
            ├── signal      ← 可关(退化 → io::Driver)
            └── io::Driver  ← 可关(关 io 时 → ParkThread)
                └── ParkThread  ← 最终兜底(Condvar::wait)

所以真正永远无法跳过的是三层 struct/enum:

层级 为什么不可缩 最简形态
runtime::driver::Driver Worker 线程调它的 park(handle) Driver { inner: TimeDriver }
TimeDriver enum 天生存在;time 关闭时退化为 IoStack 别名 TimeDriver = IoStack
IoStack enum 天生存在;io 关闭时退化为 ParkThread IoStack::Disabled(ParkThread)

链必须从 runtime::driver::Driver 走到某个能阻塞线程的兜底机制。io::Driver(带 epoll_wait)是最常见的兜底,但不是唯一的——ParkThreadCondvar::wait)才是永远存在的最后防线。关掉所有可选层后,runtime 退化为一个可以用 tokio::spawn 做纯计算任务调度器,没有 I/O、没有 Timer、没有 Process、没有 Signal。

全部默认启用时的完整堆叠可以用下面的关系图表示。左侧是 Driver 嵌套(实线 --> = 按值包含),右侧是 Handle 共享(虚线 o-- = Arc 聚合):

graph LR
    subgraph "Driver 嵌套(按值包含)"
        D["runtime::driver::Driver"] -->|"inner"| TD["TimeDriver (enum)"]
        TD -->|"Enabled.driver"| T["runtime::time::Driver"]
        T -->|"park"| IS["IoStack (enum)"]
        IS -->|"Enabled"| PD["ProcessDriver =<br/>process::Driver"]
        PD -->|"park"| SD["signal::Driver"]
        SD -->|"io"| ID["runtime::io::Driver<br/>{ poll: mio::Poll }"]
    end

    subgraph "Handle 聚合(Arc 共享)"
        H["driver::Handle"] -->|"io"| IOH["IoHandle (enum)"]
        IOH -.->|"Enabled (Arc)"| IOH2["io::Handle<br/>{ registry, waker }"]
    end

io::Handle 通过 Arc 聚合共享外,time::Handle 也是 Option<time::Handle>Arc<InnerState>),所有 worker 线程都通过 Arc<driver::Handle> 共享同一组 Handle。

关键认识(两个层次——外部统一入口 vs 内部各搞各的):

外部统一入口:Worker 线程只看 runtime::driver::Driver 这一个 struct,永远只调它的 park/park_timeout,不关心内部嵌套。inner: TimeDriver 根据 feature 配置退化为 IoStackParkThread,暴露给 Worker 的 API 不变。

内部各搞各的:链上的五个 Driver struct(driver/time/process/signal/io没有共享的 trait。每个 impl Driver { park() } 是孤立定义的,只是碰巧有相同的签名。调用链靠硬编码的字段委派维系,编译器不检查签名一致性,也没有 Box<dyn Park> 的动态替换能力。

具体 struct 关系:

park 调用链:Driver::parkTimeDriver::parktime::Driver::park_internalIoStack::park_timeoutProcessDriver::park...io::Driver::turnmio::Poll::poll

这条结构链决定了两种使用场景的走法完全不同:

场景 A:只有 I/O,没有 timer(TcpStream 纯读)

Worker 线程主循环
  → park_internal()
    → 查 Wheel → next_wake = None(时间轮是空的)
    → IoStack::park_timeout(None)   ← 无 timer,传 None
      → I/O Driver::turn()
        → epoll_wait(events, -1)    ← 无限期等 I/O
          → [数据到达,网卡中断]
          → epoll_wait 返回
          → 遍历 events → ScheduledIo::set_readiness()
          → waker.wake()            ← 唤醒 TcpStream 的任务
    → process_at_time(now)          ← 时间轮为空,no-op
    → 返回到调度器,执行就绪任务

场景 B:只有 timer,没有 I/O(纯 sleep)

Worker 线程主循环
  → park_internal()
    → 查 Wheel → next_wake = Some(10000)
    → duration = 10000 - now = 5000ms
    → IoStack::park_timeout(5000ms)
      → I/O Driver::turn()
        → epoll_wait(events, 5000)  ← 带 5s 超时
          → [超时,hrtimer 到期]
          → epoll_wait 返回(无 I/O 事件)
    → process_at_time(now=10000)    ← 收割到期 timer
    → fire() → waker.wake()         ← 唤醒 Sleep 的任务

场景 C:既有 I/O 又有 timer(常见混合场景)

Worker 线程主循环
  → park_internal()
    → 查 Wheel → next_wake = Some(10000)
    → IoStack::park_timeout(5000ms)
      → I/O Driver::turn()
        → epoll_wait(events, 5000)
          → [2s 时,TcpStream 数据到达]
          → epoll_wait 提前返回      ← I/O 事件先到
          → 遍历 events → ScheduledIo::set_readiness()
          → waker.wake()             ← 唤醒 TcpStream 的任务
    → process_at_time(now=7000)      ← elapsed 只走到 7000,不是 10000
    → 没有 timer 到期(deadline 10000 > 7000)
    → 返回调度器,继续处理

精妙之处在场景 C:epoll_wait 的 OR 语义让两个驱动共享一次系统调用——TcpStream 数据先到就提前回来,Timer 顺便看一眼谁到期了;hrtimer 先到就准时处理 timer。不需要两套独立的等待线程或两套 epoll 实例。

sequenceDiagram
    participant Worker as Worker 线程
    participant TD as Time Driver
    participant IOD as I/O Driver
    participant EP as epoll_wait

    note over Worker: 场景 C:5s timer + 1 个 TcpStream
    Worker->>TD: park_internal()
    TD->>TD: next_wake = 10000, duration = 5000
    TD->>IOD: park_timeout(5000ms)
    IOD->>EP: epoll_wait(events, 5000)
    note over EP: OR 等待:I/O 事件 or 5000ms 超时
    EP-->>IOD: [2s] TcpStream 可读
    IOD->>IOD: 处理 events → wake TcpStream
    IOD-->>TD: turn() 返回
    TD->>TD: process_at_time(now=7000)
    note over TD: deadline 10000 > 7000 → 无 timer 到期
    TD-->>Worker: 回到调度器

这就是 Tokio 的 Driver { inner: TimeDriver } 设计的核心:外层 Timer 决定等多久,内层 I/O 执行等待,醒来后各管各的数据。I/O Driver 不在 inner 字段里,它在 TimeDriver 的 park 字段里层层嵌套地藏着。

1.3 底层底座:所有 Driver 共享同一个 runtime::io::Driver

堆叠链中的每一层都有自己的 impl Driver { park() },但它们最终都有一个共同的落脚点——runtime::io::Driver(宿主 mio::Poll。不管是 Timer、Signal 还是 Process,park 链的归宿都一样:

runtime::time::Driver::park_internal
  → IoStack::park_timeout
    → ProcessDriver::park_timeout
      → SignalDriver::park_timeout
        → io::Driver::park_timeout      ← runtime::io::Driver
          → self.poll.poll(events, timeout)

runtime::io::Driver 的 struct 定义直接暴露了这一点7

// tokio/src/runtime/io/driver.rs: 25-37
pub(crate) struct Driver {
    /// True when an event with the signal token is received
    signal_ready: bool,

    /// Reuse the `mio::Events` value across calls to poll.
    events: mio::Events,

    /// The system event queue.
    poll: mio::Poll,
}

它的 park_timeoutturn() 是实际执行 epoll_wait 的地方7

// tokio/src/runtime/io/driver.rs: 138-154
pub(crate) fn park_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) {
    let handle = rt_handle.io();
    self.turn(handle, Some(duration));
}

fn turn(&mut self, handle: &Handle, max_wait: Option<Duration>) {
    handle.release_pending_registrations();
    let events = &mut self.events;

    // 阻塞等待:这行是真正的 epoll_wait
    match self.poll.poll(events, max_wait) {   // ← mio::Poll::poll → epoll_wait
        Ok(()) => {}
        Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
        Err(e) => panic!("{e:?}"),
    }

    // 分发事件给 ScheduledIo
    for event in events.iter() {
        let ready = Ready::from_mio(event);
        let ptr = super::EXPOSE_IO.from_exposed_addr(event.token().0);
        let io: &ScheduledIo = unsafe { &*ptr };
        io.set_readiness(Tick::Set, |curr| curr | ready);
        io.wake(ready);
    }
}

这些代码是 park 链最内层的一次调用。但多线程下,所有 worker 共享同一个 runtime::io::Driver——通过 TryLock<Driver> 做排他访问8

// tokio/src/runtime/scheduler/multi_thread/park.rs: 48-65
struct Shared {
    /// Shared driver. Only one thread at a time can use this
    driver: TryLock<Driver>,
}

// park_timeout 时尝试拿锁:
pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) -> HadDriver {
    if let Some(mut driver) = self.inner.shared.driver.try_lock() {
        // 拿到锁 → 走 Driver::park_timeout → epoll_wait
        self.inner.park_driver(&mut driver, handle, Some(duration))
    } else if !duration.is_zero() {
        // 没拿到锁 → 退化为 condvar 等待
        self.inner.park_condvar(Some(duration));
        HadDriver::No
    }
}

这就是为什么一个 Runtime 五个 Driver struct 但只一个 io::Driver(一个 mio::Poll / 一个 epoll fd):epoll fd 不能被多个线程同时 epoll_wait,所以多线程下通过 TryLock 串行化访问。拿不到锁的线程退化为 condvar 等待,等拿到锁的线程执行完 epoll_wait 后唤醒它。

TcpStream 不走 park 链。它不是通过 park() 去用 mio::Poll 的,而是直接通过 Handle::add_source() 把 fd 注册到同一个 mio::Poll 实例上:7

// tokio/src/runtime/io/driver.rs: 182-207 (Handle::add_source)
pub(super) fn add_source(&self, source: &mut impl mio::event::Source, interest: Interest)
    -> io::Result<Arc<ScheduledIo>>
{
    let scheduled_io = self.registrations.allocate(&mut self.synced.lock())?;
    let token = scheduled_io.token();
    self.registry.register(source, token, interest.to_mio())?;
    //  ↑ 通过 mio::Registry::register → epoll_ctl(EPOLL_CTL_ADD)
    Ok(scheduled_io)
}

而 park 链是通过 self.poll.poll(events, timeout)(即 epoll_wait)来消费事件。注册(epoll_ctl)和等待(epoll_wait)都在同一个 mio::Poll 上完成:

                   同一个 mio::Poll 实例
                   ┌──────────────────────┐
                   │                      │
  epoll_ctl ADD ◄──┤  runtime::io::Driver  ├──► epoll_wait
  (TcpStream 注册)  │     { poll }         │  (park 链等待)
                   │                      │
                   └──────────────────────┘
                              ▲
                 ┌────────────┼────────────┐
                 │            │            │
            Timer park   Signal park   Process park
            epoll_wait   epoll_wait    epoll_wait
            (timeout=5s) (timeout=-1)  (timeout=-1)

换个角度看,五个 Driver struct 的关系是:

Driver 作用 是否调用 epoll_wait 是否注册 fd
driver::Driver 最外层调度入口 否,委派给 TimeDriver
time::Driver 查时间轮 + 设 timeout 否,委派给 IoStack
process::Driver 子进程清理 否,委派给 SignalDriver
signal::Driver 信号处理 否,委派给 io::Driver 是,注册一个 UnixStream
io::Driver 真正的 epoll 封装 是,所有 fd

runtime::io::Driver 是最底层——它是唯一真正执行 epoll 系统调用、唯一管理 epoll 注册表的那一层。上面四层 Driver 全部通过 io::Driver 间接完成等待,没有自己的 epoll 实例。

这也就意味着:即使一个 Worker 线程没有 Timer(time feature 关闭)、没有 Signal、没有 Process,它依然需要一个 runtime::io::Driver 来运行 mio::Poll::poll()——否则连基本的 TcpStream 等待都没法做。 io::Driver 是唯一的硬性依赖,其他 Driver 都是可选的 wrapper。

1.4 Driver 分层设计要点总结

将前文的分析浓缩为以下七条:

  1. Tokio 内部共有五层 Driversruntime::driver::Driverruntime::time::Driverruntime::process::Driverruntime::signal::Driverruntime::io::Driver,以及两个辅助 enum TimeDriverIoStack

  2. 没有共享的 trait:这些 Drivers 虽然都提供 park/park_timeout/shutdown 方法,接口签名完全一致,但不是通过 impl SomeTrait for Driver 实现的——每个 impl 块是独立定义的,只有人工约定的签名一致性。

  3. 五层构成一条线性调用链:最外层 runtime::driver::DriverTimeDrivertime::DriverIoStackprocess::Driversignal::Driverio::Driver(最底层)。链的长度可通过条件编译缩短,但顺序不变。

  4. 每一层调用下一层的 park 方法:通过 self.next.park(handle)self.next.park_timeout(handle, duration) 委派,最终到达 io::Driver::turn()self.poll.poll()epoll_wait / kevent)。

  5. 前序调用与后序调用并存time::Driver 是先查时间轮再委派(前序),process::Driversignal::Driver 是先委派再处理自己的逻辑(后序)。

  6. 条件编译可缩短链:通过 Cargo features(nettimeprocesssignal)和 Builder 选项(.enable_io().enable_time())控制每层的启用。关掉所有可选层后,最少剩 DriverTimeDriverIoStack::Disabled(ParkThread)

  7. 每层全局唯一实例:一个 Runtime 恰好一套 Driver 实例,按值嵌套在 runtime::driver::Driver 这一个 struct 中(不是全局 static 变量,是 Runtime 的字段)。多线程下所有 worker 共享同一个 TryLock<Driver>

二、TcpStream 的创建:一条完整的注册链

现在来看 TcpStream::connect("127.0.0.1:8080") 背后到底发生了什么。

2.1 从 TcpStream 到 PollEvented

TcpStream 的结构非常简单——它只是一个 PollEvented 的包装9

// tokio/src/net/tcp/stream.rs: 72-74
pub struct TcpStream {
    io: PollEvented<mio::net::TcpStream>,
}

PollEvented 是 Tokio 的一个通用包装器:它接收一个实现了 mio::event::Source 的类型(如 mio::net::TcpStream),将其注册到运行时的 I/O 驱动上10

// tokio/src/io/poll_evented.rs: 89-94
pub(crate) struct PollEvented<E: Source> {
    io: Option<E>,
    registration: Registration,
}

创建 PollEvented 时,需要拿到当前 runtime 的调度器句柄(scheduler::Handle),然后用它来完成注册10

// tokio/src/io/poll_evented.rs: 130-139
pub(crate) fn new_with_interest_and_handle(
    mut io: E,
    interest: Interest,
    handle: scheduler::Handle,
) -> io::Result<Self> {
    let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?;
    Ok(Self { io: Some(io), registration })
}

TcpStream::new 只做了一件事:把 mio::net::TcpStream 交给 PollEvented::new9

// tokio/src/net/tcp/stream.rs: 160-163
pub(crate) fn new(connected: mio::net::TcpStream) -> io::Result<TcpStream> {
    let io = PollEvented::new(connected)?;
    Ok(TcpStream { io })
}

2.2 从 PollEvented 到 Registration

Registration 是更底层的抽象。它持有一个 Arc<ScheduledIo>,后者是 I/O 驱动中真正管理状态的结构11

// tokio/src/runtime/io/registration.rs: 64-72
pub(crate) struct Registration {
    handle: scheduler::Handle,
    shared: Arc<ScheduledIo>,
}

Registration::new_with_interest_and_handle 是整条注册链的关键入口11

// tokio/src/runtime/io/registration.rs: 91-98
pub(crate) fn new_with_interest_and_handle(
    io: &mut impl Source,
    interest: Interest,
    handle: scheduler::Handle,
) -> io::Result<Registration> {
    let shared = handle.driver().io().add_source(io, interest)?;
    Ok(Registration { handle, shared })
}

这里 handle.driver().io() 拿到 I/O Driver 的 Handle,然后调用 add_source

2.3 从 Registration 到 I/O Driver

Handle::add_source 是真正把 fd 注册到 epoll 的地方12

// tokio/src/runtime/io/driver.rs: 182-207
pub(super) fn add_source(
    &self,
    source: &mut impl mio::event::Source,
    interest: Interest,
) -> io::Result<Arc<ScheduledIo>> {
    // 1. 从 RegistrationSet 中分配一个新的 ScheduledIo
    let scheduled_io = self.registrations.allocate(&mut self.synced.lock())?;
    let token = scheduled_io.token();

    // 2. 通过 mio::Registry 将 source 注册到 epoll
    //    token 是 ScheduledIo 对象的指针值(作为 epoll 的标识)
    if let Err(e) = self.registry.register(source, token, interest.to_mio()) {
        // 注册失败:从 set 中移除分配的 ScheduledIo
        unsafe { self.registrations.remove(&mut self.synced.lock(), &scheduled_io) };
        return Err(e);
    }

    self.metrics.incr_fd_count();
    Ok(scheduled_io)
}

这里有一个关键设计:token 是 ScheduledIo 对象的地址mio::Token 是一个 usize,而 ScheduledIo 是一个 Arc 管理的堆对象。add_source 使用 PtrExposeDomainScheduledIo 的地址暴露为 mio::Token,这样 epoll 返回事件时,Tokio 可以直接通过 token 反向找到对应的 ScheduledIo12

// tokio/src/runtime/io/scheduled_io.rs: 268-270
pub(crate) fn token(&self) -> mio::Token {
    mio::Token(super::EXPOSE_IO.expose_provenance(self))
}

整条注册链可以用下图来总结:

graph TD
    A["TcpStream::connect(addr)"] --> B["mio::net::TcpStream::connect(addr)"]
    B --> C["TcpStream::new(sys)"]
    C --> D["PollEvented::new(connected)"]
    D --> E["Registration::new_with_interest_and_handle"]
    E --> F["Handle::add_source"]
    F --> G["RegistrationSet::allocate → Arc<ScheduledIo>"]
    F --> H["mio::Registry::register(source, token, interest)"]
    H --> I["epoll_ctl(EPOLL_CTL_ADD, fd, ...)"]

    style G fill:#e1f5fe
    style H fill:#e1f5fe
    style I fill:#c8e6c9

注册完成后,TcpStreamPollEvented 内部持有一个 RegistrationRegistration 内部持有一个 Arc<ScheduledIo>,而 ScheduledIo 的地址就是它在 epoll 中的 token。通过这个 token,epoll 事件可以反向定位到 ScheduledIo,运行时可快速处理就绪事件。

三、ScheduledIo:边沿触发的原子状态机

ScheduledIo 是 I/O 驱动的核心数据结构。如果说 Timer 驱动的核心是哈希时间轮和 TimerShared,那 I/O 驱动的核心就是 ScheduledIo 和它的原子状态13

// tokio/src/runtime/io/scheduled_io.rs: 59-61
pub(crate) struct ScheduledIo {
    linked_list_pointers: UnsafeCell<linked_list::Pointers<Self>>,
    readiness: AtomicUsize,     // 打包的 readiness 状态
    waiters: Mutex<Waiters>,    // 等待队列
}

3.1 打包的原子状态

readiness 字段是一个 AtomicUsize,但它承载了三个独立的信息,通过位打包(bit packing)放在一个原子变量里13:

// tokio/src/runtime/io/scheduled_io.rs: 224-230
// | shutdown | driver tick | readiness |
// |----------+-------------+-----------|
// |   1 bit  |   15 bits   |  16 bits  |

const READINESS: bit::Pack = bit::Pack::least_significant(16);
const TICK: bit::Pack = READINESS.then(15);
const SHUTDOWN: bit::Pack = TICK.then(1);

三个状态打包在一个 AtomicUsize 中,意味着对 readiness 的更新和检查都是原子操作,无需外部锁

3.2 和 TimerShared 的对比

上篇文章介绍了 TimerShared 及其 StateCell(用 AtomicU64 实现的原子状态机)。两者都是原子状态机,但解决的问题不同:

方面 TimerShared (sleep) ScheduledIo (I/O)
核心状态 到期时间、是否已触发 readiness、tick
驱动方式 时间轮到期触发 epoll 事件通知
更新方式 CAS 循环(set_expiration/mark_pending fetch_updateset_readiness
通知机制 fire()Waker::wake() wake(ready)Waker::wake()
并发角色 用户端(TimerEntry)+ 驱动端(Driver 用户端(Registration)+ 驱动端(Driver
数据结构 侵入式链表节点(用于时间轮 slot) 侵入式链表节点(用于 RegistrationSet)

3.3 边沿触发(ET)模型:为什么需要 tick

mio 在 Linux 上默认使用 epoll 的边沿触发(Edge-Triggered, ET)模式。ET 模式的特点是:只在状态从「不可用」变为「可用」时通知一次。这意味着:

  1. epoll 通知 readable → ScheduledIo 设置 READABLE 位
  2. 用户尝试 read() → 成功读到部分数据
  3. epoll 不会再次通知(因为状态没有从不可读变为可读)
  4. 用户可以继续 read() 直到返回 WouldBlock → 清除 READABLE 位 → 下次 epoll 通知再来

但这里有一个微妙的问题:如何防止清理旧事件时误清新事件? 这就是 tick 的作用。

sequenceDiagram
    participant EPoll as epoll
    participant SI as ScheduledIo
    participant Task as User Task

    EPoll->>SI: poll 返回 readable(tick=1)
    SI->>SI: set_readiness: 设置 READABLE + tick=2
    SI-->>Task: wake()
    Task->>SI: poll_read_ready()
    SI-->>Task: READABLE(tick=2)
    Task->>Task: read() → 成功
    Note over Task: 还没读完...
    EPoll->>SI: 新数据到达,再次通知(tick=3)
    SI->>SI: set_readiness: tick=3
    Task->>SI: clear_readiness(tick=2)  ← 旧 tick
    SI->>SI: tick 不匹配 → 无操作
    Task->>SI: poll_read_ready()
    SI-->>Task: READABLE(tick=3 时设置)
    Task->>Task: 继续读取

关键机制在 set_readiness13

// tokio/src/runtime/io/scheduled_io.rs: 277-298
pub(super) fn set_readiness(&self, tick_op: Tick, f: impl Fn(Ready) -> Ready) {
    let _ = self.readiness.fetch_update(AcqRel, Acquire, |curr| {
        const MAX_TICK: usize = TICK.max_value() + 1;
        let tick = TICK.unpack(curr);

        let new_tick = match tick_op {
            // 清理 readiness 时:如果 tick 不匹配,跳过此次操作
            Tick::Clear(t) if tick as u8 != t => return None,
            Tick::Clear(t) => t as usize,
            // 设置 readiness 时:递增 tick
            Tick::Set => tick.wrapping_add(1) % MAX_TICK,
        };
        let ready = Ready::from_usize(READINESS.unpack(curr));
        Some(TICK.pack(new_tick, f(ready).as_usize()))
    });
}

fetch_update 是一个乐观 CAS 循环:它尝试读取当前值、调用闭包生成新值、然后原子地替换。如果闭包返回 None(例如清理时的 tick 不匹配),则不进行任何更新。

这正是 ET 模型所需要的:每次 epoll 通知都会递增 tick,旧的事件清不到新 tick 的脏位。如果没有这个 tick 机制,一个慢任务可能在清理旧事件时无意中清掉了新到达的事件,导致数据静默丢失。

3.4 Waiters:等待队列

ScheduledIo 的第三个字段 waiters 管理了等待此 I/O 资源的任务13

// tokio/src/runtime/io/scheduled_io.rs: 70-78
#[derive(Debug, Default)]
struct Waiters {
    /// 所有等待者的侵入式链表
    list: WaitList,

    /// AsyncRead 专用 waker 槽
    reader: Option<Waker>,

    /// AsyncWrite 专用 waker 槽
    writer: Option<Waker>,
}

这里有两个并行的机制:

  1. 专用 waker 槽reader / writer):用于 poll_read_ready / poll_write_ready 快速路径——每个方向只有一个任务在等待,直接用 Option<Waker> 存,无需链表
  2. 通用等待链表:用于 readiness() async 方法——支持多个任务同时等待同一个 ScheduledIo 的不同 Interest

当 I/O 事件到达时,wake() 方法遍历这两者并唤醒对应的任务13

// tokio/src/runtime/io/scheduled_io.rs: 304-348
pub(super) fn wake(&self, ready: Ready) {
    let mut wakers = WakeList::new();
    let mut waiters = self.waiters.lock();

    // 检查 read 槽
    if ready.is_readable() {
        if let Some(waker) = waiters.reader.take() {
            wakers.push(waker);
        }
    }
    // 检查 write 槽
    if ready.is_writable() {
        if let Some(waker) = waiters.writer.take() {
            wakers.push(waker);
        }
    }

    // 遍历链表中的等待者
    'outer: loop {
        let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest));
        while wakers.can_push() {
            match iter.next() {
                Some(waiter) => {
                    // ... 收集 waker,标记已就绪
                }
                None => break 'outer,
            }
        }
        // 批次满了:释放锁后批量唤醒
        drop(waiters);
        wakers.wake_all();
        waiters = self.waiters.lock();
    }
    // ...
}

wake_all 在释放锁之后进行,避免持锁时调用外部代码导致死锁——这和上篇文章中 Time Driver 的 waker_list.wake_all() 是相同的模式。

四、事件循环:turn()

当一切就绪后,是谁来驱动 I/O 事件的处理的?答案是 I/O Driver 的 turn() 方法12

// tokio/src/runtime/io/driver.rs: 122-174
fn turn(&mut self, handle: &Handle, max_wait: Option<Duration>) {
    // 处理待释放的注册(被 drop 的 Registration)
    handle.release_pending_registrations();

    let events = &mut self.events;

    // 核心:阻塞在 epoll_wait 上
    match self.poll.poll(events, max_wait) {
        Ok(()) => {}
        Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
        // ...
    }

    // 处理所有返回的事件
    let mut ready_count = 0;
    for event in events.iter() {
        let token = event.token();

        if token == TOKEN_WAKEUP {
            // mio::Waker 的 unpark,无需处理
        } else if token == TOKEN_SIGNAL {
            self.signal_ready = true;
        } else {
            let ready = Ready::from_mio(event);
            // token 就是 ScheduledIo 的地址
            let ptr = super::EXPOSE_IO.from_exposed_addr(token.0);
            let io: &ScheduledIo = unsafe { &*ptr };

            // 更新 readiness 状态并唤醒等待任务
            io.set_readiness(Tick::Set, |curr| curr | ready);
            io.wake(ready);

            ready_count += 1;
        }
    }
    // ...
}

turn() 的工作流程清晰明了:

  1. 释放待处理的注册:清理之前被 drop 但尚未从 epoll 中移除的 Registration
  2. 阻塞等待self.poll.poll(events, max_wait) —— 对,这就是 epoll_wait
  3. 分发事件:遍历 epoll 返回的每个事件,通过 token 找到对应的 ScheduledIo,设置 readiness 位,唤醒等待的任务

当 I/O 驱动独立运行时(如 Time Driver 没有启用),max_waitNone 表示无限期等待;当 Time Driver 有定时器时,max_wait 为”到最早 deadline 的时间差”。

4.1 epoll 不保存数据

一个容易产生的误解是认为 epoll 返回事件时”带着数据一起回来了”。实际上,epoll 只通知”fd 可读了”,它不保存也不传递任何用户数据。数据从到达网卡到被用户程序读取,经历了完整的链路:

客户端发送数据包
    → 网卡硬件中断
        → 内核网络协议栈(IP/TCP 层)处理
            → 数据放入 socket 的接收缓冲区(内核内存)
                → epoll 检测到 fd 状态从不可读变为可读
                    → epoll_wait 返回「fd X 可读」
                        → Worker 线程醒来(turn() 返回)
                            → 设置 ScheduledIo readiness 位
                                → Waker::wake()
                                    → 任务重新入队,被调度执行
                                        → 任务调用 read(fd, buf)
                                            → read 系统调用把数据从内核 socket buffer 拷贝到用户缓冲区

epoll 的角色是门铃——它通知你”有东西到了,快来拿”,但它不负责把东西送来。真正的数据一直安静地躺在内核的 socket 接收缓冲区里,等用户任务被调度后通过 read() 系统调用拷贝到用户态。

这也是为什么 PollEvented::poll_read 的循环会在 read() 返回 WouldBlock 时清除 readiness 然后重试——因为如果假阳性发生了(数据被别的线程读走了或者已被处理),再次 read() 会告诉你 socket buffer 是空的,你需要清除 readiness 让 epoll 下次有新数据到达时重新通知你。

sequenceDiagram
    participant NIC as 网卡硬件
    participant Kernel as 内核协议栈
    participant Buf as Socket Buffer
    participant EPoll as epoll
    participant Task as 用户任务

    NIC->>Kernel: 数据包到达(硬件中断)
    Kernel->>Buf: TCP 重组 → 数据放入 socket buffer
    Buf->>EPoll: fd 状态变更(不可读 → 可读)
    EPoll->>EPoll: epoll_wait 返回
    Note over EPoll: epoll 只告诉你"fd X 可读"<br/>但不携带用户数据
    EPoll-->>Task: Waker::wake()
    Task->>Task: 任务被调度
    Task->>Buf: read(fd, buf)
    Buf-->>Task: 数据拷贝到用户缓冲区

对比一下 sleep 的”门铃”:

Timer Driver: epoll_wait(timeout) → 超时返回
    → 时间轮的定时器"只告诉你时间到了"
    → 没有数据要拷贝,直接 fire() → waker.wake()

I/O Driver:  epoll_wait(events, timeout) → 事件返回
    → epoll 的 fd 事件"只告诉你 fd 可读了"
    → 数据还在内核 socket buffer 里
    → 设置 readiness → waker.wake()
    → 任务醒来后需要 read() 把数据拷贝到用户态

在这一点上,两种驱动的”通知”机制在语义上是类似的——都只是信号通知,而不是数据传输。区别仅在于:时间通知直接触发任务的唤醒(时间到了就够了),而 I/O 通知后任务还需要多一步 read() 系统调用来搬运数据。

整体驱动循环

将 I/O 驱动和时间驱动放在一起看,Tokio 的 worker 线程主循环大致是:

sequenceDiagram
    participant Worker as Worker Thread
    participant Time as Time Driver
    participant IO as I/O Driver
    participant Sched as Scheduler

    loop 主循环
        Worker->>Time: park_internal()
        Time->>Time: 检查时间轮 → 下一个 deadline
        Time->>IO: park/park_timeout(duration)
        IO->>IO: epoll_wait(events, duration)
        Note over IO: 被 I/O 事件或超时唤醒
        IO-->>Time: 返回
        Time->>Time: process_at_time(now)
        Time->>Time: 收割到期 timer
        Time-->>Worker: 线程继续
        Worker->>Sched: 运行就绪任务
    end

这也就是在上一篇文章中看到的 park_internal 流程——只是现在我们把 park 的底层展开了,看到了 I/O 驱动这一层。

4.2 epoll 是 blocking 的,non-blocking 在哪?

到这里你可能有一个疑惑:既然 turn() 里的 epoll_wait 会阻塞线程,那 non-blocking I/O 到底体现在哪里?

答案是:阻塞在线程层次,非阻塞在任务层次。这是多路复用(multiplexing)的精髓。

对比两种模式就一目了然:

同步阻塞模式:一条线程 = 一个连接
  Thread A ── read(socket1) ── 阻塞等数据 ── 继续
  Thread B ── read(socket2) ── 阻塞等数据 ── 继续
  → N 个连接需要 N 条线程,OS 调度开销大

epoll 多路复用:一条线程 = 所有连接
  Worker ── epoll_wait([socket1, socket2, ...]) ── 阻塞等任意 fd 就绪 ──
      ├─ socket1 可读 → read(socket1) → 处理 → 回到 epoll_wait
      └─ socket2 可读 → read(socket2) → 处理 → 回到 epoll_wait
  → N 个连接只需要少量线程,zero-cost 任务切换

epoll_wait 的阻塞正是 tokio 需要的——它不是在阻塞等一个特定的 socket,而是在阻塞等所有注册过的 fd 中任何一个就绪。一条线程的通话时间被分给成千上万个 fd 共享,没有空转、没有忙等。

但这里还有第二层:单个 socket 的 read() 本身也是非阻塞的。tokio 的 mio::net::TcpStream 在注册之前就已经被设为 O_NONBLOCK 了。所以 read() 永远不会真正卡住线程:

read(nonblocking_fd, buf)  →  要么返回数据
                            →  要么返回 EAGAIN / EWOULDBLOCK
                            →  绝不阻塞等待

如果 read() 返回 WouldBlock,tokio 不会傻等——任务返回 Poll::Pending,线程立刻还给 epoll_wait 去服务其他 fd。等到下一次内核通知 “fd 可读” 时,epoll 会再次唤醒线程,再尝试读取。

sequenceDiagram
    participant Task1 as 任务 A (fd1)
    participant Task2 as 任务 B (fd2)
    participant EPoll as epoll
    participant Worker as Worker 线程

    Worker->>EPoll: epoll_wait([fd1, fd2])
    Note over Worker: 线程阻塞等待
    EPoll-->>Worker: fd1 可读
    Worker->>Task1: poll fd1
    Task1->>Task1: read(fd1, buf) → 成功
    Task1-->>Worker: Ready
    Worker->>EPoll: 再次 epoll_wait
    Note over Worker: 线程阻塞等待
    EPoll-->>Worker: fd2 可读
    Worker->>Task2: poll fd2
    Task2->>Task2: read(fd2, buf) → WouldBlock! (假阳性)
    Task2->>Task1: clear_readiness + Pending
    Task2-->>Worker: Pending
    Worker->>EPoll: 再次 epoll_wait

注意到上面 seq 图中 WouldBlock 场景:任务 B 被唤醒后 read() 返回 WouldBlock,它没有阻塞线程,而是返回 Pending 让线程继续回到 epoll_wait

常问追问:epoll_wait 阻塞线程,那 .await 会 blocking 吗?

把这个问题拆开看,一个 .await 的完整生命周期是:

  1. Future::poll → 数据没到 → 注册 Waker → 返回 Poll::Pending
  2. 调度器把这个任务从运行队列移到等待队列
  3. 调度器拿出下一个就绪任务运行
  4. 如果没有任何任务就绪,线程才进入 epoll_wait

所以答案很清楚:.await 本身从不导致线程阻塞.await 返回 Poll::Pending 之后就回到了调度器,线程继续处理其他就绪任务。只有所有任务都挂起后,线程才去 epoll_wait 等事件——这时的等待是多路复用的高效等待:一条线程替所有挂起的任务盯着所有 fd,任何一个 fd 就绪,线程就醒来。

反过来说,如果 .await 真的会 blocking,那下面的代码就不可能工作:

// 伪代码:两个 .await 背靠背
let data1 = socket1.read(&mut buf1).await?;
let data2 = socket2.read(&mut buf2).await?;

如果第一个 .await 阻塞线程,第二个 .await 永远没机会执行——但事实上它们都能正确执行。秘密就在于:第一个 .await 返回 Pending 后,线程回到调度器,调度器看到第二个任务就绪。即使第二个 .await 也返回 Pending,线程还可以去服务其他等待的 fd。阻塞在 epoll_wait 上的是线程,但任务早就从线程上被摘下来了。

所以完整回答是两层含义的叠加:

  1. epoll_wait 的阻塞线程层次的阻塞——它是在「等任何一个 fd 就绪」,不是「等一个特定的 fd」
  2. 单个 fd 的 read() 永不阻塞线程——因为 fd 被设为 O_NONBLOCK,数据没准备好就立即返回 WouldBlock,不会卡住线程

这两层合在一起就是 tokio 异步 I/O 的引擎:一条线程开着 epoll_wait 同时等所有连接,每个连接上的 read/write 都不会卡住线程,遇到 WouldBlock 就把任务挂起,线程回去继续等。

这也顺便回答了上篇文章 sleep 和 I/O 的一个本质差异:sleep 的 epoll_wait(timeout) 在超时返回后,时间轮的”到期检测”是纯粹的 CPU 计算(比较 tick),不需要任何系统调用;而 I/O 的 epoll_wait(events) 返回后,任务还需要调用 read() 这个系统调用来真正获取数据。sleep 的用户态时间轮避免了额外的内核交互,而 I/O 的每一次数据读取都是一次系统调用。

4.3 一个值得思考的对比:io_uring 在哪?

到这里你可能还有一个问题:Linux 上有 io_uring,它可以完全异步地提交 I/O 操作并从 completion queue 中收割结果,tokio 为什么不用它来替代 epoll?

这是一个很好的问题。答案是:mio 是 readiness-based(就绪态)抽象,而 io_uring 是 completion-based(完成态)抽象,两者的编程模型从根本上不兼容。

Readiness-based vs Completion-based

Readiness-based 模型(epoll / kqueue)的工作方式是:

epoll_wait → 返回「fd X 可读」→ 用户调用 read(fd, buf) → 数据从内核拷贝到用户态

epoll 只告诉你”可以读了”,实际的读操作仍然由用户发起(read() 系统调用),且必须是同步的。这正是为什么上一节强调 O_NONBLOCK——因为 read() 虽然不阻塞线程,但它毕竟是同步完成的。

Completion-based 模型(io_uring)的工作方式是:

用户提交 read(fd, buf) 到 SQ(Submission Queue)
    → io_uring_enter 通知内核干活
        → 内核直接 DMA 把数据放入 buf
            → 完成后在 CQ(Completion Queue)放入一条完成记录
                → 用户从 CQ 收割结果

io_uring 把「等待就绪 + 读数据」合并成一步:你提交一个读请求,内核在数据到达时直接 DMA 到你的 buffer,然后通知你。你连 read() 系统调用都不需要再做了。

这两种模型在驱动层面的架构差异是根本性的:

graph LR
    subgraph "Readiness-based (epoll / mio)"
        A1["epoll_wait: 就绪通知"] --> A2["用户 read(fd, buf)"]
        A2 --> A3["read 系统调用<br/>同步拷贝数据"]
    end

    subgraph "Completion-based (io_uring)"
        B1["用户提交 read op 到 SQ"] --> B2["内核异步执行 I/O"]
        B2 --> B3["CQ 完成通知"]
    end

为什么 tokio 的默认 I/O 驱动用 mio(epoll)?

mio 是一个跨平台的 I/O 抽象层:Linux 上它用 epoll,macOS/FreeBSD 上用 kqueue,Windows 上用 IOCP。它定义的编程模型是 Source + Poll + Events——你把 fd 作为一个 Source 注册到 Poll 上,然后阻塞等待事件。

Tokio 的整个 I/O 驱动——ScheduledIoRegistrationPollEvented——全是围绕这个模型构建的:

这套架构在 io_uring 面前完全不适用。io_uring 没有「fd readiness」的概念——它只有「这个 op 完成了」。你不能用 ScheduledIo 的 readiness 位来表达”异步读操作完成”。你需要一个完全不同的状态机来追踪每个 submitted op 的生命周期。

Tokio 实际是怎么支持 io_uring 的?

Tokio 确实支持 io_uring,但它被严格限定在文件系统操作上,且需要显式启用14

// tokio/src/runtime/io/driver.rs: 56-70 (条件编译)
// 需要: tokio_unstable + feature = "io-uring" + feature = "fs" + Linux

pub(crate) uring_context: Mutex<UringContext>,
pub(crate) uring_probe: OnceCell<Option<io_uring::Probe>>,

启用后,tokio 采用混合架构

这个混合体现在 turn() 方法里:处理完 epoll 事件后,立即 dispatch io_uring 的 completions14

// tokio/src/runtime/io/driver.rs: 227-236
// 处理完 epoll 事件后,dispatch io_uring 的 completion
#[cfg(all(
    tokio_unstable,
    feature = "io-uring",
    feature = "rt",
    feature = "fs",
    target_os = "linux",
))]
{
    let mut guard = handle.get_uring().lock();
    let ctx = &mut *guard;
    ctx.dispatch_completions();
}

io_uring 的 eventfd 甚至也是一个普通的 fd,注册到 epoll 上的14

// tokio/src/runtime/io/driver/uring.rs: 180-184
impl Handle {
    fn add_uring_source(&self, uringfd: RawFd) -> io::Result<()> {
        let mut source = SourceFd(&uringfd);
        self.registry
            .register(&mut source, TOKEN_WAKEUP, Interest::READABLE.to_mio())
    }
}

也就是说,io_uring 的 eventfd 被注册到 epoll 上,completion 到达时 epoll 通知驱动,然后驱动在 turn() 中 dispatch 这些 completion——io_uring 被挂在 epoll 的事件循环上

从运行时的视角看,三种事件的驱动关系是:

epoll_wait 返回
    ├─ socket fd 就绪 → 设置 ScheduledIo readiness → 唤醒等待的任务
    ├─ io_uring eventfd 可读 → dispatch io_uring completions → 唤醒等待的任务
    ├─ TOKEN_WAKEUP → 外部 unpark
    └─ TOKEN_SIGNAL → 信号处理

为什么不直接用 io_uring 替代 epoll 处理所有 socket I/O?

io_uring 的 socket 支持虽然已经存在,但用 io_uring 做网络 I/O 是否比 epoll 更好,直到今天(2026 年)仍然是一个开放问题15。原因有三:

  1. 延迟开销:io_uring 提交一个网络操作至少需要一次 io_uring_enter 系统调用(或者通过 SQ 轮询来避免),而 epoll 模式下一次 epoll_wait 可以返回多个就绪的 fd,然后逐个 read()。对于高吞吐但低延迟的网络场景,epoll 的批处理模式仍然有竞争力。

  2. epoll 的”够用”:对于网络 I/O,epoll_wait + 非阻塞 read() 的模式已经足够好了。O_NONBLOCK 确保了 read() 不会阻塞线程,epoll 的多路复用确保了单线程可以管理海量连接。io_uring 在网络上的优势主要在于减少系统调用次数和降低延迟,但在 epoll 已经做到几乎最优的场景下,替换的动力不足。

  3. 跨平台问题:io_uring 是 Linux-only(5.1+)。Tokio 的 runtime 需要跨平台一致性,把核心的网络 I/O 基础设施绑定在 io_uring 上会破坏跨平台的架构统一性。

所以 Tokio 选择了一条务实的路径:网络 I/O 用 epoll(mio),文件 I/O 用 io_uring(可选)。不是技术上的”不能”,而是架构上的”不值得为了替换而替换”。

五、读写的完整路径:从 poll 到 buffer

理解了数据结构后,来看 TcpStream::read(&mut buf).await 的完整路径。当一个 PollEvented 包装的 socket 被 poll_read10

// tokio/src/io/poll_evented.rs: 169-194
pub(crate) unsafe fn poll_read<'a>(
    &'a self,
    cx: &mut Context<'_>,
    buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>>
where &'a E: io::Read + 'a,
{
    loop {
        // 1. 等待读就绪
        let evt = ready!(self.registration.poll_read_ready(cx))?;

        // 2. 尝试读取
        match self.io.as_ref().unwrap().read(b) {
            Ok(n) => {
                // 4. ET 优化:读了一部分但没读完,不清除 readiness
                //    (epoll 模式下可以继续读)
                #[cfg(not(mio_unsupported_force_poll_poll))]
                if 0 < n && n < len {
                    self.registration.clear_readiness(evt);
                }

                buf.advance(n);
                return Poll::Ready(Ok(()));
            }
            // 3. 读阻塞了(false positive):清除 readiness 后重试
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                self.registration.clear_readiness(evt);
            }
            Err(e) => return Poll::Ready(Err(e)),
        }
    }
}

这个模式比 Sleep::poll 复杂——因为 I/O 有假阳性(false positive)。epoll 返回 readable 后,fd 可能因为竞争等原因实际上不可读了。所以 I/O 的 poll 循环是:

poll_read_ready → 返回 Ready → 尝试 read
    ├─ 成功 → 返回数据
    └─ WouldBlock → clear_readiness → 重新 poll

Sleep::poll 是:

检查时间轮 → 到期了吗?
    ├─ 是 → 返回 Ready
    └─ 否 → 注册 waker → 返回 Pending

sleep 没有假阳性——deadline 到了就是到了。I/O 有假阳性——epoll 说 readable 了,但数据可能已经被别的线程读了,或者在监听 socket 上有新的 accept 提前消耗了事件。

Registration 还提供了一个更底层的 poll_io 方法——它把”等待就绪 → 尝试操作 → WouldBlock → 清理 → 重试”的循环封装在一个函数里11

// tokio/src/runtime/io/registration.rs: 162-175
fn poll_io<R>(
    &self,
    cx: &mut Context<'_>,
    direction: Direction,
    mut f: impl FnMut() -> io::Result<R>,
) -> Poll<io::Result<R>> {
    loop {
        let ev = ready!(self.poll_ready(cx, direction))?;
        match f() {
            Ok(ret) => return Poll::Ready(Ok(ret)),
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                self.clear_readiness(ev);
            }
            Err(e) => return Poll::Ready(Err(e)),
        }
    }
}

六、你还在读同一条线程吗:关于「既处理 I/O 又处理 Timer」

一个常见的困惑是:Time Driver 和 I/O Driver 是在同一条线程上运行的吗?不冲突吗?

答案是是的,同一条线程,不冲突。原因可以用一句话概括:当线程在 epoll_wait 上阻塞时,时间驱动作了 I/O 驱动的一层薄薄的外包装,它做的所有事就是设置 epoll_wait 的 timeout 参数

当 Time Driver 调用 park_timeout(duration) 时,它最终调用了 I/O Driver 的 park_timeout(duration)。I/O Driver 的 turn()duration 传给 epoll_wait。这也就是 sleep 文章中提到的那条链路。

当 I/O 事件在 duration 到达之前触发,线程提前醒来,Time Driver 在 park_internal 中返回后立即调用 process() 收割到期定时器。I/O 事件的处理和时间到期检测在同一个函数调用栈中完成,没有竞争、没有额外的上下文切换

graph TD
    subgraph "Worker 线程主循环"
        A["开始"] --> B["Time::park_internal()"]
        B --> C["计算 next_wake"]
        C --> D["IoStack::park_timeout(duration)"]
        D --> E["epoll_wait(timeout)"]
        E -->|"I/O 事件或超时"| F["Time::process_at_time()"]
        F --> G["收割到期 timer"]
        G --> H["Scheduler::run 处理就绪任务"]
        H --> B
    end

这种「一层包一层」的设计,使得 Time Driver 和 I/O Driver 完全共享同一个线程和同一次 epoll_wait 调用。没有线程切换、没有锁竞争(除了 ScheduledIo 内部的 Mutex)、没有额外的调度开销。

七、对比总览:时间驱动 vs 事件驱动

现在可以把两种表并列在一起看:

维度 时间驱动(sleep) 事件驱动(I/O)
核心数据结构 6 层哈希时间轮(Wheel 逐 fd 的原子状态机(ScheduledIo
状态管理 StateCell(AtomicU64:到期时间 + 标记位) AtomicUsize(16b readiness + 15b tick + 1b shutdown)
等待机制 比较 deadline vs now(用户态) 等待 epoll 通知(内核态)
唤醒方式 fire()Waker::wake() wake(ready)Waker::wake()
假阳性 无(时间到了就是到了) 有(epoll 可能虚报)
插入/注册 Wheel::insert() → O(1) mio::Registry::register() → 系统调用
单次检查 检查当前 slot 内的所有 entry 检查 epoll 返回的所有 event
超时/时间驱动 核心机制:靠 deadline 驱动 仅用于设置 epoll_wait timeout
内核交互 1 个定时器(hrtimer) 所有 fd 注册到 epoll
精确度 1ms 纳秒级(取决于内核)

核心差异的本质

两种驱动模型差异的根源在于同一个问题:「事件」发生的时机是否可控

时间是可控的。给定一个 deadline,Tokio 可以在用户态用时间轮 O(1) 地判断是否到期。它不需要内核在”到期”这个事件发生时主动通知——它只需要知道当前时间,然后比较即可。内核定时器只是用来让线程不要一直空转。

网络 I/O是不可控的。内核之外的客户端可能在任意时刻发送数据包。Tokio 必须靠内核来通知——因为只有内核驱动(网卡中断 → 协议栈)知道数据什么时候到达。epoll 就是这个”内核通知机制”的接口。

这种「可控 vs 不可控」的区别,导致了两套完全不同的数据结构选择:

graph LR
    subgraph "可控事件"
        A1["sleep(5s)"] --> A2["deadline = now + 5s"]
        A2 --> A3["插入时间轮(O(1))"]
        A3 --> A4["等待时:检查时间轮<br/>全部在用户态完成"]
        A4 --> A5["到期:fire()"]
    end

    subgraph "不可控事件"
        B1["TcpStream::read()"] --> B2["注册 fd 到 epoll"]
        B2 --> B3["等待时:epoll_wait<br/>内核挂起线程"]
        B3 --> B4["数据到达:epoll 通知"]
        B4 --> B5["设置 readiness + wake()"]
    end

八、另一种不可控事件:AsyncFd

tokio 的 I/O 驱动不只服务于网络 socket。AsyncFd 是一个通用的包装器,允许任何实现了 AsRawFd 的非阻塞文件描述符接入 Tokio 的 I/O 驱动16

// tokio/src/io/async_fd.rs: 248-255
pub struct AsyncFd<T: AsRawFd> {
    registration: Registration,
    inner: Option<T>,
}

它的创建流程和 TcpStream 本质相同16

// tokio/src/io/async_fd.rs: 406-414
pub(crate) fn try_new_with_handle_and_interest(
    inner: T,
    handle: scheduler::Handle,
    interest: Interest,
) -> Result<Self, AsyncFdTryNewError<T>> {
    let fd = inner.as_raw_fd();

    match Registration::new_with_interest_and_handle(
        &mut SourceFd(&fd), interest, handle,
    ) {
        Ok(registration) => Ok(AsyncFd { registration, inner: Some(inner) }),
        Err(cause) => Err(AsyncFdTryNewError { inner, cause }),
    }
}

区别在于:TcpStream 使用 mio::net::TcpStream(它实现了 mio::event::Source),而 AsyncFd 使用 mio::unix::SourceFd——一个简单的包装器,不修改 fd 的配置(因此要求 fd 已经设置为非阻塞模式)。

AsyncFd 的使用模式体现了 ET 模型的完整语义16

// 典型的 AsyncFd 使用模式
loop {
    let mut guard = async_fd.readable().await?;     // 等待可读
    match guard.try_io(|inner| inner.get_ref().read(&mut buf)) {
        Ok(result) => return result,                   // 成功读取
        Err(_would_block) => continue,                // 假阳性,重试
    }
}

这里 guard.try_io() 的语义是:如果 I/O 操作返回 WouldBlock,自动调用 clear_ready() 清除 readiness 位。这确保了下次数据到达时,epoll 会发出新的边沿触发通知。

总结

Tokio 的 I/O 驱动和 Timer 驱动虽然在同一个 worker 线程中运行、共享同一次 epoll_wait 调用,但它们的核心机制截然不同:

Timer 驱动是一个用户态的时间调度器。它用哈希时间轮 O(1) 地管理成千上万个定时器,只向内核注册一个「最近 deadline」的唤醒点。线程被 epoll_wait 的超时机制叫醒,然后批量收割到期定时器。

I/O 驱动是一个内核态事件的分发器。它通过 epoll 把 fd 的 read/write 事件从内核调度到用户态,用 ScheduledIo 的原子状态机追踪每个 fd 的就绪状态,用 tick 机制解决边沿触发的 stale event 问题。

两者在 Tokio 中的分层设计是:

应用层:tokio::time::sleep         tokio::net::TcpStream
                |                          |
运行时层:TimerEntry / Wheel          Registration / ScheduledIo
                |                          |
驱动层:  Time Driver                I/O Driver (mio)
                |                          |
内核层:  hrtimer × 1                epoll (所有 fd)

驱动层通过 IoStack 堆叠在一起:外层 Timer 检查时间轮确定 timeout,内层 I/O 执行 epoll_wait。两条线在同一个线程上优雅地交织,互不干扰。

最终,这两种异步机制的实现都可以回溯到 Rust async 的核心契约1返回 Poll::Pending 时注册 Waker,事件就绪时通过 Waker::wake() 通知调度器重新 pollsleep 用「时间到达」作为就绪条件,TcpStream 用「IO 就绪」作为就绪条件——只是事件源不同,契约完全一样。

九、Feature 与 Builder 如何控制 Driver 启用

前面提到 time::Driverprocess::Driversignal::Driverio::Driver 都可以通过条件编译或 Builder 选项跳过。这里把实际控制机制拆开看。

9.1 Cargo feature — 编译时是否包含

四个 feature 决定对应层的代码是否存在

# Cargo.toml (tokio 的 default features)
tokio = { version = "1", features = ["rt", "net", "time", "process", "signal"] }

cfg_io_driver! 宏检查 feature net(编译时才展开):

// tokio/src/runtime/driver.rs: 134
cfg_io_driver! {                        // 仅在 feature = "net" 时编译
    pub(crate) type IoDriver = crate::runtime::io::Driver;
}
cfg_not_io_driver! {                    // feature = "net" 关闭时
    pub(crate) struct IoStack(ParkThread);
    fn create_io_stack(..) -> ... {
        let park_thread = ParkThread::new();  // 没有 io::Driver,纯 Condvar
        (IoStack(park_thread), ..)
    }
}

cfg_time! / cfg_not_time! 控制 TimeDriver enum 是否存在:

// tokio/src/runtime/driver.rs: 284
cfg_time! {                             // feature = "time" → TimeDriver 是 enum
    pub(crate) enum TimeDriver {
        Enabled { driver: crate::runtime::time::Driver },
        EnabledAlt(IoStack),
        Disabled(IoStack),
    }
}
cfg_not_time! {                         // 没有 time feature → TimeDriver 是 IoStack 别名
    type TimeDriver = IoStack;
}

同理 signalprocess 的退化:

// tokio/src/runtime/driver.rs: 244
cfg_signal_internal_and_unix! {         // feature = "signal" + Unix
    type SignalDriver = crate::runtime::signal::Driver;
}
cfg_not_signal_internal! {              // 无 signal 时退化为 IoDriver
    type SignalDriver = IoDriver;
}

// tokio/src/runtime/driver.rs: 269
cfg_process_driver! {                   // feature = "process"
    type ProcessDriver = crate::runtime::process::Driver;
}
cfg_not_process_driver! {               // 无 process 时退化为 SignalDriver
    type ProcessDriver = SignalDriver;
}

编译时未启用的 feature,相关代码在最终二进制中完全不存在。

9.2 Builder 选项 — 运行时启用或跳过

编译已包含的代码,可以通过 Builder 决定是否激活:

// tokio/src/runtime/builder.rs: 1672 / 1860
let (driver, driver_handle) = driver::Driver::new(cfg)?;

cfg 来自 builder 的设置17

// tokio/src/runtime/builder.rs: 1668-1675
fn get_cfg(&self) -> driver::Cfg {
    driver::Cfg {
        enable_io: self.enable_io,          // ← .enable_io()
        enable_time: self.enable_time,      // ← .enable_time()
        nevents: self.nevents,
        timer_flavor: self.timer_flavor,
    }
}

driver::Driver::new() 在运行时判断这两个值:

// tokio/src/runtime/driver.rs: 47-58
pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Handle)> {
    let (io_stack, io_handle, signal_handle) =
        create_io_stack(cfg.enable_io, cfg.nevents)?;   // ← 是否创建 io::Driver
    let (time_driver, time_handle) =
        create_time_driver(cfg.enable_time, .., io_stack, &clock); // ← 是否创建 time::Driver
    Ok((Self { inner: time_driver }, ..))
}

create_io_stack 内部根据 enabled 选择真正的 I/O 层或 ParkThread

// tokio/src/runtime/driver.rs: 149-170
fn create_io_stack(enabled: bool, nevents: usize) -> .. {
    if enabled {
        let (io_driver, io_handle) = io::Driver::new(nevents)?;
        (IoStack::Enabled(process_driver), IoHandle::Enabled(io_handle), ..)
    } else {
        let park_thread = ParkThread::new();
        (IoStack::Disabled(park_thread), IoHandle::Disabled(unpark_thread), ..)
    }
}

9.3 两层控制的关系

Cargo feature Builder 选项 代码状态 运行时结果
net + .enable_io() 代码存在 IoStack::Enabled
net + 无 .enable_io() 代码存在 IoStack::Disabled
net 代码不存在 编译报错
time + .enable_time() 代码存在 TimeDriver::Enabled
time + 无 .enable_time() 代码存在 TimeDriver::Disabled
time 代码不存在 TimeDriver = IoStack

rt feature 也必须启用(默认开启)。最终最简的 runtime 只包含 runtime::driver::Driver + IoStack::Disabled(ParkThread),可以用 tokio::spawn 做纯计算任务调度,完全不需要任何 I/O 或 Timer 能力。

References

  1. 上篇文章:《从 tokio::time::sleep 看异步 Timer 的实现:一次从 Future::poll 到哈希时间轮的源码之旅》,2026-05-03。详细分析了 tokio::time::sleepFuture::pollTimerEntryStateCellWheel 哈希时间轮的完整链路。以及更早的文章:《Rust async/await 的底层契约:从 Future::poll 到 Tokio 运行时》,2026-04-30,阐述了 Future::pollContextWaker 的核心协议。  2

  2. Tokio 源码,Driver 分层结构,tokio/src/runtime/driver.rs。定义了 DriverTimeDriver 包装 IoStack)、HandleIoHandle + SignalHandle + TimeHandle)、以及 create_io_stack / create_time_driver 的创建逻辑。  2 3

  3. mio 源码,cfg 编译时选择操作系统后端,mio/src/sys/mod.rscfg_os_poll! 宏控制模块编译:unix 下进入 unix/ 子模块(进一步选择 epoll / kqueue / poll),windows 下进入 windows/ 模块(IOCP),wasi 下进入 wasip1/ 模块。 

  4. mio 源码,epoll selector 实现,mio/src/sys/unix/selector/epoll.rsSelector::new() 通过 epoll_create1 创建 epoll fd,Selector::select() 通过 epoll_wait 获取就绪事件,Selector::register() 通过 epoll_ctl(EPOLL_CTL_ADD) 注册 fd。默认使用 EPOLLET 边沿触发模式。 

  5. mio 源码,kqueue selector 实现,mio/src/sys/unix/selector/kqueue.rsSelector::new() 通过 kqueue() 创建 kqueue fd,Selector::select() 通过 kevent 获取事件,Selector::register() 使用 EVFILT_READ / EVFILT_WRITE + EV_CLEAR 边沿触发标志。 

  6. mio Cargo.toml feature 配置,mio/Cargo.tomlos-poll feature 启用 cfg_os_poll! 条件编译;os-ext feature 启用 cfg_any_os_ext! 额外扩展接口。 

  7. Tokio 源码,runtime::io::Driver 结构定义和 turn / park_timeout / add_source 方法,tokio/src/runtime/io/driver.rsDriver 持有 poll: mio::Pollturn() 方法调用 self.poll.poll(events, max_wait) 执行 epoll_waitadd_source() 通过 self.registry.register(source, token, interest) 执行 epoll_ctl(ADD)。注册和等待在同一个 mio::Poll 实例上完成。  2 3

  8. Tokio 源码,多线程 Parker 的 TryLock<Driver> 共享机制,tokio/src/runtime/scheduler/multi_thread/park.rsShared::driverTryLock<Driver>,一次只有一个线程可以拿到锁执行 epoll_wait,其他线程退化为 condvar 等待。 

  9. Tokio 源码,TcpStream 结构定义,tokio/src/net/tcp/stream.rsTcpStream 只是 PollEvented<mio::net::TcpStream> 的一层包装,构建时调用 PollEvented::new(connected)。  2

  10. Tokio 源码,PollEvented 结构定义与 poll_read 方法,tokio/src/io/poll_evented.rsPollEvented 是通用 I/O 包装器,其 poll_read 方法展示了 ET 模式下的 read 循环(WouldBlockclear_readiness → 重试)以及针对 edge-triggered selector 的 read 部分结果优化。  2 3

  11. Tokio 源码,Registration 结构定义与 new_with_interest_and_handlepoll_io 等方法,tokio/src/runtime/io/registration.rsRegistration 是 I/O 资源与 reactor 的桥梁,封装了注册、就绪轮询和 WouldBlock 循环。  2 3

  12. Tokio 源码,I/O Driver 的 Driver::newHandle::add_sourceDriver::turntokio/src/runtime/io/driver.rsadd_sourcemio::Source 注册到 epoll,token 为 ScheduledIo 的指针地址;turn() 阻塞在 epoll_wait 上并分发事件。  2 3

  13. Tokio 源码,ScheduledIo 结构定义、set_readinesswake 方法,tokio/src/runtime/io/scheduled_io.rs。核心结构:readiness 为打包的 AtomicUsize(16b readiness + 15b tick + 1b shutdown),waitersMutex<Waiters> 管理专用槽和链表等待者。set_readiness 使用 fetch_update CAS 实现 tick 保护,wake 使用 WakeList 批量唤醒。  2 3 4 5

  14. Tokio 源码,io_uring 集成代码,tokio/src/runtime/io/driver/uring.rsUringContext 管理 io_uring 实例和 op 生命周期,dispatch_completions()turn() 方法中被调用,io_uring 的 eventfd 通过 SourceFd 注册到 epoll 上。  2 3

  15. 《io_uring and networking: 2024》,LWN.net,https://lwn.net/Articles/961273/。讨论了 io_uring 在网络 I/O 场景下的现状,包括性能对比和与 epoll 的取舍。 

  16. Tokio 源码,AsyncFd 结构定义与使用模式,tokio/src/io/async_fd.rs。通用的 fd → async 包装器,通过 mio::unix::SourceFd 接入 I/O 驱动,提供 readable() / writable() / async_io() 等高级 API 和 poll_read_ready() / try_io() 等底层 API。  2 3

  17. Tokio 源码,Builder::get_cfgCfg 定义,tokio/src/runtime/builder.rsCfg 三个关键字段:enable_io(来自 .enable_io())、enable_time(来自 .enable_time())、nevents(来自 .max_io_events_per_tick())。 

My Github Page: https://github.com/liweinan

B站视频: https://space.bilibili.com/21947620

Powered by Jekyll and Theme by solid