从 tokio::time::sleep 看异步 Timer 的实现:一次从 Future::poll 到哈希时间轮的源码之旅

tokio::time::sleep 看起来和 thread::sleep 只差一个 .await,但背后连接的是一整套哈希时间轮、原子状态机和操作系统的定时唤醒机制。这篇文章从源码出发,把这条链路拆开来看。

引言:两个 sleep 的对比

在 Rust 里让程序”等一下”有两种常见方式:

// 方式一:标准库的阻塞 sleep
std::thread::sleep(Duration::from_secs(5));

// 方式二:Tokio 的异步 sleep
tokio::time::sleep(Duration::from_secs(5)).await;

从表面看,两者似乎差不多——都是在某个地方停了 5 秒。但它们在运行时的行为有本质区别。用一个简单的对比实验就能看出来。

如果把下面三个场景跑一遍:

场景 任务1 任务2 总耗时
原生线程(2 个线程) 同步 sleep 5s 同步 sleep 2s 5s
Tokio 单线程 + 同步阻塞 thread::sleep 5s tokio::time::sleep 2s 7s
Tokio 单线程 + 全异步 tokio::time::sleep 5s tokio::time::sleep 2s 5s

先看场景一——在 async 任务里混入同步阻塞的版本:

// tokio_block_bad.rs
use tokio::time;
use std::time::Duration;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let start = std::time::Instant::now();

    let task1 = tokio::spawn(async move {
        println!("[任务1] 开始阻塞, 时间: {:?}", start.elapsed());
        std::thread::sleep(Duration::from_secs(5));
        println!("[任务1] 结束阻塞, 时间: {:?}", start.elapsed());
    });

    let task2 = tokio::spawn(async move {
        println!("[任务2] 开始异步等待, 时间: {:?}", start.elapsed());
        time::sleep(Duration::from_secs(2)).await;
        println!("[任务2] 结束异步等待, 时间: {:?}", start.elapsed());
    });

    let _ = tokio::join!(task1, task2);
}

运行结果:

[任务1] 开始阻塞, 时间: 32.1µs
... 卡住 5 秒,task2 根本没机会开始 ...
[任务1] 结束阻塞, 时间: 5.001s
[任务2] 开始异步等待, 时间: 5.001s
[任务2] 结束异步等待, 时间: 7.002s

task2 被 task1 的同步阻塞卡了整整 5 秒,总耗时 7 秒。

再看场景二——两个任务都用异步 sleep:

// tokio_async_good.rs
use tokio::time;
use std::time::Duration;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let start = std::time::Instant::now();

    let task1 = tokio::spawn(async move {
        println!("[任务1] 开始异步等待5秒, 时间: {:?}", start.elapsed());
        time::sleep(Duration::from_secs(5)).await;
        println!("[任务1] 结束异步等待, 时间: {:?}", start.elapsed());
    });

    let task2 = tokio::spawn(async move {
        println!("[任务2] 开始异步等待2秒, 时间: {:?}", start.elapsed());
        time::sleep(Duration::from_secs(2)).await;
        println!("[任务2] 结束异步等待, 时间: {:?}", start.elapsed());
    });

    let _ = tokio::join!(task1, task2);
}

运行结果:

[任务1] 开始异步等待5秒, 时间: 41.5µs
[任务2] 开始异步等待2秒, 时间: 43.2µs
[任务2] 结束异步等待, 时间: 2.001s
[任务1] 结束异步等待, 时间: 5.002s

总耗时 5 秒,两个任务在单线程上并发执行,互不阻塞。

场景一的 7 秒暴露了问题:当 task1 在 Tokio 的单线程 runtime 里直接调用 thread::sleep,整个工作线程都被阻塞,task2 根本得不到执行机会。而场景二的两个异步 sleep 却在单线程上实现了并发——task2 在 2 秒后准时完成。

关键不在于”等待”本身,而在于用什么方式等待thread::sleep 把线程卡住;tokio::time::sleep 只让任务挂起,线程依然可以服务其他任务。

这篇文章的目的就是拆开 tokio::time::sleep 的内部实现,看看它到底是怎么做到的。

在进入细节之前,可以先回顾一下异步系统的基础契约。在之前的文章《Rust async/await 的底层契约:从 Future::poll 到 Tokio 运行时》1中,我们梳理了 Future::pollContextWaker 组成的核心协议:任务被 poll,如果未就绪就返回 Poll::Pending 并保存 Waker,事件就绪后通过 wake() 通知运行时重新调度。sleep 正是这套协议的一个具体应用——它用”时间到达”作为就绪条件。

概念先行:从伪代码理解核心思路

在深入源码之前,先建立一个直觉。tokio::time::sleep 的秘密可以用一句话概括:它不会让线程去空等,而是注册一个”闹钟”后立刻把线程还给调度器,到时间了再由闹钟叫醒它。

具体来说,下面这行代码实际触发了 6 个步骤:

tokio::time::sleep(Duration::from_secs(5)).await;
sequenceDiagram
    participant Task as 你的任务
    participant Exec as Tokio 调度器
    participant Timer as Tokio 定时器(全局)

    Task->>Timer: 1. 注册:"5秒后唤醒我"
    Task->>Exec: 2. 返回 Poll::Pending
    Exec->>Exec: 3. 切换去执行其他任务
    Note over Timer: 时钟滴答滴答...
    Timer->>Task: 4. 5秒到了,调用 Waker.wake()
    Task->>Exec: 5. 重新排队等待执行
    Exec->>Task: 6. 再次 poll,返回 Poll::Ready

如果把这个过程写成极简的伪代码,大致是这样的:

// 简化的 Sleep Future(概念示意)
struct Sleep {
    deadline: Instant,
    registered: bool,
}

impl Future for Sleep {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if Instant::now() >= self.deadline {
            // 时间到了,完成
            return Poll::Ready(());
        }

        if !self.registered {
            // 时间没到:把 waker 注册到全局定时器
            // 定时器会在 deadline 时调用 waker.wake()
            let waker = cx.waker().clone();
            TOKIO_TIMER.insert(self.deadline, waker);
            self.registered = true;
        }

        // 关键:返回 Pending,交出执行权
        Poll::Pending
    }
}

核心行为只有三个:

  1. 首次 poll:发现时间没到 → 把 Waker(闹钟)注册到全局定时器 → 返回 Pending,线程继续服务其他任务
  2. 等待期间:零轮询、零浪费——不需要每秒检查 1000 次”时间到了吗”,定时器在精确时间点触发唤醒
  3. 定时器触发Waker::wake() 通知调度器把任务放回就绪队列,下次 poll 时返回 Ready

这和张嘴就来的 thread::sleep 有着本质区别:

方面 thread::sleep tokio::time::sleep
线程状态 OS 挂起线程 线程继续执行其他任务
CPU 利用 0%(但线程资源被占) 100%(有效利用)
可并发量 ≈ 线程数(几千上限) 百万级任务
响应方式 时间到自动唤醒 通过 Waker 通知调度器

有了这个心理模型,接下来就可以拆开源代码,看 Tokio 怎么把伪代码里的 TOKIO_TIMER 变成真正的数据结构。

一、Sleep 的结构:一个 Future 的解剖

tokio::time::sleep(duration) 的签名返回一个 Sleep 结构体:

pub fn sleep(duration: Duration) -> Sleep {
    // 计算 deadline = now + duration
    // 然后调用 Sleep::new_timeout(deadline, location)
}

它实现了 Future trait,所以可以 .await。但 Sleep 里面到底放了什么?核心字段如下2

// tokio/src/time/sleep.rs: 220-232
pin_project! {
    pub struct Sleep {
        // 内部字段(tracing 相关,非关键路径)
        inner: Inner,

        // 连接 Sleep 实例和驱动它的 Timer 的关键纽带
        #[pin]
        entry: Timer,
    }
}

entry 字段的类型是 Timer,它在 tokio::runtime 内部被定义为一个 enum3

// tokio/src/runtime/mod.rs: 437-442
pub(crate) enum Timer {
    Traditional(time::TimerEntry),
    Alternative(time_alt::Timer),  // 仅 tokio_unstable + rt-multi-thread
}

本文聚焦在 Traditional 分支——也就是默认的 Timer 实现。TimerEntry 的结构如下4

// tokio/src/runtime/time/entry.rs: 287-310
pub(crate) struct TimerEntry {
    // Arc 引用到 runtime 的调度器句柄
    driver: scheduler::Handle,

    // 共享的内部结构,参与侵入式链表
    #[pin]
    inner: Option<TimerShared>,

    // deadline:首次 poll 时才注册到时间轮
    deadline: Instant,

    // 是否已经注册
    registered: bool,
}

TimerShared 是前端(TimerEntry)和驱动端(Driver)之间共享的状态5

// tokio/src/runtime/time/entry.rs: 336-360
pub(crate) struct TimerShared {
    // 侵入式双向链表的指针
    pointers: linked_list::Pointers<TimerShared>,

    // 注册到 Wheel 时的时间(原子变量)
    registered_when: AtomicU64,

    // 当前状态:到期时间、已触发、或已注销
    state: StateCell,

    _p: PhantomPinned,
}

这几个结构的层次关系可以总结为下图:

graph TD
    A["tokio::time::sleep(duration)"] --> B["Sleep<br/>(Future)"]
    B --> C["Timer<br/>(enum)"]
    C --> D["TimerEntry<br/>(Traditional 分支)"]
    D --> E["TimerShared<br/>(共享状态)"]
    E --> F["StateCell<br/>(原子状态 + Waker)"]
    E --> G["registered_when<br/>(AtomicU64)"]
    E --> H["pointers<br/>(侵入式链表节点)"]

这个图展示的是数据归属关系——从用户 API(sleep())一层层往下走到 StateCell。但 TimerEntry 持有的 driver: scheduler::Handle 不在这条垂直链上——它是横向的通道,把每个 TimerEntry 连接到全局唯一的 Driver。这里 TimerEntry 持有 scheduler::Handle,但真正处理 StateCell 的是 time::Handle。它们之间隔着三层导航,容易混淆。下面是完整的 Handle 类型层次和职责划分:

TimerEntry.driver : scheduler::Handle    ← 运行时全貌(task 调度 + blocking pool + 所有 driver)
  │
  └─ .driver()  → driver::Handle        ← 各 driver 的聚合体
       ├─ .io: IoHandle                 ← I/O driver(mio)
       ├─ .signal: SignalHandle
       ├─ .clock: Clock
       └─ .time() → time::Handle        ← Time Driver 专用句柄
                       ├─ time_source    ← Instant ↔ tick
                       └─ inner         ← Mutex<InnerState { wheel }>
                                             └─ 通过 wheel.poll → fire → StateCell
graph TD
    A["scheduler::Handle<br/>运行时全貌"] -->|".driver()"| B["driver::Handle<br/>所有 driver 聚合"]
    B -->|".io"| C["IoHandle<br/>I/O driver"]
    B -->|".time()"| D["time::Handle<br/>Time Driver 专用"]
    B -->|".clock"| E["Clock"]
    D -->|"持有"| F["Mutex&lt;InnerState&gt;"]
    F -->|"包含"| G["Wheel<br/>哈希时间轮"]
    F -->|"包含"| H["next_wake"]

    I["TimerEntry<br/>用户侧句柄"] -->|".driver 字段"| A
    I -->|"持有"| J["TimerShared<br/>共享状态"]
    J -->|"包含"| K["StateCell<br/>原子状态机"]
    J -->|"包含"| L["registered_when"]

    G -->|"poll 返回"| J
    D -->|"process → fire"| K

    style A fill:#e1f5fe
    style B fill:#fff3e0
    style D fill:#c8e6c9
    style I fill:#f3e5f5
    style K fill:#ffcdd2

TimerEntryscheduler::Handle 当通行证,需要 timer 就往下走到 time(),需要 I/O 就走到 io。但 StateCell 只归 time::Handleprocess() 链操作——I/O driver 和 signal driver 永远不会碰它。

下面的调用链展示了从 TimerEntry.driverscheduler::Handle)导航到 time::Handle 再到 process() 的完整路径——这就是上文类图和数据流图的代码对应:

// tokio/src/runtime/time/entry.rs: TimerEntry::poll_elapsed 及相关方法
impl TimerEntry {
    // TimerEntry 拿到 time::Handle 的唯一途径
    fn driver(&self) -> &super::Handle {
        self.driver        // ① scheduler::Handle (timer 创建时存入)
            .driver()      // ② → driver::Handle   (调度器句柄上的 .driver() 方法)
            .time()        // ③ → time::Handle     (driver 聚合体上的 .time() 方法)
    }

    pub(crate) fn poll_elapsed(&self, cx: &mut Context<'_>) -> Poll<...> {
        // self.driver() 返回 time::Handle,但 poll 不需要它——
        // StateCell 就在自己内部的 TimerShared 里
        self.inner.state.poll(cx.waker())
    }
}

三个 Handle 的精简结构定义——只列出与本链相关的字段和方法:

// === tokio/src/runtime/scheduler/mod.rs ===
pub(crate) enum Handle {                      // ① scheduler::Handle
    CurrentThread(Arc<current_thread::Handle>),
    MultiThread(Arc<multi_thread::Handle>),
}
impl Handle {
    pub(crate) fn driver(&self) -> &driver::Handle { ... }  // → ②
}

// === tokio/src/runtime/driver.rs ===
pub(crate) struct Handle {                    // ② driver::Handle
    pub(crate) io: IoHandle,                  // I/O driver 句柄
    pub(crate) signal: SignalHandle,          // 信号 driver 句柄
    pub(crate) time: TimeHandle,              // TimeHandle = Option<time::Handle>
    pub(crate) clock: Clock,                  // 时钟源
}
impl Handle {
    pub(crate) fn time(&self) -> &time::Handle { ... }  // → ③
}

// === tokio/src/runtime/time/handle.rs ===
pub(crate) struct Handle {                    // ③ time::Handle
    pub(super) time_source: TimeSource,       // Instant ↔ tick 转换
    pub(super) inner: super::Inner,           // Inner::Traditional { state: Mutex<InnerState>, ... }
}
// InnerState = { next_wake: Option<NonZeroU64>, wheel: wheel::Wheel }

导航链路的三个关键跃迁点:

  1. scheduler::Handle::driver() — 从运行时全貌进入 driver 聚合层
  2. driver::Handle::time() — 从 driver 聚合中提取 Time Driver 句柄(若未 enable_time 则 panic)
  3. time::Handle 持有 Mutex<InnerState{ wheel }> — 从这里开始 process()Wheel::poll()fire()StateCell

简单来说:用户持有的 Sleep 是一个 Future,它的内部通过 TimerEntry 管理一个 deadline,而 TimerShared 以侵入式链表节点的形式嵌入到底层的哈希时间轮(Hashed Timing Wheel)中。

二、首次 poll:注册到时间轮

Sleep 第一次被 poll 时,TimerEntry 还没有注册到时间轮。这时它会调用 reset,将自己插入到底层的 Wheel 数据结构中6

// tokio/src/runtime/time/entry.rs: 598-617
pub(crate) fn poll_elapsed(
    mut self: Pin<&mut Self>,
    cx: &mut Context<'_>,
) -> Poll<Result<(), super::Error>> {
    if !self.registered {
        let deadline = self.deadline;
        // 首次 poll:将 deadline 注册到时间轮
        self.as_mut().reset(deadline, true);
    }

    let inner = self.inner()
        .expect("inner should already be initialized by `self.reset()`");
    // 通过 StateCell 检查是否已经到期
    inner.state.poll(cx.waker())
}

reset 的核心逻辑是:

  1. Instant 格式的 deadline 通过 TimeSource 转换成 u64 类型的 tick(毫秒级)
  2. 先尝试原子操作 extend_expiration——如果新 deadline 比旧的晚,可以无锁更新
  3. 如果 extend_expiration 失败(比如 deadline 提前了,或者 timer 已经在触发队列中),则走 reregister 路径重新插入 Wheel

注意这里的类型转换和调用链。reset() 的源码中7

// tokio/src/runtime/time/entry.rs: 638-649
let inner = match self.inner() {
    Some(inner) => inner,          // inner: &TimerShared
    None => { /* 初始化 */ }
};

// extend_expiration 失败时走 reregister 路径
if reregister {
    unsafe {
        self.driver()
            .reregister(&self.driver.driver().io, tick, inner.into());
    }
}

inner&TimerShared.into() 通过标准库的 From<&T> for NonNull<T> 将其转换为 NonNull<TimerShared>——一个原始指针包装。这个 NonNull 随后被传入 Handle::reregister()8

// tokio/src/runtime/time/mod.rs: 398-435
pub(self) unsafe fn reregister(
    &self, unpark: &IoHandle, new_tick: u64,
    entry: NonNull<TimerShared>,    // ← 来自 inner.into()
) {
    let mut lock = self.inner.lock();

    // 如果 timer 还在 Wheel 中,先移除
    if unsafe { entry.as_ref().might_be_registered() } {
        lock.wheel.remove(entry);
    }

    // 创建 TimerHandle——一个 NonNull<TimerShared> 的 "唯一指针"
    let entry = entry.as_ref().handle();

    entry.set_expiration(new_tick);  // 设置新的到期时间

    // 插入 Wheel
    match unsafe { lock.wheel.insert(entry) } {
        Ok(when) => {
            // 如果新到期时间比当前最早更早 → unpark 线程
            if lock.next_wake.map(|n| when < n.get()).unwrap_or(true) {
                unpark.unpark();
            }
        }
        Err((entry, InsertError::Elapsed)) => unsafe { entry.fire(Ok(())) },
    }
}

这里 entry.as_ref().handle() 创建 TimerHandle——它的定义极其简单9

// tokio/src/runtime/time/entry.rs: 183-188
pub(crate) struct TimerHandle {
    inner: NonNull<TimerShared>,
}

// tokio/src/runtime/time/entry.rs: 308-312
impl TimerShared {
    pub(super) fn handle(&self) -> TimerHandle {
        TimerHandle { inner: NonNull::from(self) }
    }
}

TimerHandle 不拥有 TimerShared——它只是一个”唯一指针”,借用规则由 unsafe 契约保证(持有驱动锁时才能操作)。

随后 Wheel::insert() 根据到期时间计算层级和 slot 位置,调用 Level::add_entry()TimerHandle(即 NonNull<TimerShared>)push 到 slot 的侵入式链表上10

// tokio/src/runtime/time/wheel/mod.rs: 88-112
pub(crate) unsafe fn insert(
    &mut self, item: TimerHandle,
) -> Result<u64, (TimerHandle, InsertError)> {
    let when = unsafe { item.sync_when() };  // 同步到期时间
    if when <= self.elapsed {
        return Err((item, InsertError::Elapsed));  // 已过期
    }

    let level = self.level_for(when);  // 计算层级
    unsafe { self.levels[level].add_entry(item); }
    Ok(when)
}

// tokio/src/runtime/time/wheel/level.rs: 122-128
pub(crate) unsafe fn add_entry(&mut self, item: TimerHandle) {
    let slot = slot_for(unsafe { item.registered_when() }, self.level);
    self.slot[slot].push_front(item);  // push 到侵入式链表
    self.occupied |= occupied_bit(slot);  // 标记 slot 非空
}

self.slot[slot] 的类型是 EntryList,即 LinkedList<TimerShared, TimerShared>——一个侵入式双向链表。push_frontTimerHandle 中的 NonNull<TimerShared> 作为链表节点插入,TimerSharedpointers 字段就是它的 prev/next 指针。

这里有一个关键设计——两层到期时间缓存TimerShared 中有两个时间字段:

// tokio/src/runtime/time/entry.rs: 269-281
pub(crate) struct TimerShared {
    registered_when: AtomicU64,  // ① 写入 slot 时的到期时间缓存
    state: StateCell,            // ② StateCell 内部的 AtomicU64 存 true expiration
    // ...
}

sync_when() 是两层之间的桥梁——把 StateCell.state(true_when)同步到 registered_when,供 Wheel 做 slot 定位:

// Wheel::insert() 先调用 sync_when()
let when = unsafe { item.sync_when() };  // StateCell.state → registered_when
if when <= self.elapsed { return Err(Elapsed); }
let level = self.level_for(when);           // 用 sync 后的值算层级
self.levels[level].add_entry(item);         // add_entry 内部用 registered_when 算 slot

收割时,process_expiration()mark_pending(expiration.deadline) 直接读 StateCell.state 做最终比对:

// process_expiration 简化
match unsafe { item.mark_pending(expiration.deadline) } {
    Ok(()) => pending.push_front(item),  // StateCell.state ≤ deadline → 到期
    Err(expiration_tick) => {            // StateCell.state > deadline → 还没到
        // 把 true_when 写回 registered_when,重插入正确 slot
        self.levels[level].add_entry(item);
    }
}

这种两层设计是为 extend_expiration 乐观路径服务的:用户通过 CAS 无锁延后了 StateCell.state,但 registered_when 还指向旧的 slot。Driver 收割时发现 state > slot deadline,不触发 fire,而是把 state 同步回 registered_when 并重插入正确 slot——整个过程零锁争用。

所以整条注册链是:

Sleep::poll_elapsed()
    → TimerEntry::reset()
        → extend_expiration(tick)  ← 乐观路径:CAS 无锁延后
        → Handle::reregister()      ← 失败时走此路径
            → lock.wheel.insert(handle)  ← TimerHandle 入 Wheel
                → Level::add_entry()
                    → slot[slot_index].push_front(item)
                    → occupied |= (1 << slot)

现在可以更精确地描述数据流:Sleep 持有 TimerEntryTimerEntry 持有 Pin<Option<TimerShared>>TimerShared 通过 NonNull<TimerShared>(即 TimerHandle)以侵入式链表节点形式直接挂在 Level.slot[slot]。没有复制、没有中间分配——移动 TimerHandle 就是在操作 TimerSharedpointers 指针。

这个过程可以用下面的序列图看清:

sequenceDiagram
    participant Sleep as Sleep Future
    participant Entry as TimerEntry
    participant Shared as TimerShared
    participant Handle as TimerHandle
    participant Wheel as Wheel
    participant Level as Level.slot[64]

    Sleep->>Entry: poll_elapsed(cx)
    Entry->>Entry: reset(deadline)
    Entry->>Shared: extend_expiration(tick)
    Note over Shared: 如果新 deadline<br/>比旧的晚→CAS 成功<br/>跳过重新注册
    alt extend_expiration 失败
        Entry->>Shared: inner.into() → TimerHandle
        Entry->>Wheel: Handle.reregister(tick)
        Wheel->>Wheel: remove(old slot)
        Wheel->>Wheel: insert(handle)
        Wheel->>Level: level_for(when) 计算层级
        Level->>Level: slot[slot_index].push_front(item)
        level-->>Wheel: OK(when)
        Wheel-->>Entry: 注册完成
    end
    Entry->>Shared: state.poll(waker)
    Shared-->>Entry: Poll::Pending
    Entry-->>Sleep: Poll::Pending

这里有一个关键细节:首次注册不是在 sleep() 调用时发生的,而是在第一次 poll 时发生的。这也是为什么如果在 runtime 外部直接调用 sleep() 会 panic——它需要拿到当前的调度器句柄,而句柄只存在于 runtime 上下文中。如果把它包在一个 async block 里,调用就是惰性的,等到真正 poll 时已经在 runtime 内部了,就不会 panic11

具体来说,scheduler::Handle(内含 time::Handle)的注入分两步:

① sleep(duration) 被调用时 ── Handle 存入 Sleep,但 TimerEntry 还未创建
② 首次 .await 触发 poll 时 ── Handle 从 Sleep 传递给 TimerEntry::new()
// ① tokio/src/time/sleep.rs: Sleep::new_timeout —— sleep() 调用时
impl Sleep {
    fn new_timeout(deadline: Instant, ..) -> Self {
        Self {
            driver: Handle::current(),  // ← 从 thread-local 取出 scheduler::Handle
            timer: None,               // ← 这里还是 None,TimerEntry 懒创建
            deadline,
            ..
        }
    }
}

// ② tokio/src/time/sleep.rs: Sleep::poll_elapsed —— 首次 poll 时
fn poll_elapsed(self: Pin<&mut Self>, cx: &mut Context<'_>) -> .. {
    let timer = match this.timer.as_mut().as_pin_mut() {
        Some(timer) => timer,
        None => {
            let handle = this.driver;  // ← 步骤①存入的 scheduler::Handle
            let timer = Timer::new(handle.clone(), *this.deadline);
            //  ↑ Timer::new() → TimerEntry::new() → TimerEntry { driver: scheduler::Handle }
            this.timer.set(Some(timer));
            ..
        }
    };
    timer.poll_elapsed(cx)
}

Timer::new() 内部调用 TimerEntry::new(),后者将 scheduler::Handle 存入 TimerEntry.driver 字段。这个 Handle 贯穿 timer 的整个生命周期——pollresetreregister 都用它来导航到 time::Handleio::Handle

由此可以画出两条独立的调用路径——它们不直接调用对方,而是在 StateCell 处交汇:

═══════════ Task 侧(poll 路径)═══════════     ═══════════ Driver 侧(park 路径)═══════════
                                              park_internal(rt_handle: &driver::Handle)
Sleep::poll(cx)                                 │
  │                                             ├─ let handle = rt_handle.time()
  └─ poll_elapsed(cx)                           │    // → &time::Handle
       │                                        │
       └─ TimerEntry::poll_elapsed(cx)          ├─ handle.process(clock)
            │                                   │    │
            └─ self.inner.state                 │    └─ process_at_time(now)
                 .poll(cx.waker())              │         │
                 │                              │         └─ lock.wheel.poll(now)
                 │  StateCell::poll():          │              │
                 ├─ store waker                 │              └─ entry.fire(Ok(()))
                 └─ load state                  │                   │
                    └─ Poll::Pending            │                   └─ StateCell::fire()
                                                      ↑              ├─ store DEREGISTERED
                                                 交汇点 ──────────────┘  └─ take waker
                                                                              │
                                                                         waker.wake()

Task 侧写入 Waker(poll),Driver 侧取出 Waker(fire)。两者操作的是同一个 TimerShared 内的同一个 StateCell,但 Task 侧不需要 time::Handle——它直接操作自己持有的 TimerShared.state;Driver 侧也不需要知道 TimerEntry 的存在——它通过 TimerHandle(一个 NonNull<TimerShared> 裸指针)间接操作。

三、哈希时间轮(Hashed Timing Wheel)

Timer 的核心数据结构是一个多层哈希时间轮。Tokio 的注释明确引用了 Varghese 和 Lauck 的经典论文12,该论文也因 Linux 内核的高精度定时器实现而广为人知。

时间轮的核心思想很简单:把到期的定时器散列到不同的 slot 里,时钟走动时只检查当前 slot,避免每次都扫描全部定时器

它本质上是用空间换时间:用 6 层 × 64 slot 的数组,换取每次 tick 只扫一个 slot 的能力。对比一下就能明白——如果用一个普通 Vec 或链表存储所有定时器,每次 tick 都得全量扫描一遍,O(n);而时间轮把定时器按到期时间散列,每次 tick 只检查当前 slot,O(1)。存储只是手段,快速检索到期定时器才是时间轮存在的目的。

Tokio 使用了 6 层时间轮,每层 64 个 slot:

Level 0: 64 × 1 ms      = 64 ms 覆盖范围
Level 1: 64 × 64 ms     = ~4 秒覆盖范围
Level 2: 64 × ~4 s      = ~4 分钟覆盖范围
Level 3: 64 × ~4 min    = ~4 小时覆盖范围
Level 4: 64 × ~4 hr     = ~12 天覆盖范围
Level 5: 64 × ~12 day   = ~2 年覆盖范围

下图展示了一个简化的 3 层时间轮的工作方式:

graph LR
    subgraph "Level 2 (~4 min)"
        L2["slot 0-63<br/>每 slot ~4s"]
    end
    subgraph "Level 1 (~4 sec)"
        L1["slot 0-63<br/>每 slot 64ms"]
    end
    subgraph "Level 0 (64 ms)"
        L0["slot 0-63<br/>每 slot 1ms"]
    end

    L2 -->|"降级:slot 到期<br/>重新分配"| L1
    L1 -->|"降级"| L0
    L0 -->|"到期触发"| F["fire → wake"]

定时器首先被插入到与它的到期时间匹配的层级。比如一个 10 秒后到期的定时器,会落在 Level 2 的某个 slot 里。当该 slot 到期时,里面的所有定时器会被重新分配到 Level 1;然后再降到 Level 0;最终在 Level 0 的 slot 到期时真正触发。

这种层级设计的优势在于:

同一 Level 内的 64 个 slot 共享同一个精度(slot_range),但覆盖不同的时间窗口。例如 Level 2 的 slot[0] 覆盖 0~4s,slot[1] 覆盖 4~8s,以此类推——精度始终是 ~4s,只是窗口在平移。

Wheel 结构体的关键字段13

// tokio/src/runtime/time/wheel/mod.rs: 22-39
pub(crate) struct Wheel {
    // 时间轮启动以来经过的毫秒数
    elapsed: u64,

    // 6 层,每层 64 个 slot
    levels: Box<[Level; NUM_LEVELS]>,

    // 等待触发的定时器队列
    pending: EntryList,
}

每个 Level 包含一个 [EntryList; 64] 数组,而 EntryList 就是 LinkedList<TimerShared>14

// tokio/src/runtime/time/wheel/level.rs: 6-15
pub(crate) struct Level {
    level: usize,
    occupied: u64,              // 位图标记哪些 slot 非空
    slot: [EntryList; 64],      // 64 个 slot,每个存放一个侵入式链表
}

// tokio/src/runtime/time/entry.rs: 195
pub(super) type EntryList = LinkedList<TimerShared, TimerShared>;

occupied 是一个 u64 位图——第 i 位为 1 表示 slot[i] 非空。next_expiration()trailing_zeros() 直接在一条指令中找到最低位的 1,避免遍历 64 个 slot:

// tokio/src/runtime/time/wheel/level.rs: 60-78
pub(crate) fn next_expiration(&self, now: u64) -> Option<Expiration> {
    // occupied >> now_slot: 跳过当前 slot 之前的位
    let mask = self.occupied.rotate_right(now_slot as u32) >> 1;
    let next = mask.trailing_zeros();      // ← 一条指令找下一个非空 slot
    //                        ↑ 如果 mask = 0(无更多非空 slot),返回 64
    //                          调用者据此判断本层无到期
    if next >= 64 { return None; }

    let slot = (now_slot + next as usize + 1) % 64;
    let level_range = 64_usize.pow(self.level as u32) * 64;
    let slot_range = 64_usize.pow(self.level as u32);
    let level_start = (now / level_range as u64) * level_range as u64;
    let deadline = level_start + (slot as u64) * slot_range as u64;
    Some(Expiration { level: self.level, slot, deadline })
}

所以 Wheel::poll()loop 内部相当于:

for each level [0..5]:
    if level.occupied != 0:                    // 位图非空
        mask = (occupied >> now_slot) >> 1
        next = mask.trailing_zeros()           // 硬件指令,~1 个周期
        if next < 64: return (level, slot, deadline)
    // 否则本层无到期 → 继续查下一层
return None  // 所有层级都空

最坏情况(没有任何 timer):6 次 occupied != 0 检查 + 6 次 trailing_zeros,总开销约几十个 CPU 周期。无空轮询、无循环遍历 64 slot——这是位图优化在时间轮中的经典用法。

每层只检查 1 个 slot——trailing_zeros() 找到最低位的 1 后直接返回,每层最多输出一个 Expiration。所以 next_expiration() 扫描 6 层最多做 6 次 trailing_zeros、返回 1 个 (level, slot, deadline),不继续找第二个 slot。

这里涉及三个 next_expiration 方法,分散在两个 struct 中,各管一级:

Wheel::next_expiration_time()           ← mod.rs:194,pub(super)
  └→ Wheel::next_expiration()            ← mod.rs:168,private
       └→ for each level [0..5]:
            Level::next_expiration(elapsed)   ← level.rs:51,pub(crate)
               → occupied.trailing_zeros()
               → 返回该层最早的 Expiration { level, slot, deadline }

收割与降级:process_expiration 的决策

Wheel::poll(now) 找到 expiration.deadline ≤ now 的 slot 后,调用 process_expiration(expiration) 处理该 slot 中所有 entry。对每个 entry,处理逻辑不区分层级——完全由 mark_pending() 的返回值决定:

  process_expiration(在 Wheel::poll 内,持 driver 锁)
         │
     mark_pending(deadline)
         │
   ┌─────┴─────┐
   │           │
 state ≤ deadline   state > deadline
   │           │
 Ok(())      Err(tick)
   │           │
 pending     level_for() + add_entry()
 入队         (降级,仍留在轮子上)

  ─── 同一轮 Wheel::poll 的下一次迭代,或 process_at_time 的 while 循环 ───

 pending.pop() → Handle::process_at_time 里 entry.fire(Ok(()))
                      → WakeList → 释放锁后 waker.wake()

关键的认识:

所以整个 Level 层级的职责是统一的:每层只负责按自己的 slot 粒度找到最早的到期 slot,把整组 entry 取出来;mark_pending 裁決入队还是降级,fire 则由上层的 process_at_time 统一处理。 没有 per-level 特化代码,6 个 Level 实例(self.levels: Box<[Level; 6]>)共享同一份算法。

用一句话区分三个关键方法:next_expiration_time() 扫描 6 层找最早 deadline(算睡多久);process_expiration() 逐 entry 裁決入 pending 还是降级;process_at_time()pending 取出并 fire、批量唤醒任务。

next_wake 的值是所有非空层级中最早的 deadline,不一定是 Level 0 的 slot:

Level 0: empty        (occupied = 0)
Level 1: empty
Level 2: slot[9]      → deadline = elapsed + 4096×9 = elapsed + 36,864ms
Level 3: slot[1]      → deadline = elapsed + 262144×1 = elapsed + 262,144ms

next_expiration(): 从 Level 0 开始逐层检查
  → Level 0: occupied=0 → 跳过
  → Level 1: occupied=0 → 跳过
  → Level 2: slot[9] occupied → 返回 deadline ≈ 当前 + 36s
  → 不再检查 Level 3+(因为已有更早的结果)

next_wake = 36,864  ← 来自 Level 2 而非 Level 0

next_expiration() 从 Level 0 到 Level 5 逐层扫描,只要低层有非空 slot 就立即返回——不再继续查更高层。所以 next_wake 永远是最小粒度的那个到期 slot,可能是 Level 2 的 slot 9(≈36s 后),也可能是 Level 0 的 slot 3(≈3ms 后),取决于当前时间轮中的 timer 分布。

所以 TimerShared 和 Wheel 的关系是:

graph TD
    subgraph Wheel
        W["Wheel"]
        subgraph Levels
            L0["Level 0: [slot0, slot1, ..., slot63]"]
            L1["Level 1: [slot0, slot1, ..., slot63]"]
            L5["... Level 5"]
        end
        Pend["pending: EntryList"]
    end

    subgraph Timer
        TS1["TimerShared A<br/>(state + pointers)"]
        TS2["TimerShared B<br/>(state + pointers)"]
    end

    L0 -->|"slot[5]"| TS1
    L0 -->|"slot[5]"| TS2

    TS1 -->|"pointers.prev"| TS2
    TS2 -->|"pointers.next"| TS1

    subgraph Sleep
        S["Sleep 1"] --> TE1["TimerEntry 1"]
        TE1 --> TS1
        S2["Sleep 2"] --> TE2["TimerEntry 2"]
        TE2 --> TS2
    end

这里的关键是 TimerShared 通过侵入式链表的 pointers 字段直接挂在 Level.slot[slot_index],Wheel 不复制也不拥有 TimerShared 的所有权——它只通过指针链接。这就是”侵入式”的含义:被链接的对象自身包含链接指针,而非由容器分配独立的节点。这也意味着 TimerShared 必须被 pin 住不能移动,因为链表指针指向的是它自身的内存地址。

3.2 一个完整的例子:5 分钟 sleep 的降级之旅

关于时间参考点的说明Wheel.elapsedStateCell.stateregistered_when 都是相对于同一个 TimeSource.start_time绝对毫秒偏移量。下面的例子设 start_time = sleep 创建的时刻,此时 elapsed 在第一次 process_at_time 前尚未推进,设为例值 0。实际运行中 elapsed 一般是个小的非零值(runtime 启动后到首次 park 之间经过的毫秒数),但这一点偏移不影响层级计算逻辑,因为 state - elapsed 的差值不受影响。

把前面的概念串起来,假设你在 elapsed=0(即 runtime 刚启动时)创建了一个 tokio::time::sleep(Duration::from_secs(300)).await,看看它在 Wheel 中如何从高层降级到低层,最终被 fire。

t=0:创建和注册

StateCell.state = STATE_DEREGISTERED  // 初始值 u64::MAX
Wheel.elapsed = 0                     // Wheel 刚开始

首次 poll 时TimerEntry::reset(deadline) 被调用。TimeSource::deadline_to_tick() 将 5 分钟(300 秒)转为毫秒 tick:

tick = 300,000   // 5 min = 300,000 ms
extend_expiration(tick=300000)
  → 失败(初始状态是 STATE_DEREGISTERED,不是有效的到期时间)
  → 走 reregister 路径
     → set_expiration(300000)
        StateCell.state = 300,000    // ← true expiration
     → Wheel::insert()
        → sync_when()
            registered_when = 300,000
            StateCell.state = 300,000 (不变)
        → level_for(0, 300000) = 3
          // 计算过程:masked = 0 ^ 300000 | 63 = 300063
          // leading_zeros = 45, significant = 18, 18/6 = 3
        → Level 3, slot 1
          // slot_for(300000, 3) = (300000 >> 18) % 64 = 1

此时 Wheel 状态:

Wheel.elapsed = 0
  Level 3: slot[1] → TimerShared (registered_when=300000, state=300000)

t=0 ~ 4分钟:线程 park,等最早 deadline

park_internal()next_expiration_time(),发现 Level 3 slot 1 的 deadline 在约 4.37 分钟后(1 × 64^3 = 262,144ms),于是 park_timeout(262s)

t=262s:Level 3 slot 1 到期,降级到 Level 2

Driver 醒来,process_at_time(262144)Wheel::poll(262144)Wheel::poll() 的内部循环13先检查 next_expiration() 找到 Level 3 slot 1,发现其 deadline=262144 ≤ now,于是调用 self.process_expiration(expiration)

// tokio/src/runtime/time/wheel/mod.rs: 126-133
pub(crate) fn poll(&mut self, now: u64) -> Option<TimerHandle> {
    loop {
        if let Some(handle) = self.pending.pop_back() { return Some(handle); }
        match self.next_expiration() {
            Some(ref expiration) if expiration.deadline <= now => {
                self.process_expiration(expiration);  // ← 收割降级
                self.set_elapsed(expiration.deadline);
            }
            _ => { self.set_elapsed(now); break; }
        }
    }
    self.pending.pop_back()
}

process_expiration 先取走 slot 中所有 entry(take_entries),然后逐个调用 mark_pending(deadline)13。对每个 entry,mark_pendingStateCell.statedeadline 做对比:

// tokio/src/runtime/time/wheel/mod.rs: 171-196
pub(crate) fn process_expiration(&mut self, expiration: &Expiration) {
    let mut entries = self.take_entries(expiration);  // 取出 slot 链表
    while let Some(item) = entries.pop_back() {
        match unsafe { item.mark_pending(expiration.deadline) } {
            Ok(()) => self.pending.push_front(item),  // 到期
            Err(expiration_tick) => {        // 还没到
                let level = level_for(expiration.deadline, expiration_tick);
                unsafe { self.levels[level].add_entry(item); }  // 降级
            }
        }
    }
}

itemTimerHandle(即 NonNull<TimerShared>)。mark_pending(262144) 内部:

// tokio/src/runtime/time/entry.rs: 171-199 (StateCell::mark_pending)
let mut cur_state = self.state.load(Ordering::Relaxed);
// cur_state = 300,000(5分钟到期)
// not_after = 262,144(Level 3 slot 1 deadline)
if cur_state > not_after {  // 300,000 > 262,144 → true
    break Err(cur_state);   // 还没到点,返回 true expiration
}

返回 Err(300000) 后,TimerHandle::mark_pendingtrue_when 写回 registered_when

// tokio/src/runtime/time/entry.rs: 250-264 (TimerHandle::mark_pending)
Err(tick) => {
    unsafe { self.inner.as_ref().set_registered_when(tick); }
    Err(tick)
}

回到 process_expiration,用 level_for(262144, 300000) 重新计算层级:

masked = 262144 ^ 300000 | 63 = 37887
leading_zeros = 48, significant = 15, 15/6 = 2
→ Level 2, slot 9
  slot_for(300000, 2) = (300000 >> 12) % 64 = 9
→ add_entry: Level 2, slot[9].push_front(item)
→ timer 已从 Level 3 slot 1 移至 Level 2 slot 9
Wheel.elapsed = 262,144
  Level 2: slot[9] → TimerShared (registered_when=300000, state=300000)
  Level 3: slot[1] → (空)

t=262s ~ 298s:继续 park

next_expiration_time() 找到 Level 2 slot 9 的 deadline:

slot_range(2) = 64^2 = 4,096 ms
slot 9 × 4,096 ms = 36,864 ms
Level 2 slot 9 deadline ≈ 262,144 + 36,864 = 298,...

线程 park_timeout(~36s)

t=298s:Level 2 slot 9 到期,降级到 Level 1

process_expiration(Level 2, slot 9, deadline=298xxx):
  → mark_pending(298xxx)
     StateCell.state = 300,000 > 298,xxx → 还没到点
     → Err(300000)
  → level_for(298xxx, 300000) = 1
     → Level 1, slot 对应 300,000

Wheel.elapsed = 298,...
  Level 1: slot[...] → TimerShared
  Level 2: slot[9] → (空)

t=298s ~ 300s:Level 1 到 Level 0,最终 fire

类似逻辑继续降级到 Level 0。Level 0 slot 宽度是 1ms:

elapsed >= 300,000 时,Level 0 到期。process_expiration 再次被调用,但这次是 Level 0,mark_pending 中的判断不同了:

// tokio/src/runtime/time/entry.rs: 171-199 (StateCell::mark_pending)
let mut cur_state = self.state.load(Ordering::Relaxed);
// cur_state = 300,000
// not_after = 300,000(Level 0 slot deadline)
if cur_state > not_after {  // 300,000 > 300,000 → false
    break Err(cur_state);
}
// cur_state ≤ not_after → 到期!
compare_exchange(cur_state, STATE_PENDING_FIRE, AcqRel, Acquire)

CAS 成功将状态从 300000 置为 STATE_PENDING_FIRE,返回 Ok(()),entry 被推入 self.pending。随后 Wheel::poll()loop 下一次迭代从 pending 弹出这个 entry,process_at_time 拿到后调用 fire()

// tokio/src/runtime/time/entry.rs: 211-231 (StateCell::fire)
let cur_state = self.state.load(Ordering::Relaxed);
if cur_state == STATE_DEREGISTERED { return None; } // 不为 u64::MAX

// 写入结果
unsafe { self.result.with_mut(|p| *p = Ok(())) };
// state → STATE_DEREGISTERED(u64::MAX)
self.state.store(STATE_DEREGISTERED, Ordering::Release);

// 取出 waker 返回
self.waker.take_waker()

fire 返回 Some(waker)process_at_time 将其压入 WakeList,释放锁后批量 wake_all()——任务被放回调度队列,下次 Sleep::poll() 返回 Poll::Ready(())

如果用 reset() 延长 deadline 呢?

假设在第 1 分钟时,用户调用了 sleep.reset(new_deadline=10min)

TimerEntry::reset(10min)
  → tick = 600,000
  → extend_expiration(600000)
     StateCell.state 从 300,000 CAS 到 600,000 ✓
     → 成功,return(跳过 reregister!)

此时 timer 还在 Level 3 slot 1 里,registered_when 仍然是 300,000。没有任何链表操作——这就是”乐观延后”的零锁优势。

当 Driver 在 elapsed=262144 时收割 Level 3 slot 1:

mark_pending(262144):
  StateCell.state = 600,000 > 262,144
  → Err(600000)
  → 把 true_when(600000) 同步回 registered_when
  → 按 600,000 重新计算层级
     level_for(262144, 600000) = 更高的层级...
  → 插入到正确的 slot

整个过程零锁争用——extend_expiration 是 CAS,mark_pending 也是 CAS,没有 Mutex。

3.3 elapsed 是如何维护的?

一个关键的设计细节是:elapsed 不依赖任何外部定时器来推进。它是 Driver 在每次 process_at_time(now) 时,用 nowelapsed 的差值来”推”的。

每次 Driver 醒来(无论是被 epoll_wait 超时叫醒,还是被 I/O 事件提前 unpark),都会调用 handle.process(clock),内部是 process_at_time(now)。Wheel 收到 nowu64 tick,来自 TimeSource),和当前的 elapsed 比较差值,把落在差值内的 slot 逐个处理——收割到期 timer、降级上层 timer。

Driver 被 I/O 事件提前叫醒(本应在 5 秒后才醒):
  → process_at_time(now=1003)    // 只走了 3ms
  → Wheel: 从 elapsed=1000 到 now=1003
           第 1000 slot 收割 → 无到期 timer
           第 1001-1002 slot 无内容
           elapsed = 1003
  → 所有 timer 都还在,下次 park_timeout 重新计算

Driver 在 deadline 时被叫醒:
  → process_at_time(now=6000)
  → Wheel: 从 elapsed=1000 到 now=6000
           第 1000、1001、…、5000 slot 逐个处理
           → elapsed=5000 时有到期 timer → fire → wake
           → 继续推进到 6000
           elapsed = 6000
  → 重新计算 next_wake

所以时间轮是完全自洽的:它不和”墙上时间”绑定,只和 elapsed 这个单调递增的计数器绑定。TimeSourceclock.now() 只用来算增量——增量多大,elapsed 就推多远。即使系统被暂停(suspend-to-RAM),恢复后 clock.now() 返回更新后的时间,增量包含暂停时长,elapsed 一次推到位,所有”超期的 timer”被立即收割。

tokio 不依赖任何外部定时器来推进时间轮。外部定时器(epoll_wait 的 timeout)的唯一作用是”到点叫醒线程”——醒来后 tokio 用自己的 TimeSource 计算增量、推进 elapsed、收割到期 timer。这就是为什么 park_timeout(0) 也能工作:即使不进入内核定时器,process_at_time 也能用 now = TimeSource::now() 推进 Wheel。

四、StateCell:原子状态机

TimerShared 中最关键的部分是 StateCell——一个用原子变量实现的状态机,负责协调用户端(TimerEntry)和驱动端(Driver)之间的并发访问15

// tokio/src/runtime/time/entry.rs: 91-102
pub(super) struct StateCell {
    // 保存到期时间,或特殊标志值 u64::MAX(已注销)
    state: AtomicU64,

    // 定时器触发后的结果
    result: UnsafeCell<TimerResult>,

    // 已注册的 Waker
    waker: AtomicWaker,
}

state 字段用单个 AtomicU64 同时承载”到期时间”和”状态标记”——它是一个 tagged union,利用值域范围区分含义:

stateDiagram-v2
    [*] --> Deregistered
    Deregistered --> Scheduled: set_expiration(tick)
    Scheduled --> PendingFire: mark_pending() 成功
    PendingFire --> Deregistered: fire(result)
    Scheduled --> Deregistered: 直接 fire (注)
    Deregistered --> Scheduled: 重新注册 (set_expiration)

(注)包括 clear_entry(cancel/drop)firereregister 检测到 InsertError::Elapsed 时直接 fire、runtime shutdown 时 fire(Err(…)) 等路径。所有路径最终都通过 fire() 进入 Deregistered——StateCell 层面不存在独立的 “Cancelled” 状态,取消和正常到期共享同一个终端状态。

三个离散状态的编码定义在 entry.rs 顶部(L72-78):

状态 stateu64 取值 含义
Scheduled(已调度) [0, MAX_SAFE_MILLIS_DURATION],即 < STATE_MIN_VALUE 的任意值 定时器在时间轮中等待;state 存储的就是具体的到期 tick
PendingFire(待触发) STATE_PENDING_FIRE = u64::MAX - 1 已被驱动从时间轮取出,放入 pending 链表,即将触发
Deregistered(已触发/已注销) STATE_DEREGISTERED = u64::MAX 已触发或已注销;结果已写入 result 字段,poll() 返回 Ready(result)

其中 Scheduled 是”胖状态”——state 可以是 [0, MAX_SAFE_MILLIS_DURATION] 之间的任意值,编码了具体的到期时间。而 PendingFire 和 Deregistered 是纯标记,不携带额外信息。这种设计的精妙之处在于:

  1. 原子性——compare_exchange_weak 可以一次原子操作、”检查到期时间 + 标记状态转换”。例如 mark_pending() 中的 CAS 同时验证 state ≤ not_after 并将值替换为 STATE_PENDING_FIRE
  2. 无锁乐观更新——extend_expiration() 用 CAS 尝试把到期时间往后推;如果 state 已被驱动改成 STATE_DEREGISTERED,CAS 失败,调用者就知道需要走完整的重新注册路径,无需先取锁。
  3. 隐式 Release-Acquire 同步——fire() 先写 result 再以 Release 顺序写 state = STATE_DEREGISTEREDread_state()Acquire 顺序读 state,保证读到 STATE_DEREGISTEREDresult 一定已写入完毕,零额外栅栏。

当前状态标记只占用了 u64 顶部 2 个值:

取值 状态
u64::MAX - 1 = 0xFFFF_FFFF_FFFF_FFFE STATE_PENDING_FIRE
u64::MAX = 0xFFFF_FFFF_FFFF_FFFF STATE_DEREGISTERED

但预留边界的定义很灵活:STATE_MIN_VALUE 被定义为 STATE_PENDING_FIRE,加上注释 // This value should be updated if any other signal values are added above. 表明,只需把 STATE_MIN_VALUE 减 1,就能让 u64::MAX - 2 成为第 3 个可用信号值。整个 u64 值域中大于 MAX_SAFE_MILLIS_DURATION0xFFFF_FFFF_FFFF_FFFD)的部分都是预留空间,目前只占用了顶部 2 个,扩展空间充裕。

事实上 MAX_SAFE_MILLIS_DURATION ≈ 18.4 × 10¹⁸ 个 tick,按 tokio 默认的毫秒粒度约 5.8 亿年,时间表达范围远超实际需求,划顶部几个值给状态标记几乎不影响可用 tick 空间。

驱动端的两段式处理PendingFireprocess_expiration 里的 mark_pending 通过 CAS 写入;Deregisteredpark_internal 醒来后的 process_at_timewheel.pollfire 写入。下文分述三个原子操作,注意 mark_pendingfire 不在同一调用栈

几个关键操作的实现:

poll——用户端检查是否到期15

// tokio/src/runtime/time/entry.rs: 142-162
fn poll(&self, waker: &Waker) -> Poll<TimerResult> {
    // 先注册 waker,确保 fire 和 poll 之间的竞争不会丢失唤醒
    self.waker.register_by_ref(waker);
    // 读取状态
    self.read_state()
}

fn read_state(&self) -> Poll<TimerResult> {
    let cur_state = self.state.load(Ordering::Acquire);
    if cur_state == STATE_DEREGISTERED {
        // 已经触发,读取 result
        Poll::Ready(unsafe { self.result.with(|p| *p) })
    } else {
        Poll::Pending
    }
}

注意 poll 里先注册 waker 再读状态的顺序——如果反过来,可能会发生在读状态之后、注册 waker 之前这个窗口期内定时器恰好触发并调用 fire,从而导致任务永久丢失唤醒信号。

mark_pending——驱动端将定时器移入待触发队列15

// tokio/src/runtime/time/entry.rs: 171-199
unsafe fn mark_pending(&self, not_after: u64) -> Result<(), u64> {
    let mut cur_state = self.state.load(Ordering::Relaxed);
    loop {
        if cur_state > not_after {
            // 实际到期时间比当前 tick 晚——不应该触发
            break Err(cur_state);
        }
        match self.state.compare_exchange_weak(
            cur_state,
            STATE_PENDING_FIRE,
            Ordering::AcqRel,
            Ordering::Acquire,
        ) {
            Ok(_) => break Ok(()),
            Err(actual_state) => cur_state = actual_state,
        }
    }
}

这里使用 CAS(Compare-And-Swap)来原子地将定时器从”已调度”状态转换到”待触发”状态。CAS 失败意味着有人并发修改了状态(比如用户端重置了定时器),需要重试。

TimerHandle::mark_pending 在成功时还会把 registered_when 标成 STATE_DEREGISTERED,表示该 entry 已脱离轮子链表、挂在 Wheel.pending 上——此时尚未写入 result,也尚未取出 waker。调用方(process_expiration)负责 pending.push_front(item)此处不会调用 fire

fire——完成定时器(由 Handle::process_at_time 调用)15

// tokio/src/runtime/time/entry.rs: 211-231
unsafe fn fire(&self, result: TimerResult) -> Option<Waker> {
    let cur_state = self.state.load(Ordering::Relaxed);
    if cur_state == STATE_DEREGISTERED {
        return None;  // 已经触发过了
    }

    // 写入结果
    unsafe { self.result.with_mut(|p| *p = result) };
    // 标记为已注销
    self.state.store(STATE_DEREGISTERED, Ordering::Release);

    // 取出之前注册的 waker 以便唤醒
    self.waker.take_waker()
}

fire 本身只写 result、置 STATE_DEREGISTERED 并取出 waker;谁在何时调用它由驱动层的 process_at_time 决定:

为什么 fire() 用普通 store 而不是 CAS?

你可能会注意到,fire() 修改 state 时用的是普通的 store(Release),而不是 CAS。这和其他方法(如 mark_pending()extend_expiration())形成对比——为什么呢?

关键原因:fire() 被调用时,已经没有并发写入者了。

定时器触发的完整路径是两阶段的:

mark_pending()                                    fire()
    │                                               │
    ├─ CAS: tick → STATE_PENDING_FIRE               ├─ store: PENDING_FIRE → DEREGISTERED
    │  (条件更新:只有 state 还是原 tick 才改)        │  (确定性更新:state 一定是 PENDING_FIRE)
    │                                               │
    └─ 入 pending 链表                              └─ 写 result、取 waker

mark_pending() 已经用 CAS 把 state 从具体的 tick 值原子地转换成了 STATE_PENDING_FIREu64::MAX - 1,即 ≥ STATE_MIN_VALUE)。而 extend_expiration()(用户线程调用的无锁乐观更新)的保护逻辑是:

fn extend_expiration(&self, new_timestamp: u64) -> Result<(), ()> {
    let mut prior = self.state.load(Ordering::Relaxed);
    loop {
        if new_timestamp < prior || prior >= STATE_MIN_VALUE {
            return Err(());  // state 已被驱动改成了 PENDING_FIRE → 走慢路径
        }
        // CAS 尝试更新
    }
}

一旦 state 变成 STATE_PENDING_FIRE≥ STATE_MIN_VALUE),extend_expiration() 会因 prior >= STATE_MIN_VALUE 立即返回 Err,转而走需要获取驱动锁的重新注册路径。所以当 fire() 执行时,可能的并发写入者已经全部退出了——fire()state 拥有独占访问权,一个普通的 store(Release) 就足够了。

场景 mark_pending / extend_expiration fire
是否有并发写入者 ✅ 是(用户线程可能同时 extend_expiration ❌ 否(extend_expiration 已因 state ≠ tick 而退出)
需要的语义 条件性更新:”只有 state 还是我期望的值才改” 确定性更新:”state 一定是 PENDING_FIRE,直接写 DEREGISTERED”
使用的原子操作 compare_exchange_weak store(Release)

行首的防御性检查 if cur_state == STATE_DEREGISTERED { return None; } 只是防止 fire() 被意外重复调用的安全网——正常情况下走到这里的 state 一定是 STATE_PENDING_FIRE

总结mark_pending() 的 CAS 相当于”加锁获取所有权”,fire() 的 store 相当于”持有所有权时直接写入”。两阶段设计让每个操作使用最合适的原子原语,既保证正确性又避免不必要的 CAS 开销。

process_at_time——从 pending 弹出并 fire16

// tokio/src/runtime/time/mod.rs: 296-337(核心循环,略去时钟回拨处理)
pub(self) fn process_at_time(&self, mut now: u64) {
    let mut waker_list = WakeList::new();
    let mut lock = self.inner.lock();

    while let Some(entry) = lock.wheel.poll(now) {
        debug_assert!(unsafe { entry.is_pending() });

        // 持 driver 锁;entry 已从轮子链表 / pending 中移除
        if let Some(waker) = unsafe { entry.fire(Ok(())) } {
            waker_list.push(waker);

            if !waker_list.can_push() {
                drop(lock);           // 避免持锁 wake 死锁
                waker_list.wake_all();
                lock = self.inner.lock();
            }
        }
    }

    lock.next_wake = lock.wheel.poll_at().map(/* ... */);
    drop(lock);
    waker_list.wake_all();
}

wheel.poll(now) 每次迭代优先 pending.pop_back();若 pending 为空,才在内部调 process_expiration(其中的 mark_pending 可能 newly 推入 pending,下一轮循环即弹出)。因此 mark_pendingentry.fire 同属一次 process_at_time 调用,但分属 WheelHandle 两层——上文的 StateCell::fire 正是被这里的 entry.fire(Ok(())) 调用。

触发入口:Driver::park_internalpark / park_timeout 返回后执行 handle.process(clock)process_at_time(now)(§3.2 走读、§5 有更完整的 park_internal 上下文)。

少数路径会跳过 mark_pending、在持锁下直接 fire:例如 reregisterinsert 返回 InsertError::Elapsed,或 clear_entry 取消已注册 timer。

4.1 为什么每个 TimerShared 独立拥有 StateCell?

理解 StateCell 之后,一个自然的问题是:为什么每个 TimerShared(即每个 timer)都独享自己的 StateCell?能不能多个 timer 共享一个?

答案是否定的。因为”一个 timer 的 poll 和另一个 timer 的 fire 之间的并发”和”同一个 timer 的 pollfire 之间的并发”性质完全不同:

如果所有 timer 共享一个 StateCell,那就是一把全局锁的缩影——task A 的 poll() 和 task B 的 poll() 也要争同一个原子变量,毫无隔离性可言。

所以模式是:每个 timer 的并发边界是单 timer 粒度的,独立 StateCell 是最自然的选择。

如果没有 StateCell 会怎样?

三个具体问题,按严重程度排列:

1. 丢失唤醒(lost wakeup)——最致命

用传统 Mutex<bool> 替代 StateCell 的话,存在一个经典的 TOCTOU(Time-Of-Check to Time-Of-Use)窗口:

时间线:
  Driver 线程:   检查到期 → 设 flag=true → waker.wake()
                    ↑
  Task 线程:     poll() → 检查 flag=false → 注册 waker → 返回 Pending

Task 线程读 flag(false)和注册 waker 之间插入了 Driver 线程:flag 被设为 true,wake 被调用——但 Task 已经检查了 flag 为 false,即将注册 waker 后返回 Pending。这个 wake 永久丢失,任务永远醒不过来。

StateCell 用 AtomicU64 + 先注册 waker 再读状态的顺序消除这个窗口:即使在写入 waker 后、读状态前 Driver 调用了 fireread_state() 读到 STATE_DEREGISTERED 后直接返回 Ready,不会丢失。

2. 重复触发(double fire)

没有原子状态区分”正在 fire”和”已被取消”——Driver 可能在一个被 cancel() 的 timer 上再次调用 waker.wake(),造成任务被错误唤醒两次。

StateCell 的 fire() 中先检查 cur_state == STATE_DEREGISTERED,是则返回 None;CAS 确保一次到期只转换为一次 wake()

3. 锁竞争放大

没有原子状态,每层操作都持锁:

StateCell 把高频路径——poll() 检查是否到期——变成一次 AtomicU64::load:免锁、无竞争、单条指令。只在真正的竞争边界(CAS 更新状态)需要一条原子指令,不持有任何锁。

每个 timer 一个 AtomicU64,遍历 N 个 timer 要 N 次 CAS,影响精度吗?

一个自然的担心:Driver 在 process_at_time 的一次 while wheel.poll(now) 里可能要处理大量到期 timer(process_expiration 里多次 mark_pending,弹出后再 fire,每个 entry 至少两次 CAS),如果同一 slot 里有几千个 timer,遍历开销会不会让它们”不准时”?

实际约束让这个开销无关紧要:

约束一:每个 CAS 的成本极低。一次 AtomicU64::compare_exchange 在 x86 上是一条 lock cmpxchgq 指令,无争用时约 10-20ns。1,000 个 timer 同时到期 ≈ 10-20μs 的遍历时间。

约束二:slot 粒度本身就是保护层。时间轮 level 0 的 slot 宽度是 1ms。同一 slot 内的所有 timer 本来就在同一个 1ms 窗口内到期。遍历开销占 slot 宽度的 1-2%

t=1000ms   level-0 slot 边界
    ├─ timer A (1000.1ms 到期)
    ├─ timer B (1000.5ms 到期)
    ├─ timer C (1000.7ms 到期)
    └─ ... 最多几千个 timer
t=1001ms   level-0 slot 边界
           ↑ 遍历 1,000 个 timer 耗时 ~15μs
           ↑ 不影响下一个 slot(1001ms 才需要检查)

约束三:锁零争用。CAS 只在 Driver 和 Task 同时操作同一个 timer 时才失败。大多数 timer 到期时对应的任务睡眠在 Poll::Pending——CAS 一次成功,无重试。对比 Mutex 方案:收割时 lock() + unlock() 一千次,且 Task 端的 poll() 也要抢同一把锁,争用放大。

真实的权衡是17

每个 timer 一个 AtomicU64:
  × 遍历 N 个 timer 需要 N 次 CAS
  ✓ 零锁争用、零 syscall、O(N) 的 ~10ns/timer 线性代价

替代方案——每个 slot 一把锁:
  × Task poll() 必须等 Driver 放锁(可能活锁)
  × 锁竞争随 timer 数量超线性放大
  ✓ 批量移动 slot 时只需一次锁操作

Tokio 选择了 CAS 方案。因为 timer 数量越多,CAS 的优势越明显——锁竞争是超线性放大,CAS 遍历是严格 O(N) 且常数极小。

五、运行时如何驱动定时器

前面讲的是 Sleep 的结构和状态管理,但这些定时器到底是怎么被驱动的?答案在运行时的主循环里。

驱动 park_internal() 的不是某个特殊事件,而是调度器 worker 线程的空闲检测——”没活干了就得等,等的同时顺便把到期的 timer 处理掉”。两种 runtime 的触发逻辑一致,但 park 机制不同:

multi_thread(每个 worker 线程的主循环):

  loop {
      从本地队列取 task → poll 它
      如果本地空 → 偷其他 worker 的 task
      如果全都空 → park_timeout()
                    │
                    └─ try_lock(Driver) 抢到?
                         ├─ 是 → driver.park_timeout()
                         │        └─ time::Driver::park_internal()
                         │             ├─ next_wake = wheel.next_expiration
                         │             ├─ epoll_wait(timeout)    ← 线程休眠
                         │             └─ handle.process(clock)  ← 处理到期 timer
                         │
                         └─ 否 → Condvar.wait()(其他 worker 已在当 driver)
  }

current_thread(单线程主循环):

  block_on(future) {
      loop {
          从本地队列取 task → poll 它
          如果本地空 → 查 inject 队列
          如果全空 → park()
                      └─ time::Driver::park_internal()
                           ├─ (同上:查 next_wake → epoll_wait → process)
                           └─ handle.process(clock)
      }
  }

所以 worker 线程的时间线是 poll task 和 park driver 交替进行

worker 线程时间线:
  poll task A → poll task B → 队列空了 → park(driver) → process() → 有 task 被唤醒 → poll task C → ...
                              ↑ 这里触发           ↑ 这里处理到期 timer,唤醒等待 task

multi_thread 里哪个 worker 抢到 TryLock 哪个就当 driver,其余用 Condvar 退化等待。这就解释了为什么 StateCell 的并发窗口存在:worker-A 在 poll Sleep 的同时,worker-B 可能正作为 driver 执行 process()

在这个主循环的驱动下,Tokio 的运行时中有两个 Driver,各有自己的核心函数:

这两个 Driver 是分别独立初始化的——在 runtime::driver::Driver::new() 中,先创建 I/O Driver,再在其上包裹 Time Driver,两者各自持有一份内部状态18

// tokio/src/runtime/driver.rs: 108-117
pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Handle)> {
    let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io, cfg.nevents)?;
    //                ↑ I/O Driver 从这里创建,持有 mio::Poll
    let (time_driver, time_handle) =
        create_time_driver(cfg.enable_time, cfg.timer_flavor, io_stack, &clock);
    //  ↑ Time Driver 从这里创建,持有 Wheel
    //  ↑ io_stack 作为 IoStack 传给 Time Driver——这就是"堆叠"
}

创建出的 Handle 包含了两个子句柄:

// tokio/src/runtime/driver.rs: 52-59
pub(crate) struct Handle {
    pub(crate) io: IoHandle,       // I/O Driver 的句柄
    pub(crate) signal: SignalHandle,
    pub(crate) time: TimeHandle,   // Time Driver 的句柄
    pub(crate) clock: Clock,
}

这个 HandleArc 包装后分发给所有 worker 线程——所有线程共享同一个 Driver 实例,没有 per-thread 的 Driver。多线程访问通过内部锁串行化:Time Driver 的 Wheel 受 Mutex<InnerState> 保护(一次只有一个线程收割),I/O Driver 的 epoll_wait&mut self 上自然串行。

两者的关系是堆叠调用——Timer 的 park_internal() 内部调用 I/O Driver 的 turn(),后者执行实际的 epoll_wait 阻塞:

Timer::park_internal()
    ├── 查时间轮 → 计算 timeout(最早 deadline - now)
    ├── IoStack::park_timeout(timeout)
    │   └── I/O Driver::turn()
    │       └── epoll_wait(events, timeout)   ← 真正的阻塞点
    ├── process_at_time()                      ← 收割到期 timer
    └── (I/O 事件已在 turn() 里处理:ScheduledIo::wake)

每个 Driver 维护自己的数据结构,醒来后各管各的:

graph TD
    subgraph Runtime::park_internal
        A["开始"] --> B["Timer::park_internal()"]
        B --> C["1. 查 Wheel → next_wake"]
        C --> D["2. IoStack::park_timeout(duration)"]
        D --> E["I/O Driver::turn()"]
        E --> F["epoll_wait(events, timeout)"]
        F -->|"唤醒"| G["分发 I/O 事件<br/>→ ScheduledIo::wake()"]
        G --> H["Timer::process_at_time()"]
        H --> I["收割到期 timer<br/>→ StateCell::fire()"]
    end
    style B fill:#e1f5fe
    style E fill:#c8e6c9
    style H fill:#e1f5fe
    style G fill:#c8e6c9

Timer Driver 决定”等多久”,I/O Driver 负责”执行等待”,醒来后各处理各的数据——Timer 收割时间轮,I/O 分发事件到 ScheduledIo。两者在同一个线程上、同一次 epoll_wait 返回后依次完成。

下面来看 Timer Driver 的 park_internal 具体实现16

// tokio/src/runtime/time/mod.rs: 213-256
fn park_internal(&mut self, rt_handle: &driver::Handle, limit: Option<Duration>) {
    let handle = rt_handle.time();
    let mut lock = handle.inner.lock();

    // 获取时间轮中最近的到期时间
    let next_wake = lock.wheel.next_expiration_time();
    lock.next_wake = next_wake.map(|t|
        NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap())
    );
    drop(lock);

    match next_wake {
        Some(when) => {
            let now = handle.time_source.now(rt_handle.clock());
            let mut duration = handle.time_source
                .tick_to_duration(when.saturating_sub(now));

            if duration > Duration::from_millis(0) {
                // 有定时器在将来才到期:带超时地 park 线程
                self.park_thread_timeout(rt_handle, duration);
            } else {
                // 有定时器已经到期或即将到期:立即返回
                self.park.park_timeout(rt_handle, Duration::from_secs(0));
                // ↑ 不能留空。park_timeout(0s) 走完整 Driver 链但每层只做
                //   非阻塞检查——epoll_wait(0)、release_pending_registrations、
                //   信号处理、子进程清理——不阻塞,但每层的事都办了。
            }
        }
        None => {
            // 没有定时器且有 limit:带超时 park
            // 没有定时器且没有 limit:无限期 park
        }
    }

    // 醒来后处理到期的定时器
    handle.process(rt_handle.clock());
}

handle.process(clock) 是一条从驱动层通往时间轮的完整调用链16

time::Handle::process(clock)                     → time/mod.rs
  └→ time::Handle::process_at_time(now)          → time/mod.rs
      └→ Inner::lock() → MutexGuard<InnerState>   ← 持 driver 锁
         while InnerState.wheel: Wheel::poll(now) → wheel/mod.rs
           ├─ Wheel.pending.pop_back()            → 返回 TimerHandle ─┐
           └─ 或 Wheel::poll 内 loop:            │                    │
                Wheel::next_expiration()         │                    │
                Wheel::process_expiration(exp)   │                    │
                   TimerHandle::mark_pending(d)  │  ← 不 fire         │
                      └→ StateCell::mark_pending │                    │
                   Ok  → Wheel.pending.push_front│                    │
                   Err → level_for()             │                    │
                         Level::add_entry(h)     │                    │
                Wheel::set_elapsed(deadline)     │                    │
           └─ 返回 TimerHandle ──────────────────┘                    │
         TimerHandle::fire(Ok(()))               → entry.rs(consume)│
            └→ StateCell::fire()                 → 写 result、取 waker
         InnerState.next_wake ← Wheel::poll_at()
         drop(MutexGuard)
         WakeList::wake_all()

类型分层:time::Handle 持锁并驱动循环;wheel::Wheel 推进时间、收割 slot(process_expirationWheel 的方法,不是 Handle 的);TimerHandle 是对 TimerShared 的短命句柄,mark_pending / fire 定义在 TimerHandle,内部再委托 StateCell 做原子操作;降级时 level_for()(模块级函数)配合 wheel::Level::add_entry 把 handle 挂回更低层 slot。

流程很清晰:

sequenceDiagram
    participant RT as Runtime Worker
    participant Driver as Time Driver
    participant Wheel as Hashed Wheel
    participant OS as OS Timer

    RT->>Driver: park_internal()
    Driver->>Wheel: next_expiration_time()
    Wheel-->>Driver: Some(5000) = 5 秒后
    Driver->>OS: park_timeout(5s)
    Note over OS: 线程休眠,CPU 可服务其他任务
    OS-->>Driver: 5 秒后唤醒(或更早被 unpark)
    Driver->>Wheel: process_at_time → wheel.poll(now)
    Note over Wheel: 收割时 mark_pending 入 pending
    Wheel-->>Driver: 弹出 TimerHandle
    Driver->>Driver: entry.fire → WakeList
    Driver->>RT: wake_all()
    RT->>RT: 重新 poll 对应的 Sleep Future

关键点在于:线程的休眠时间是由时间轮中最早的到期时间决定的。如果有多个 sleep 在等待,线程会以最早的 deadline 作为 park timeout;当时间到达(或被其他事件提前 unpark)后,驱动处理所有到期的定时器,批量唤醒对应的任务。

具体的处理逻辑在 Handle::process_at_time16

// tokio/src/runtime/time/mod.rs: 296-337
pub(self) fn process_at_time(&self, mut now: u64) {
    let mut waker_list = WakeList::new();
    let mut lock = self.inner.lock();

    // 从时间轮中收集所有到期(≤ now)的定时器
    while let Some(entry) = lock.wheel.poll(now) {
        if let Some(waker) = unsafe { entry.fire(Ok(())) } {
            waker_list.push(waker);
            if !waker_list.can_push() {
                // 批次满了,释放锁后批量唤醒
                drop(lock);
                waker_list.wake_all();
                lock = self.inner.lock();
            }
        }
    }

    // 更新下一次唤醒时间
    lock.next_wake = lock.wheel.poll_at()
        .map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap()));
    drop(lock);

    // 唤醒剩余 waker
    waker_list.wake_all();
}

Wheel::poll(now) 内部的 loop 是这条链的核心——它每次调 next_expiration() 找到最早到期的 slot,处理完再找下一个,直到没有 deadline ≤ now 的 slot 为止。所以如果线程睡了 5 秒后醒来、now - elapsed = 5000ms,这个 loop 会把这 5000ms 内跨过的所有 slot(从 Level 0 到 Level 5)逐个处理完:

Wheel::poll(now=6000), elapsed=1000
  → next_expiration → Level 0 slot 1000 (deadline=1000) → process → set_elapsed(1000)
  → next_expiration → Level 0 slot 1001 (deadline=1001) → process → set_elapsed(1001)
  ...
  → next_expiration → Level 1 slot 0  (deadline=4096)  → process → set_elapsed(4096)
  → next_expiration → Level 1 slot 1  (deadline=8192)  → 8192 > 6000 → break
→ set_elapsed(6000)

next_expiration_time()(在 park_internal 中用于算睡多久)只找一个最早 deadline;而 Wheel::poll(now)(在 process_at_time 中用于醒来后收割)循环收割全部 deadline ≤ now 的 slot,不限层级不限数量。前者说”最早什么时候醒”,后者处理”该醒时哪些到了”。

这里使用 WakeList 来批量收集 waker,在释放驱动锁之后再统一调用 wake(),避免持有锁时调用外部代码带来死锁风险。


5.x 三者关系梳理:Handle、StateCell 与 Waker

前面四、五两节分别详细拆解了 StateCell 的原子状态机和 Driver 的驱动流程。这里用一张结构图把两者的交汇点——Handle(驱动入口)、StateCell(状态机)、Waker(唤醒令牌)三者之间的协作关系集中梳理清楚,便于形成整体 mental model。

SleepWaker 的三层封装关系如下:

Sleep (用户层面的 Future)
  └── timer: TimerEntry                  → 拥有 timer 的所有权
        ├── driver: scheduler::Handle    → 共享的调度器句柄(Arc 引用)
        │     │
        │     └── .time(): &Handle       → 获取 Time Driver 句柄
        │           ├── time_source      → Instant ↔ tick 转换
        │           └── inner            → Mutex<InnerState { wheel: Wheel }>
        │
        └── inner: TimerShared           → 侵入式节点,同时存在于 Sleep 和 Wheel 中
              ├── state: StateCell       → 原子状态机,存 deadline 和 Waker
              ├── registered_when        → wheel 中的槽位位置
              └── pointers               → 侵入式双向链表指针

这里的关键不对称性:TimerEntry 引用 Handle(多对一,所有 timer 共享同一个 driver);而 TimerShared 作为侵入式节点链入 Wheel(一对一,每个 timer 恰好占据时间轮中的一个槽位)。StateCell 是这两者的交汇点——它被 TimerShared 持有,同时被 Driver 和 task 两侧并发访问。

StateCell 的多线程并发是真实存在的。 在 multi-thread runtime 中,worker 线程 A 上运行的任务调用 Sleep::poll()StateCell::poll()(无锁),与此同时 worker 线程 B 正在执行 park_internal()process()StateCell::fire()(持有 Mutex<InnerState> 保护 Wheel,但 StateCell 本身在此锁之外)。Mutex<InnerState> 只串行化了 Driver 端对 Wheel 的访问,Task 端对 StateCell 的访问完全不受此锁约束——这正是 StateCell 必须用 CAS 和 AtomicU64 的原因。如果是 current_thread runtime 则所有操作在同一线程上,原子操作虽无争用但仍正确执行。

下面用两条线程路径的具体代码来展示这个并发窗口——不是伪代码,而是真实的 Tokio 源码调用链,标注了每条路径上当前持有哪些锁、线程 ID 如何不同。

工作线程 A(执行 User Task)               工作线程 B(执行 Driver)
══════════════════════════════            ═══════════════════════════
                                          park_internal()
                                          │
tokio::spawn(async {                      ├─ lock = handle.inner.lock()
    // poll() 触发注册:                      │   ↓ 持有 Mutex<InnerState>
    Sleep::poll(cx)                       │   保护的是 Wheel(链表操作),
    │                                     │   不是 StateCell
    └→ Sleep::poll_elapsed(cx)            │
       │                                  ├─ next_wake = wheel
       └→ TimerEntry::poll_elapsed(cx)    │     .next_expiration_time()
          │                               ├─ drop(lock)  ← 释放锁
          │  // 无锁!                      │
          └→ self.inner.state             ├─ self.park.park_timeout(
                .poll(cx.waker())       │     duration)
                │                         │   ↓ epoll_wait, 线程休眠
                │  StateCell::poll():     │
                ├─ waker.register(waker)  │   醒来!
                │  // AtomicWaker::       │
                │  // register_by_ref     ├─ handle.process(clock)
                │                         │
                ├─ state.load(Acquire)    └→ process_at_time(now)
                │  // 读 AtomicU64          │
                │                           ├─ lock = inner.lock()
                ├─ 若 != DEREGISTERED        │   ↓ 再次持有 Mutex
                │  └→ Poll::Pending         │
                │                           ├─ while entry =
                                          │     lock.wheel.poll(now)
    // Task 让出执行权,线程 A 继续跑         │
    // 其他 task...                         │   对每个到期的 entry:
                                          │
                                          │   └→ entry.fire(Ok(()))
                                          │      │
                                          │      │  TimerHandle::fire()
                                          │      │  └→ StateCell::fire()
                                          │      │     ├─ state.load(Relaxed)
                                          │      │     ├─ state.store(
                                          │      │     │   DEREGISTERED,
                                          │      │     │   Release)
                                          │      │     └→ waker.take_waker()
                                          │      │        // AtomicWaker::take
                                          │      │        // → Option<Waker>
                                          │      │
                                          ├─ drop(lock)
                                          └→ waker_list.wake_all()
                                              └→ Waker::wake()
                                                  ↑ 唤醒线程 A 上的 task

两条路径在 StateCell 上汇合,但走的是不同的原子操作:

路径 操作 原子指令 持有的锁
Worker A(task) poll(cx.waker()) AtomicWaker::register_by_ref + AtomicU64::load(Acquire)
Worker B(driver) fire(Ok(())) AtomicU64::load(Relaxed) + AtomicU64::store(Release) + AtomicWaker::take Mutex<InnerState>(仅保护 Wheel,不保护 StateCell)

关键观察:Worker B 的 Mutex<InnerState> 保护的是 lock.wheel.poll(now)(遍历侵入式链表、从 slot 移除 entry),但 StateCell::fire() 内部的原子操作完全在此锁的保护范围之外——锁只保证 entry 被正确从 Wheel 移除并拿到 TimerHandle,之后 fire()StateCell 的写入是 lock-free 的。Worker A 的 poll() 全程无锁,只靠原子操作与 Worker B 协调。

这个并发窗口真实存在的原因是:Tokio 的 multi-thread runtime 中,Driver 并不是一个独立的后台线程,而是由当前恰好空闲、需要 park 的那个 worker 线程来执行的。所以在任意时刻:

两者可以同时操作不同StateCell(各自处理各自的 timer),也可以同时操作同一个 StateCell(一个在 poll,另一个在 fire)。后者正是 CAS 要处理的竞争窗口。

用一个最简单的 Tokio 程序就可以创造出这个场景:

// worker_threads >= 2,让 Driver 和 task 跑在不同的 worker 上
#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() {
    // 任务 A:一个即将到期的 sleep,会被 Driver 快速 fire
    let task_a = tokio::spawn(async {
        // poll() 在 worker-1 上注册 Waker 到 StateCell
        tokio::time::sleep(Duration::from_millis(1)).await;
        println!("task A woken");
    });

    // 任务 B:让另一个 worker 保持忙碌,迫使 Driver 跑在 worker-2 上
    let task_b = tokio::spawn(async {
        // 这个 sleep 较长,worker-2 最终会 park,期间执行 Driver 逻辑
        tokio::time::sleep(Duration::from_millis(100)).await;
    });

    // task A 的 sleep(1ms) 几乎立即到期:
    //   worker-1: 正在 poll task A 的 Sleep → StateCell::poll() 注册 Waker
    //   worker-2: park 中 → process() 收割到期 timer → 同一个 StateCell::fire()
    //   ↑ 这就是并发窗口
    let _ = tokio::join!(task_a, task_b);
}

严格来说,这个示例不能保证一定会触发竞争窗口——线程调度由 OS 决定,CAS 也可能一次成功。但它展示了并发窗口存在的架构条件:worker_threads >= 2、有 task 在 poll Sleep 的同时有 worker 在执行 Driver。

为什么 multi_thread 有并发而 current_thread 没有?

上面的并发场景只在 flavor = "multi_thread" 下存在。两种 runtime 的本质差异在于 Driver 的共享方式和 park 机制不同。

current_thread 中,DriverCore 独占持有19

// tokio/src/runtime/scheduler/current_thread/mod.rs: Core 结构体
struct Core {
    tasks: VecDeque<Notified>,
    driver: Option<Driver>,  // ← Driver 归 Core 独占,Option 取出/放回
    // ...
}

唯一的线程 park 时直接从 Core 取出 Driver、执行 park_internalprocess()、用完放回。task 的 Sleep::poll() 和 Driver 的 process() 在同一线程上严格串行StateCell 的 CAS 不会遇到真正的多核竞争。

current_thread(单线程):
  poll task A → poll task B → 无活 → park(driver) → process() → 继续 poll
                                    ↑ 同一线程,所有操作串行

multi_thread 中,Driver 被包在 Arc<Shared> 里,多个 worker 通过 TryLock 竞争20

// tokio/src/runtime/scheduler/multi_thread/park.rs: Shared
struct Shared {
    driver: TryLock<Driver>,  // ← 多 worker 共享,谁抢到谁当 driver
}

pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) -> HadDriver {
    if let Some(mut driver) = self.inner.shared.driver.try_lock() {
        // 抢到了 → 成为 driver 线程,执行 park_internal → process()
        self.inner.park_driver(&mut driver, handle, Some(duration))
    } else {
        // 没抢到 → 用 Condvar 退化等待,不做 driver 工作
        self.inner.park_condvar(Some(duration));
    }
}

TryLock 只保护”谁有资格当 driver”——抢到的 worker 独占 park_internalprocess() 的执行权。但另一个 worker 仍然可以无锁地执行 task 并调用 StateCell::poll(),两者在 StateCell 上形成真正的多核并发。

multi_thread(双 worker):
  worker-0: poll task A → StateCell::poll(waker) → Pending ──→ poll 其他 task
  worker-1: poll task B ──→ 无活 → try_lock(driver) → 抢到!
                               ↓
                             park_internal()
                               ↓
                             process() → StateCell::fire() → waker.wake()
                               ↑
                             同一时刻 worker-0 也在操作 StateCell(poll 其他 task)

两种 runtime 的 Driver 共享模式对比:

  current_thread multi_thread
Driver 持有 Core 独占 Option<Driver> Arc<Shared { TryLock<Driver> }> 共享
Park 机制 直接从 Core 取 try_lock() 竞争,抢不到退化 Condvar
Task poll vs Driver process 同一线程,串行 可能在不同线程,并发
StateCell 并发 不存在(原子操作无实际争用) 存在(CAS 多核竞争)
classDiagram
    class Sleep {
        +deadline: Instant
        +entry: Timer
        +poll(cx) Poll
    }
    class TimerEntry {
        -driver: scheduler~Handle
        -inner: TimerShared
        -deadline: Instant
        +poll_elapsed(cx) Poll
        +reset(deadline)
    }
    class TimerShared {
        +state: StateCell
        +registered_when: AtomicU64
        +pointers: linked_list.Pointers
    }
    class StateCell {
        -state: AtomicU64
        -waker: AtomicWaker
        +poll(waker) Poll
        +fire(result) Option~Waker~
        +mark_pending(not_after) Result
    }
    class Handle {
        +time_source: TimeSource
        +inner: Inner
        +process(clock)
        +process_at_time(tick)
    }
    class Waker {
        +wake()
    }
    class Wheel {
        +poll(now) Option~TimerHandle~
        +insert(entry) Result
        +next_expiration_time() Option~u64~
    }

    Sleep *--> TimerEntry
    TimerEntry *--> TimerShared
    TimerShared *--> StateCell
    TimerEntry --> Handle : 引用(Arc)
    Handle --> Wheel : 持有(Mutex 保护)
    Wheel --> TimerShared : 存储(侵入式链表)
    StateCell ..> Waker : fire() 时取出

这张图的三个关键连接:

  1. StateCell ←→ WakerStateCell::poll() 存入 Waker(来自 cx.waker()),StateCell::fire() 返回 Waker。这是 task 线程与 driver 线程的唯一”数据交换点”——driver 通过 fire() 拿到 waker 后调用 wake(),通知 executor 重新调度 task。

  2. HandleWheelTimerSharedStateCell:Driver 的 process() 是驱动链的起点,它获取锁、驱动 Wheel::poll() 收割到期 slot、对每个到期的 TimerShared 调用 fire() 取出 Waker、释放锁后批量 wake_all()。整个过程 Handle 完全不需要知道具体 task 的存在——它只和 waker 打交道。

  3. TimerEntryHandle:多个 TimerEntry 共享同一个 Handle(通过 scheduler::Handle 中的 Arc),这使得 driver 是全局单例,避免每个 timer 各自维护一套时间轮的重复开销。

三者分工如下:

组件 角色 所属关系
Handle 驱动入口,持有 TimeSource 和被 Mutex 保护的 Wheel 所有 timer 共享一个(Arc-like)
StateCell 每个 timer 的原子状态机;存储 deadline/状态和注册的 Waker 每个 Sleep 一个(在 TimerShared 内)
Waker 来自 executor 的不透明唤醒令牌;wake() 将任务放回就绪队列 每次 poll() 一个(克隆到 AtomicWaker 中)

上文描述了数据关系,下面把实际代码的调用链逐层追踪,看清楚 HandleWheelTimerSharedStateCellWaker 在代码层面是怎么串联起来的。

实际代码调用链

整条链从 park_internal 末尾的 handle.process(clock) 出发16

// tokio/src/runtime/time/mod.rs: park_internal 末尾
// 线程从 park 醒来后,处理所有到期的定时器
handle.process(rt_handle.clock());

第一层:Handle::processprocess_at_time

// tokio/src/runtime/time/mod.rs: Handle::process
pub(crate) fn process(&self, clock: &Clock) {
    // TimeSource 将 Clock::now() 转为驱动启动以来的毫秒 tick
    let now = self.time_source.now(clock);

    // 根据定时器类型分派:
    //   Traditional → self.process_at_time(now)
    //   Alternative → self.process_at_time_alt(...)
    self.process_at_time(now);
}

process() 是公开入口(被 park_internal 调用),它只做两件事:获取当前时间、委托给 process_at_time

第二层:process_at_timeWheel::poll

// tokio/src/runtime/time/mod.rs: Handle::process_at_time
pub(self) fn process_at_time(&self, mut now: u64) {
    let mut waker_list = WakeList::new();
    let mut lock = self.inner.lock();          // ← 获取 Mutex<InnerState>

    // Wheel::poll 循环:收割所有 deadline ≤ now 的 slot
    while let Some(entry) = lock.wheel.poll(now) {
        // ↓ entry 类型是 TimerHandle(NonNull<TimerShared>)
        if let Some(waker) = unsafe { entry.fire(Ok(())) } {
            waker_list.push(waker);            // ← 收集 Waker,延迟唤醒
            // 批次满了就释放锁、批量唤醒、再获取锁
        }
    }

    // 更新下次唤醒时间
    lock.next_wake = lock.wheel.poll_at()...;
    drop(lock);                                // ← 释放锁

    waker_list.wake_all();                     // ← 批量调用 Waker::wake()
}

这里 lock.wheel.poll(now) 返回的 entryTimerHandle——一个 NonNull<TimerShared> 包装。它被从 Wheel 的侵入式链表中取出后,所有权交给了这段循环。

第三层:entry.fire()StateCell::fireOption<Waker>

// tokio/src/runtime/time/entry.rs: TimerHandle::fire
pub(super) unsafe fn fire(self, completed_state: TimerResult) -> Option<Waker> {
    unsafe { self.inner.as_ref().state.fire(completed_state) }
    //        ↑ NonNull<TimerShared>       ↑ StateCell::fire()
}

TimerHandle::fire 是一层薄薄的代理,真正的逻辑在 StateCell::fire15

// tokio/src/runtime/time/entry.rs: StateCell::fire
unsafe fn fire(&self, result: TimerResult) -> Option<Waker> {
    // 原子检查:是否已触发(STATE_DEREGISTERED)
    let cur_state = self.state.load(Ordering::Relaxed);
    if cur_state == STATE_DEREGISTERED {
        return None;  // 已触发过,返回 None(避免重复唤醒)
    }

    // 写入结果
    self.result.with_mut(|p| *p = result);
    // 原子标记为已注销
    self.state.store(STATE_DEREGISTERED, Ordering::Release);

    // 取出之前 poll() 时存入的 Waker 并返回
    self.waker.take_waker()
    //     ↑ AtomicWaker::take() → Option<Waker>
}

这是整条链的核心跃迁点:StateCellAtomicU64 状态从任意非终止状态(PendingFire 或 Scheduled)切换到 STATE_DEREGISTERED,同时取出 AtomicWaker 中存储的 Waker。这个 Waker 正是任务首次 poll 时通过 StateCell::poll(waker) 注册进去的——一进一出,任务唤醒的闭环完成。

第四层:回到 process_at_timewaker_list.wake_all()

// tokio/src/runtime/time/mod.rs: 回到 process_at_time
drop(lock);                // 释放 driver 锁
waker_list.wake_all();     // 批量调用所有收集到的 Waker::wake()

WakeList 将收集到的 Waker 批量调用 wake(),通知 executor 将对应 task 放回就绪队列。在释放锁之后再唤醒是刻意设计——如果在持有 Mutex<InnerState> 时调用 wake(),被唤醒的任务可能立即被调度并尝试获取同一把锁,导致死锁。

整条链的数据流图

graph LR
    A["TimerEntry<br/>.driver 字段"] -->|"scheduler::Handle"| B["scheduler::Handle<br/>运行时全貌"]
    B -->|".driver()"| C["driver::Handle<br/>各 driver 聚合"]
    C -->|".time()"| D["time::Handle<br/>Time Driver 专用"]
    D -->|".process(clock)"| E["time::Handle::process()"]
    E -->|"time_source.now(clock)"| F["now: u64"]
    E -->|"委托"| G["time::Handle::process_at_time(now)"]
    G -->|"time::Handle.inner.lock()"| H["time::InnerState<br/>(under Mutex)"]
    H -->|"lock.wheel.poll(now)"| I["time::wheel::Wheel::poll()"]
    I -->|"循环返回"| J["time::entry::TimerHandle"]
    J -->|"entry.fire(Ok(()))"| K["time::entry::TimerHandle::fire()"]
    K -->|"代理"| L["time::entry::StateCell::fire()"]
    L -->|"CAS: store DEREGISTERED"| M["AtomicU64::store<br/>(Release)"]
    L -->|"提取"| N["Option&lt;Waker&gt;"]
    N -->|"push"| O["crate::util::WakeList"]
    O -->|"drop(lock) 后"| P["waker_list.wake_all()"]
    P -->|"调用"| Q["std::task::Waker::wake()"]
    Q -->|"通知"| R["Executor 就绪队列"]

    style A fill:#f3e5f5
    style B fill:#e1f5fe
    style C fill:#fff3e0
    style D fill:#c8e6c9
    style L fill:#c8e6c9
    style Q fill:#f3e5f5
    style R fill:#f3e5f5

TimerEntry.driverscheduler::Handle)到 std::task::Waker::wake(),整条链先经过三层 Handle 导航(scheduler::Handledriver::Handletime::Handle),再进入四个 crate 私有模块(runtime/time/mod.rsruntime/time/wheel/mod.rsruntime/time/entry.rs → StateCell),最终收敛到标准库的 Waker::wake()。每一步都是纯粹的数据流动:time::Handle 持有锁驱动 wheel,Wheel 返回 time::entry::TimerHandleTimerHandle::fire() 触发 time::entry::StateCell 状态迁移,StateCell 吐出 Waker。链上没有任何地方持有 task 的引用或了解 task 的身份——这也是为什么一套 Driver 可以驱动百万级 Sleep 实例的根本原因。

协作方向是单向的:Handle::process() 驱动 Wheel → Wheel 返回到期的 TimerShared → 调用 StateCell::fire() 提取 Waker → 调用 Waker::wake() 通知 executor。

这种解耦设计使得 Time Driver 与 executor 完全无关——driver 只需要知道”有 waker 需要调用”,而不需要知道 waker 背后是哪个 task、哪个 scheduler。StateCell 充当纯桥梁:poll() 接收 Waker,fire() 返回 Waker。而原子状态机(已在前一节详细分析)保证了即使 task 线程和 driver 线程并发访问同一个 StateCell,也能通过 CAS 正确协调,无需额外的锁。

六、最底层:从 park_timeout 到时钟中断

前面说 park_timeout 让线程休眠到最近的 deadline,但它到底是怎么”准时醒来”的?答案在最底层的硬件时钟中断。

Tokio 的 park_timeout 在 Linux 上最终会调用 mio::Poll::poll(events, timeout),后者通过 epoll_wait(timeout_ms) 进入内核。内核内部使用高精度定时器(hrtimer)来实现这个超时21

// fs/eventpoll.c: 2017-2031 (Linux 6.x, ep_poll)
if (!eavail)
    timed_out = !ep_schedule_timeout(to) ||
        !schedule_hrtimeout_range(to, slack,
                                  HRTIMER_MODE_ABS);

epoll_waitint timeout_ms 参数先被转为 timespec64,再转为 ktime_t 绝对到期时间,最后通过 schedule_hrtimeout_range() 注册到内核的 hrtimer 红黑树中:

epoll_wait(epfd, events, maxevents, timeout_ms)
  → do_epoll_wait()
    → ep_poll()
      → schedule_hrtimeout_range(&expires, slack, HRTIMER_MODE_ABS)
        → hrtimer_start_range_ns()    ← 插入 hrtimer 红黑树
        → schedule()                   ← 让出 CPU
sequenceDiagram
    participant Tokio as Tokio Worker
    participant mio as mio / epoll
    participant Kernel as Linux Kernel
    participant hrtimer as hrtimer 子系统
    participant Clock as 时钟事件设备 (APIC/HPET)

    Tokio->>mio: park_timeout(5000ms)
    mio->>Kernel: epoll_wait(timeout=5000ms)
    Kernel->>hrtimer: schedule_hrtimeout_range()
    hrtimer->>hrtimer: enqueue_hrtimer() 插入红黑树
    hrtimer->>Clock: 编程设备为 one-shot,5 秒后触发中断
    Note over Tokio: 线程休眠,CPU 可服务其他任务
    Note over Clock: ... 5 秒 ...
    Clock-->>Kernel: 硬件时钟中断
    Kernel->>hrtimer: hrtimer_interrupt()
    hrtimer->>hrtimer: __hrtimer_run_queues() 遍历红黑树
    hrtimer->>Kernel: hrtimer_wakeup() → wake_up_process()
    Kernel-->>Tokio: epoll_wait 返回
    Tokio->>Tokio: process_at_time() 处理到期定时器

6.1 C 语言的等价实现

Tokio 的 park_timeout 在 Linux 上依赖 epoll_wait 的超时参数。在 C 语言中,有两种不同的方式使用内核定时器,分别对应”主动检查”和”自动回调”两种模型。

timerfd + epoll(主动检查模型)

Tokio 底层走的是这条路径。timerfd 将定时器暴露为文件描述符,epoll 可以像监听网络事件一样监听它:

// 创建一个定时器文件描述符
int tfd = timerfd_create(CLOCK_MONOTONIC, 0);
struct itimerspec spec = {
    .it_value = { .tv_sec = 2, .tv_nsec = 0 }  // 2 秒后触发
};
timerfd_settime(tfd, 0, &spec, NULL);

// 将 timerfd 注册到 epoll
int epfd = epoll_create1(0);
struct epoll_event ev = { .events = EPOLLIN, .data.fd = tfd };
epoll_ctl(epfd, EPOLL_CTL_ADD, tfd, &ev);

// 阻塞直到定时器到期
epoll_wait(epfd, &ev, 1, -1);
uint64_t exp;
read(tfd, &exp, sizeof(exp));  // 消耗到期事件

当定时器到期时,timerfd 变为可读,epoll_wait 返回。这与 Tokio 的 park_timeout 最终调用 epoll_wait(timeout_ms) 是同一个底层机制——只不过 Tokio 把超时时间作为 epoll_wait 的参数传递,而 C 版本显式创建了一个独立的 timerfd。两种方式最终都进入内核的 hrtimer 子系统。

timer_create + SIGEV_THREAD(自动回调模型)

C 标准还提供了另一种方式——POSIX 定时器。设置 sigev_notify = SIGEV_THREAD 后,内核在定时器到期时自动创建一个新线程来运行回调函数:

#include <signal.h>
#include <time.h>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

void timer_callback(union sigval val) {
    int id = *(int *)val.sival_ptr;
    printf("定时器 %d 到期!线程 ID: %lu\n", id, pthread_self());
}

int main() {
    timer_t timer;
    int id = 42;
    struct sigevent sev = {
        .sigev_notify = SIGEV_THREAD,
        .sigev_value.sival_ptr = &id,
        .sigev_notify_function = timer_callback,
    };
    timer_create(CLOCK_MONOTONIC, &sev, &timer);

    struct itimerspec spec = {
        .it_value = { .tv_sec = 2, .tv_nsec = 0 }
    };
    timer_settime(timer, 0, &spec, NULL);

    printf("主线程继续做其他事...\n");
    pause();
    return 0;
}

这种方式看起来很像”注册回调,自动执行”的理想模型。但在高性能服务器中很少使用——从信号/线程上下文中有太多限制:不能可靠访问线程局部存储、锁的约束更严格、线程频繁创建销毁的开销不可控。因此,实际的高性能 C 程序(如 Nginx、Redis)都走 timerfd + epoll 路径,而不是 SIGEV_THREAD

Tokio 的 park_timeout 本质上更接近 timerfd 模型——它不是靠回调自动执行的,而是把超时时间传给 epoll_wait,到期后线程被内核唤醒,再由 Driver 去时间轮里收割到期定时器。

为什么 Tokio 不走 SIGEV_THREAD 或信号路径?

既然内核提供了 SIGEV_THREAD 这种”自动回调”机制,Tokio 为什么不用?原因有三。

1. 锁竞争爆炸。 如果几十个定时器同时到期,每个都起一个线程去跑回调,这些线程会同时争 Wheel 的锁和任务的运行队列锁。而 Tokio 现在的设计是只有一个 Driver 线程在 park/unpark 循环里处理定时器——处理完一把锁释放、批量唤醒,零锁争用。

2. 没有批处理。 epoll_wait 返回后,Driver 在 process_at_time 里一次性收割所有到期定时器,把 waker 收集到 WakeList 里批量调用(参见第五节)。如果每个定时器单独起一个线程或信号,就失去了批处理的机会——每个回调都得单独走一遍上下文切换。

3. 信号的安全黑洞与合作式调度冲突。 信号处理函数只能调用 async-signal-safe 的函数,不能锁 mutex、不能分配内存,几乎什么也干不了。SIGEV_THREAD 虽然避开了信号限制,但它的抢占式特性会破坏 Tokio 的合作式调度模型。而现在的 epoll_wait → Driver 处理 → Waker::wake() → 任务入队 → 调度器 poll,每一步都在运行时掌控之中。

本质上,Tokio 已经有了回调机制——Waker。问题不是”怎么通知”,而是”怎么高效地通知”。epoll_wait 不是轮询,而是被内核挂起;醒来后批量收割时间轮,本质是把 n 个独立的”自动回调”合并成 1 个可预测的批处理循环。

6.2 红黑树 vs 哈希时间轮

有趣的是,内核的 hrtimer 和 Tokio 用了不同的数据结构。Tokio 的 time driver 使用哈希时间轮(本文第三节),而 Linux 的 hrtimer 使用红黑树(rbtree)来组织定时器22

// include/linux/hrtimer_defs.h: 27-35
struct hrtimer_clock_base {
    struct hrtimer_cpu_base    *cpu_base;
    clockid_t                  clockid;
    unsigned int               seq;
    // 按到期时间排序的红黑树根节点
    struct timerqueue_linked_head active;
    ktime_t                    (*get_time)(void);
    ktime_t                    offset;
};

enqueue_hrtimer() 将定时器插入红黑树,时间复杂度 O(log n)23

// kernel/time/hrtimer.c: 1096-1104
/*
 * enqueue_hrtimer - internal function to (re)start a timer
 *
 * The timer is inserted in expiry order. Insertion into the
 * red black tree is O(log(n)).
 */
static bool enqueue_hrtimer(struct hrtimer *timer,
                            struct hrtimer_clock_base *base,
                            enum hrtimer_mode mode, bool was_armed)

这并不是说哪种数据结构更优——它们只是面向不同的约束。内核的 hrtimer 需要纳秒级精度和动态的到期时间分布,红黑树的 O(log n) 插入更为稳妥。Tokio 的哈希时间轮只需要毫秒级精度,但需要管理海量定时器(成千上万个 Sleep 并发),O(1) 插入和批量降级更适合这种场景。

有意思的是,Linux 内核也有一套低精度定时器(timer wheel),代码路径在 kernel/time/timer.c,它用的正是和 Tokio 类似的层级哈希轮结构——8 到 9 层,每层 64 个 bucket(LVL_SIZE = 64LVL_DEPTH = 8/9),粒度和覆盖范围的表24

// kernel/time/timer.c: 104-115 (HZ=1000 时)
//
// HZ 1000 steps
// Level Offset  Granularity            Range
//  0      0         1 ms                0 ms -         63 ms
//  1     64         8 ms               64 ms -        511 ms
//  2    128        64 ms              512 ms -       4095 ms
//  3    192       512 ms             4096 ms -      32767 ms
//  4    256      4096 ms (~4s)      32768 ms -     262143 ms
//  5    320     32768 ms (~32s)    262144 ms -    2097151 ms
//  6    384    262144 ms (~4m)    2097152 ms -   16777215 ms
//  7    448   2097152 ms (~34m)  16777216 ms -  134217727 ms
//  8    512  16777216 ms (~4h)  134217728 ms - 1073741822 ms

对比 Tokio 的 6 层 64 slot 设计,两者的核心思想同源——都源自哈希时间轮的经典设计。

6.3 从时钟中断到进程唤醒

当硬件时钟事件设备(Local APIC Timer 或 HPET)在 deadline 到达时产生中断,内核进入 hrtimer_interrupt()。这个函数做了几件关键的事23

  1. 更新时间基准hrtimer_update_base() 读取硬件计数器和系统时间
  2. 处理到期定时器__hrtimer_run_queues() 遍历红黑树中所有到期时间 ≤ now 的定时器
  3. 调用定时器回调:对于 sleep 类定时器,回调是 hrtimer_wakeup()

hrtimer_wakeup 的实现非常直接——找到等待中的进程,把它叫醒23

// kernel/time/hrtimer.c: 2184-2193
static enum hrtimer_restart hrtimer_wakeup(struct hrtimer *timer)
{
    struct hrtimer_sleeper *t =
        container_of(timer, struct hrtimer_sleeper, timer);
    struct task_struct *task = t->task;

    t->task = NULL;
    if (task)
        wake_up_process(task);

    return HRTIMER_NORESTART;
}

wake_up_process() 将进程状态设为 TASK_RUNNING 并放入就绪队列。随后内核的调度器在合适的时机把它调度回 CPU。Tokio 的 worker 线程从 epoll_wait 返回,继续执行 process_at_time(),完成定时器的处理。

6.4 关键细节:这不是周期性的 tick

需要注意,现代 Linux 在 NOHZ(dynticks)模式下并不是靠周期性的时钟中断来驱动定时器的,而是把时钟事件设备编程为一次性(one-shot)触发hrtimer_interrupt() 在处理完当前到期的定时器后,会重新编程时钟设备,让它在下一次最早到期的 deadline 时再产生中断23

这意味着如果系统上没有定时器在等待(或者最近的定时器在 5 秒后),CPU 可以进入深度休眠状态,期间不产生任何时钟中断。这也是为什么 Tokio 的 park_timeout 在”没有定时器且没有 limit”时可以直接无限期 park——内核不会为它浪费任何 CPU 周期。

这条从 tokio::time::sleep 到硬件时钟中断的链路也就清楚了:

tokio::time::sleep(5s).await
    → Sleep::poll() 返回 Pending,注册到哈希时间轮
    → Driver::park_timeout(5s)
    → mio → epoll_wait(timeout=5000ms)
    → 内核 schedule_hrtimeout_range() → 插入红黑树
    → 编程时钟设备 one-shot 5 秒后触发
    → [CPU 休眠或服务其他任务]
    → 5 秒后时钟中断 → hrtimer_interrupt()
    → __hrtimer_run_queues() → hrtimer_wakeup() → wake_up_process()
    → epoll_wait 返回 → 线程醒来
    → process_at_time() → fire → wake_all()
    → Sleep::poll() 返回 Ready

6.5 定时器管理的存放位置:用户态 vs 内核态

如果把这几种方式放在一起对比,一个更本质的区别浮现出来——定时器到底存放在哪一层管理:

方案 定时器存储位置 数据结构 操作复杂度 内核中管理的定时器数量
C timerfd 内核 hrtimer 红黑树 O(log n) n 个
C timer_create(SIGEV_THREAD) 内核 hrtimer 红黑树 O(log n) n 个
Tokio 时间轮 用户态哈希时间轮 多层时间轮 O(1) + 1 × O(log 1) 1 个

Tokio 在用户态维护时间轮,只向内核注册一个”最近 deadline”的唤醒点。这意味着几万个定时器的插入和删除都在用户态完成,O(1),没有系统调用;内核只需要管理 1 个定时器(而不是 n 个),红黑树的 O(log 1) ≈ O(1)。精度降低到毫秒级,但吞吐量大幅提升。

graph LR
    subgraph "C timerfd(n 个定时器全部进内核)"
        A1["用户态 app"] -->|"timerfd_settime × n"| K1["内核 hrtimer<br/>红黑树<br/>O(log n) 管理 n 个"]
    end

    subgraph "Tokio(1 个定时器进内核)"
        A2["用户态 app"] -->|"O(1) 插入"| W["用户态哈希时间轮<br/>6 层 × 64 slot"]
        W -->|"只注册最早 deadline"| K2["内核 hrtimer<br/>红黑树<br/>O(log 1) 管理 1 个"]
    end

这正是 Varghese & Lauck 论文12思想在工程中的体现:用用户态的时间轮 + 少量内核辅助,替代全部在内核红黑树中管理。Tokio 不是在”绕过”内核机制,而是在它之上加了一层更高效的用户态调度。

6.6 epoll_wait 在这条链路中的真实角色

前面说了那么多”内核 hrtimer”和”epoll_wait”,容易产生一个误解:epoll_wait 在管理 tokio 的定时器吗?

答案是 epoll_wait 完全不知道 tokio 时间轮的存在。epoll_wait 的 timeout 参数只是一个”线程睡眠闹钟”。

本质上,epoll_wait 内部是两个唤醒源的 OR 组合:

epoll_wait(epfd, events, maxevents, timeout)
                ↑                       ↑
           I/O 事件就绪              hrtimer 到期
          (网卡中断 →               (时钟中断 →
           协议栈 → epoll)            LAPIC → hrtimer)

任何一个条件满足,epoll_wait 就返回。内核里大致是这样的循环25

// 内核 ep_poll() 简化伪代码
int epoll_wait(..., int timeout_ms) {
    if (timeout_ms > 0)
        schedule_hrtimeout_range(timeout_ms);  // 设一个 hrtimer

    while (1) {
        if (events_ready)  return;        // I/O 事件来了
        if (timeout_expired) return 0;    // hrtimer 到了
        schedule();  // 让出 CPU,等任一条件满足
    }
}

tokio 利用的正是这个”谁先到就醒谁”的 OR 语义——I/O 事件先到就提前回来顺便收割 timer,hrtimer 先到就准时处理到期 timer 并回到 epoll 继续等 I/O。

tokio 时间轮:管理 1,000,000 个 Sleep
    ↓ 算出最早 deadline = now + 5s
    ↓ 传给 epoll_wait 作为 timeout
epoll_wait:设 1 个内核 hrtimer,5s 后叫醒线程
    ↓ epoll_wait 不知道 tokio 的时间轮
    ↓ epoll_wait 不知道 StateCell
    ↓ epoll_wait 不知道 Waker
    ↓ 它只是一个准确的线程睡眠定时器
    ↓ 5s 后
epoll_wait 返回 → 回到 tokio Driver
    ↓
tokio process_at_time():用自己维护的 elapsed 推进时间轮
    ↓ 收割到期 timer
    ↓ fire → waker.wake()

如果把 epoll_wait(timeout) 换成 nanosleep(timeout) + epoll_wait(-1),效果等价25。Tokio 用 epoll_wait(timeout) 只是因为方便——一个系统调用同时等两件事(I/O 事件和定时器到期),比分开用 nanosleep + epoll_wait 少一次上下文切换。

所以 epoll_wait 的角色是:tokio 的”定时闹钟”,不是 tokio 的”定时器驱动”。tokio 的定时器驱动完全在用户态完成——elapsed 自维护、时间轮自推进、StateCell 自管理。epoll_wait 只是”到点把线程叫醒”的门铃。

这个角色区分是整个体系的精妙之处:内核做它最擅长的事——准确到毫秒地叫醒线程;tokio 做它最擅长的事——在海量定时器中高效找出哪些已经到期

七、TimeSource:时间与 tick 的桥梁

TimerEntry 使用 Instant 作为对外的时间抽象,而 Wheel 内部使用 u64 毫秒级 tick。TimeSource 负责两者之间的转换26

// tokio/src/runtime/time/source.rs: 6-38
pub(crate) struct TimeSource {
    start_time: Instant,
}

impl TimeSource {
    pub(crate) fn deadline_to_tick(&self, t: Instant) -> u64 {
        // 向上取整到最近的毫秒
        self.instant_to_tick(t + Duration::from_nanos(999_999))
    }

    pub(crate) fn instant_to_tick(&self, t: Instant) -> u64 {
        let dur: Duration = t.saturating_duration_since(self.start_time);
        let ms = dur.as_millis().try_into()
            .unwrap_or(MAX_SAFE_MILLIS_DURATION);
        ms.min(MAX_SAFE_MILLIS_DURATION)
    }

    pub(crate) fn now(&self, clock: &Clock) -> u64 {
        self.instant_to_tick(clock.now())
    }
}

展开来看两个转换函数。instant_to_tick 是核心26

pub(crate) fn instant_to_tick(&self, t: Instant) -> u64 {
    let dur: Duration = t.saturating_duration_since(self.start_time);
    //                    ↑ 核心公式:t - start_time
    let ms = dur.as_millis().try_into()...;  // u128 → u64
    ms.min(MAX_SAFE_MILLIS_DURATION)
}

deadline_to_tick 在它之上加了向上取整:

pub(crate) fn deadline_to_tick(&self, t: Instant) -> u64 {
    self.instant_to_tick(t + Duration::from_nanos(999_999))
    //                    ↑ 加 0.999999ms,任何不足 1ms 都被进位
}

关系是:Wheel.elapsedStateCell.state 都来自 TimeSource 的同一个参考原点 start_time

reset(deadline) 为例,tick 是通过 deadline_to_tick(deadline) 算出的:

tick = deadline_to_tick(deadline)
     = instant_to_tick(deadline + 999_999ns)
     = ((deadline + 999_999ns) - start_time).as_millis()
     = (deadline - start_time + 999_999ns).as_millis()

如果此时 now - start_time = 5000ms,而 deadline = now + 5min

tick = ((now + 5min + 999_999ns) - start_time).as_millis()
     = ((start_time + 5000 + 300000 + 0.999999 - start_time)).as_millis()
     = (300000 + 5000 + 0.999999).as_millis()
     = 305,000  // as_millis() 丢纳米,等价于向下取整

elapsedstate 的差值就是真正的剩余时间。因为两者都是相对于 start_time 的毫秒偏移量:

state - elapsed = (deadline - start_time) - (now - start_time)
                = deadline - now
                = 剩余等待时间(毫秒)

三个关键数值的关系可以总结为:

数值 保存在哪 本质 用途
Wheel.elapsed Wheel 坐标原点偏移量:当前 now - start_time 的毫秒值 推进时间轮(elapsed += delta),判定 timer 是否到期(state <= elapsed
registered_when TimerShared slot 定位缓存:上次插入时的 state 快照,持 Relaxed 顺序 算 slot 索引(slot_for)、从 slot 链表移除(remove
StateCell.state StateCell 真实到期时间deadline - start_time,更新需 CAS(AcqRel mark_pending() 中做最终到期判断(state > not_after ?)

registered_whenstate 的只读缓存——两者绝大多数时候相同,唯一的例外是 extend_expiration 无锁延后了 state 但未更新 registered_when 的间隙。此时 slot 位置暂时”不准”,但 Driver 在收割时通过 mark_pending 返回的 Err 检测到这个偏差,用 sync_when()state 同步回 registered_when 并重插入正确 slot。

所以决定”到没到期”的只有一行判断——state > not_afterregistered_when 只是加速 slot 定位的缓存,elapsed 只是标识”现在几点”的指针。state 是唯一的 source of truth

这就验证了第 3.2 节例子中的核心假设:elapsed=0 时,tick=300000 等价于 deadline - start_time = 300000ms。如果 elapsed=5000(已经运行了 5 秒),tick 就是 305000——state 永远比 elapsed 大一个固定差值。

一个具体的数值对比能说清”绝对 vs 相对”的区别。假设 elapsed=10000(runtime 已运行 10 秒),此时创建 sleep(500ms)

sleep(500ms).await
  → deadline = now + 500ms
  → deadline_to_tick(deadline)
     = (deadline + 999_999ns - start_time).as_millis()
     = (now + 500ms - start_time).as_millis()
     = (10000 + 500).as_millis()
     = 10500                          ← 绝对毫秒偏移量

state - elapsed = 10500 - 10000 = 500  ← 差值才等于注册时长

state绝对毫秒偏移量,不是相对值。如果存相对值(state=500),那 state - elapsed = 500 - 10000 就会包装溢出。elapsed ≥ state 的到期判断也变成一句加法——选绝对值省了这条指令,还保持了 u64 减法的一致性。

deadline_to_tick999_999ns 加法是向上取整的关键:假设用户调用了 sleep(Duration::from_micros(1))deadlinestart_time 只差 1μs,直接 as_millis() 得到 0,timer 立刻到期,根本不等待。加上 999_999nsas_millis() 得到 1——最小等待 1ms。这正是 1ms 精度约束在代码中的直接体现。

MAX_SAFE_MILLIS_DURATIONu64::MAX - 1,约为 5.84 亿年,远超实际需要。

u64 溢出保护

你可能会担心 elapsedstate 都是 u64 毫秒值——如果运行足够久会溢出吗?

三层保护防止溢出问题:

第一层:STATE_DEREGISTEREDSTATE_PENDING_FIRE 占用了 u64 的顶值StateCell.stateu64::MAX 被用作 STATE_DEREGISTERED(已注销),u64::MAX - 1 被用作 STATE_PENDING_FIRE(待触发)。所以有效的最大 tick 是 u64::MAX - 2。如果 TimeSource::instant_to_tick 算出来的值超过了这个范围,会被 ms.min(MAX_SAFE_MILLIS_DURATION) 钳住。

第二层:时间轮的 MAX_DURATION 限制。6 层 × 64 slot 的时间轮能表达的最大范围是 2^36 - 1 ≈ 2.18 年MAX_DURATION)。超过这个范围的 timer 会被自动放入顶层(Level 5)的合适 slot,不会因为到期时间远大于 wheel 范围而出错——这是哈希时间轮论文描述的”wrap-around”处理12

第三层:纯数值比较Wheel::insert()when <= self.elapsed 是直接的 u64 比较。即使 elapsedstate 都接近 u64::MAX,差值 state - elapsed(用于计算 park_timeout)是正确的无符号结果——因为 Rust 的 saturating_sub 保证了安全性。

实际上,elapsed 从 0 以毫秒为单位递增到溢出需要约 5.84 亿年。在那之前 runtime 早就被重启无数次了。MAX_SAFE_MILLIS_DURATION1.8 × 10^14 天,也远超任何实际运行时长。

八、把整条链路串起来

现在可以把 tokio::time::sleep 的完整执行路径画出来:

tokio::time::sleep(Duration::from_secs(5))
    │
    ├─ 创建 Sleep { entry: TimerEntry { deadline: now + 5s, registered: false } }
    │
    ├─ 首次 poll:
    │   ├─ TimerEntry::reset(deadline, true)
    │   │   ├─ 初始化 TimerShared(如果还没有)
    │   │   ├─ 将 deadline 转为 tick → 插入 Wheel(哈希时间轮)
    │   │   └─ 设置 registered = true
    │   ├─ StateCell::poll(cx.waker())
    │   │   ├─ 注册 waker 到 AtomicWaker
    │   │   └─ 加载 state → 还不是 STATE_DEREGISTERED
    │   └─ 返回 Poll::Pending
    │
    ├─ Runtime::park_internal → time::Driver::park_internal():
    │   ├─ Wheel::next_expiration_time() → Some(5000)
    │   ├─ 计算 duration = 5000ms
    │   ├─ self.park_thread_timeout(rt_handle, 5000ms)
    │   │   └─ self.park.park_timeout(handle, 5000ms)
    │   │       └─ IoStack::park_timeout(handle, 5000ms)
    │   │           └─ ProcessDriver::park_timeout(handle, 5000ms)
    │   │               └─ SignalDriver::park_timeout(handle, 5000ms)  [Unix, 可选]
    │   │                   └─ io::Driver::park_timeout(handle, 5000ms)
    │   │                       └─ io::Driver::turn(handle, Some(5000ms))
    │   │                           └─ self.poll.poll(events, Some(5000ms))
    │   │                               └─ mio::Poll::poll(events, 5000ms)
    │   │                                   └─ epoll_wait(epfd, events, maxevents, 5000)
    │   │                                       └─ 内核 hrtimer → 线程休眠 5 秒
    │
    ├─ 5 秒后 OS 唤醒线程:
    │   ├─ Wheel::poll(now) → 找到到期的 entry
    │   ├─ entry.fire(Ok(()))
    │   │   ├─ result ← Ok(())
    │   │   ├─ state ← STATE_DEREGISTERED
    │   │   └─ 取出 waker
    │   └─ waker_list.wake_all()  ← 把任务放回调度队列
    │
    └─ 任务被再次 poll:
        ├─ StateCell::read_state()
        │   └─ cur_state == STATE_DEREGISTERED → 读取 result = Ok(())
        └─ 返回 Poll::Ready(())
sequenceDiagram
    participant User as User Task
    participant Sleep as Sleep Future
    participant Entry as TimerEntry
    participant Cell as StateCell
    participant Wheel as Hashed Wheel
    participant Driver as Time Driver
    participant OS as OS

    User->>Sleep: .await → poll(cx)
    Sleep->>Entry: poll_elapsed(cx)
    Entry->>Wheel: reset(deadline) → insert to Wheel
    Entry->>Cell: poll(waker) → register waker
    Cell-->>Entry: Poll::Pending
    Entry-->>Sleep: Poll::Pending
    Sleep-->>User: Poll::Pending

    Note over User: Worker 线程继续服务其他任务

    Driver->>Wheel: next_expiration_time() → 5000ms
    Driver->>OS: park_timeout(5000ms)
    Note over OS: ... 5 秒 ...
    OS-->>Driver: 唤醒

    Driver->>Wheel: poll(now) → 到期 entry
    Driver->>Cell: fire(Ok(()))
    Cell-->>Driver: Some(waker)
    Driver->>Driver: waker.wake()

    Note over User: 任务被重新调度
    User->>Sleep: poll(cx)
    Sleep->>Cell: read_state()
    Cell-->>Sleep: Poll::Ready(Ok(()))
    Sleep-->>User: Poll::Ready(())

九、取消、重置与交互细节

Sleep 的设计还支持两个重要的交互场景:

取消:直接 drop 掉 Sleep 实例即可。TimerEntryPinnedDrop 实现会调用 cancel(),将定时器从时间轮中移除4。没有额外的清理工作。

重置:可以通过 Pin<&mut Sleep>::reset(new_deadline) 来改变 Sleep 的到期时间,而无需创建新的 Future2。内部实现会先尝试原子地延后到期时间(extend_expiration),如果 CAS 成功则无需操作时间轮;如果提前了到期时间,则会重新走 reregister 路径。

这种”乐观延后”的优化利用了定时器一个常见的使用模式——超时通常在操作开始时设置,然后在操作结束前被取消。延后触发时,旧的 slot 位置依然是”安全的”(不会过早触发),所以可以用 CAS 无锁更新。

总结

tokio::time::sleep 的实现体现了异步系统设计的核心分层:

  1. 接口层sleep() / Sleep):对用户暴露一个 .await 即可等待的 Future
  2. 运行时层TimerEntry / TimerShared / StateCell):用一个原子状态机管理定时器的生命周期,协调用户端和驱动端的并发访问
  3. 数据结构层Wheel):用 6 层哈希时间轮实现 O(1) 插入和高效的到期检测
  4. 驱动层Driver::park_internal):通过操作系统的定时唤醒(park_timeout)驱动整个机制运转

最终的效果是:一个单线程可以同时管理成千上万个定时器,线程只会在最近的 deadline 时休眠,期间 CPU 可以服务其他任务或进入省电状态tokio::time::sleep 的高效并发能力不是来自魔法,而是来自一条从 Future 到 OS 的精心设计的链路。

References

  1. 之前的文章:《Rust async/await 的底层契约:从 Future::poll 到 Tokio 运行时》,2026-04-30。详细阐述了 Future::pollContextWaker 组成的核心协议。 

  2. Tokio 源码,Sleep 结构定义,tokio/src/time/sleep.rs。包含 Sleep 的完整 Future 实现,sleep()sleep_until() 的构造逻辑。  2

  3. Tokio 源码,Timer enum 定义,tokio/src/runtime/mod.rsTimerTraditional(TimerEntry)Alternative(time_alt::Timer) 的枚举封装。 

  4. Tokio 源码,TimerEntry 结构定义,tokio/src/runtime/time/entry.rs。包含 TimerEntry 的关键方法:newresetpoll_elapsedcancel。  2

  5. Tokio 源码,TimerShared 结构定义,tokio/src/runtime/time/entry.rs。前端与驱动端共享的状态,包含侵入式链表指针、registered_whenStateCell。 

  6. Tokio 源码,TimerEntry::poll_elapsed 方法,tokio/src/runtime/time/entry.rs。首次 poll 时注册到时间轮,后续 poll 时检查状态。 

  7. Tokio 源码,TimerEntry::reset 方法,tokio/src/runtime/time/entry.rsreset() 先尝试 extend_expiration 乐观路径,失败后通过 inner.into()&TimerShared 转为 NonNull<TimerShared> 并调用 Handle::reregister()。 

  8. Tokio 源码,Handle::reregister 方法,tokio/src/runtime/time/mod.rs。接收 NonNull<TimerShared>,通过 entry.as_ref().handle() 创建 TimerHandle,调用 lock.wheel.insert(entry) 重新插入时间轮。 

  9. Tokio 源码,TimerHandle 结构定义和 TimerShared::handle() 方法,tokio/src/runtime/time/entry.rsTimerHandleNonNull<TimerShared> 的包装,不拥有数据,由 unsafe 契约确保安全使用。 

  10. Tokio 源码,Wheel::insertLevel::add_entrytokio/src/runtime/time/wheel/mod.rs 以及 tokio/src/runtime/time/wheel/level.rsWheel::insert() 计算层级后调用 Level::add_entry(),后者将 TimerHandle push 到对应 slot 的侵入式链表中。 

  11. Tokio 文档,sleep() 的 panic 说明,tokio/src/time/sleep.rs。解释了为什么 rt.block_on(sleep(...)) 会 panic 而 rt.block_on(async {sleep(...).await}) 不会。 

  12. George Varghese 和 Tony Lauck,《Hashed and Hierarchical Timing Wheels: Data Structures for the Efficient Implementation of a Timer Facility》,1996。论文链接:Hashed and Hierarchical Timing Wheels。Tokio 的哈希时间轮数据结构直接引用了此论文的设计。  2 3

  13. Tokio 源码,Wheel 结构定义与文档,tokio/src/runtime/time/wheel/mod.rs。6 层、每层 64 slot 的哈希时间轮实现,以及 NUM_LEVELSMAX_DURATION 常量。  2 3

  14. Tokio 源码,LevelEntryList 定义,tokio/src/runtime/time/wheel/level.rsLevel 包含 occupied 位图和 slot: [EntryList; 64]EntryListLinkedList<TimerShared, TimerShared>,定义在 tokio/src/runtime/time/entry.rs。TimerShared 通过侵入式链表的 pointers 字段直接链接在 slot 的链表中。 

  15. Tokio 源码,StateCell 结构及其方法,tokio/src/runtime/time/entry.rs。包含 pollmark_pending(CAS 循环)、fireset_expirationextend_expiration 等原子操作。  2 3 4 5

  16. Tokio 源码,Driver::park_internalHandle::process_at_timetokio/src/runtime/time/mod.rs。运行时驱动定时器的核心逻辑:计算 next_wake、带超时 park、处理到期定时器、批量唤醒。  2 3 4 5

  17. 关于 per-timer StateCell 的权衡分析:每个 timer 独立持有 AtomicU64,Driver 遍历 slot 时逐 timer CAS。对比替代方案(per-slot Mutex),CAS 方案在零锁争用和 O(N) 线性代价上占据优势,且 slot 的 1ms 宽度提供了天然的保护层,使遍历时间(~10ns/timer)不会影响精度。此分析基于 x86-64 上 lock cmpxchgq 指令的理论延迟约 10-20ns(无 cache miss 时),以及 level-0 slot 宽度 1ms 的事实。 

  18. Tokio 源码,runtime::driver::Driver::newHandle 结构定义,tokio/src/runtime/driver.rs 以及 tokio/src/runtime/driver.rs。Time Driver 和 I/O Driver 分别独立初始化,通过 create_io_stackcreate_time_driver 堆叠。Handle 包含 iotimesignal 三个子句柄,通过 Arc 分发给所有 worker 线程。 

  19. Tokio 源码,current_thread 调度器的 Core 结构,tokio/src/runtime/scheduler/current_thread/mod.rs。Driver 以 Option<Driver> 形式存储,park 时通过 take() 取出。 

  20. Tokio 源码,multi_thread 调度器的 Shared 结构和 Parker::park_timeout 方法,tokio/src/runtime/scheduler/multi_thread/park.rs(Shared)及 tokio/src/runtime/scheduler/multi_thread/park.rs(park_timeout)。TryLock<Driver> 保证同一时刻只有一个 worker 能持有 Driver,但其他 worker 仍可无锁执行 task。 

  21. Linux 内核源码,ep_pollschedule_hrtimeout_range 调用,fs/eventpoll.cepoll_waittimeout_ms 参数经 ep_timeout_to_timespec() 转为 timespec64 绝对时间后,由 schedule_hrtimeout_range() 注册到 hrtimer 红黑树,实现纳秒精度的超时唤醒。 

  22. Linux 内核源码,hrtimer_clock_base 结构定义,include/linux/hrtimer_defs.h。hrtimer 使用红黑树(timerqueue_linked_head)而非哈希时间轮来组织定时器。 

  23. Linux 内核源码,enqueue_hrtimerhrtimer_wakeuphrtimer_interruptkernel/time/hrtimer.cenqueue_hrtimer()(行 1096-1104)将定时器插入红黑树;hrtimer_wakeup()(行 2184-2193)在定时器触发时唤醒进程;hrtimer_interrupt()(行 2083-2114)是时钟中断的处理入口。  2 3 4

  24. Linux 内核源码,timer wheel 实现,kernel/time/timer.c。行 64-150 的注释详细描述了低精度定时器的层级哈希轮设计:8-9 层、每层 64 bucket(LVL_SIZE=64LVL_DEPTH=8/9),与 Tokio 的实现思想同源。 

  25. epoll_wait(timeout) 在本场景中等价于 nanosleep(timeout) + epoll_wait(-1)——都是”先睡 timeout 时间,醒来后再 epoll 检查 I/O 事件”。Tokio 选前者只是因为一个系统调用比两个高效,功能上没有区别。  2

  26. Tokio 源码,TimeSource 实现,tokio/src/runtime/time/source.rs。负责 Instantu64 tick 之间的转换,向上取整到毫秒精度。  2