从 tokio::time::sleep 看异步 Timer 的实现:一次从 Future::poll 到哈希时间轮的源码之旅

tokio::time::sleep 看起来和 thread::sleep 只差一个 .await,但背后连接的是一整套哈希时间轮、原子状态机和操作系统的定时唤醒机制。这篇文章从源码出发,把这条链路拆开来看。

引言:两个 sleep 的对比

在 Rust 里让程序”等一下”有两种常见方式:

// 方式一:标准库的阻塞 sleep
std::thread::sleep(Duration::from_secs(5));

// 方式二:Tokio 的异步 sleep
tokio::time::sleep(Duration::from_secs(5)).await;

从表面看,两者似乎差不多——都是在某个地方停了 5 秒。但它们在运行时的行为有本质区别。用一个简单的对比实验就能看出来。

如果把下面三个场景跑一遍:

场景 任务1 任务2 总耗时
原生线程(2 个线程) 同步 sleep 5s 同步 sleep 2s 5s
Tokio 单线程 + 同步阻塞 thread::sleep 5s tokio::time::sleep 2s 7s
Tokio 单线程 + 全异步 tokio::time::sleep 5s tokio::time::sleep 2s 5s

先看场景一——在 async 任务里混入同步阻塞的版本:

// tokio_block_bad.rs
use tokio::time;
use std::time::Duration;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let start = std::time::Instant::now();

    let task1 = tokio::spawn(async move {
        println!("[任务1] 开始阻塞, 时间: {:?}", start.elapsed());
        std::thread::sleep(Duration::from_secs(5));
        println!("[任务1] 结束阻塞, 时间: {:?}", start.elapsed());
    });

    let task2 = tokio::spawn(async move {
        println!("[任务2] 开始异步等待, 时间: {:?}", start.elapsed());
        time::sleep(Duration::from_secs(2)).await;
        println!("[任务2] 结束异步等待, 时间: {:?}", start.elapsed());
    });

    let _ = tokio::join!(task1, task2);
}

运行结果:

[任务1] 开始阻塞, 时间: 32.1µs
... 卡住 5 秒,task2 根本没机会开始 ...
[任务1] 结束阻塞, 时间: 5.001s
[任务2] 开始异步等待, 时间: 5.001s
[任务2] 结束异步等待, 时间: 7.002s

task2 被 task1 的同步阻塞卡了整整 5 秒,总耗时 7 秒。

再看场景二——两个任务都用异步 sleep:

// tokio_async_good.rs
use tokio::time;
use std::time::Duration;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let start = std::time::Instant::now();

    let task1 = tokio::spawn(async move {
        println!("[任务1] 开始异步等待5秒, 时间: {:?}", start.elapsed());
        time::sleep(Duration::from_secs(5)).await;
        println!("[任务1] 结束异步等待, 时间: {:?}", start.elapsed());
    });

    let task2 = tokio::spawn(async move {
        println!("[任务2] 开始异步等待2秒, 时间: {:?}", start.elapsed());
        time::sleep(Duration::from_secs(2)).await;
        println!("[任务2] 结束异步等待, 时间: {:?}", start.elapsed());
    });

    let _ = tokio::join!(task1, task2);
}

运行结果:

[任务1] 开始异步等待5秒, 时间: 41.5µs
[任务2] 开始异步等待2秒, 时间: 43.2µs
[任务2] 结束异步等待, 时间: 2.001s
[任务1] 结束异步等待, 时间: 5.002s

总耗时 5 秒,两个任务在单线程上并发执行,互不阻塞。

场景一的 7 秒暴露了问题:当 task1 在 Tokio 的单线程 runtime 里直接调用 thread::sleep,整个工作线程都被阻塞,task2 根本得不到执行机会。而场景二的两个异步 sleep 却在单线程上实现了并发——task2 在 2 秒后准时完成。

关键不在于”等待”本身,而在于用什么方式等待thread::sleep 把线程卡住;tokio::time::sleep 只让任务挂起,线程依然可以服务其他任务。

这篇文章的目的就是拆开 tokio::time::sleep 的内部实现,看看它到底是怎么做到的。

在进入细节之前,可以先回顾一下异步系统的基础契约。在之前的文章《Rust async/await 的底层契约:从 Future::poll 到 Tokio 运行时》1中,我们梳理了 Future::pollContextWaker 组成的核心协议:任务被 poll,如果未就绪就返回 Poll::Pending 并保存 Waker,事件就绪后通过 wake() 通知运行时重新调度。sleep 正是这套协议的一个具体应用——它用”时间到达”作为就绪条件。

概念先行:从伪代码理解核心思路

在深入源码之前,先建立一个直觉。tokio::time::sleep 的秘密可以用一句话概括:它不会让线程去空等,而是注册一个”闹钟”后立刻把线程还给调度器,到时间了再由闹钟叫醒它。

具体来说,下面这行代码实际触发了 6 个步骤:

tokio::time::sleep(Duration::from_secs(5)).await;
sequenceDiagram
    participant Task as 你的任务
    participant Exec as Tokio 调度器
    participant Timer as Tokio 定时器(全局)

    Task->>Timer: 1. 注册:"5秒后唤醒我"
    Task->>Exec: 2. 返回 Poll::Pending
    Exec->>Exec: 3. 切换去执行其他任务
    Note over Timer: 时钟滴答滴答...
    Timer->>Task: 4. 5秒到了,调用 Waker.wake()
    Task->>Exec: 5. 重新排队等待执行
    Exec->>Task: 6. 再次 poll,返回 Poll::Ready

如果把这个过程写成极简的伪代码,大致是这样的:

// 简化的 Sleep Future(概念示意)
struct Sleep {
    deadline: Instant,
    registered: bool,
}

impl Future for Sleep {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if Instant::now() >= self.deadline {
            // 时间到了,完成
            return Poll::Ready(());
        }

        if !self.registered {
            // 时间没到:把 waker 注册到全局定时器
            // 定时器会在 deadline 时调用 waker.wake()
            let waker = cx.waker().clone();
            TOKIO_TIMER.insert(self.deadline, waker);
            self.registered = true;
        }

        // 关键:返回 Pending,交出执行权
        Poll::Pending
    }
}

核心行为只有三个:

  1. 首次 poll:发现时间没到 → 把 Waker(闹钟)注册到全局定时器 → 返回 Pending,线程继续服务其他任务
  2. 等待期间:零轮询、零浪费——不需要每秒检查 1000 次”时间到了吗”,定时器在精确时间点触发唤醒
  3. 定时器触发Waker::wake() 通知调度器把任务放回就绪队列,下次 poll 时返回 Ready

这和张嘴就来的 thread::sleep 有着本质区别:

方面 thread::sleep tokio::time::sleep
线程状态 OS 挂起线程 线程继续执行其他任务
CPU 利用 0%(但线程资源被占) 100%(有效利用)
可并发量 ≈ 线程数(几千上限) 百万级任务
响应方式 时间到自动唤醒 通过 Waker 通知调度器

有了这个心理模型,接下来就可以拆开源代码,看 Tokio 怎么把伪代码里的 TOKIO_TIMER 变成真正的数据结构。

一、Sleep 的结构:一个 Future 的解剖

tokio::time::sleep(duration) 的签名返回一个 Sleep 结构体:

pub fn sleep(duration: Duration) -> Sleep {
    // 计算 deadline = now + duration
    // 然后调用 Sleep::new_timeout(deadline, location)
}

它实现了 Future trait,所以可以 .await。但 Sleep 里面到底放了什么?核心字段如下2

// tokio/src/time/sleep.rs: 220-232
pin_project! {
    pub struct Sleep {
        // 内部字段(tracing 相关,非关键路径)
        inner: Inner,

        // 连接 Sleep 实例和驱动它的 Timer 的关键纽带
        #[pin]
        entry: Timer,
    }
}

entry 字段的类型是 Timer,它在 tokio::runtime 内部被定义为一个 enum3

// tokio/src/runtime/mod.rs: 437-442
pub(crate) enum Timer {
    Traditional(time::TimerEntry),
    Alternative(time_alt::Timer),  // 仅 tokio_unstable + rt-multi-thread
}

本文聚焦在 Traditional 分支——也就是默认的 Timer 实现。TimerEntry 的结构如下4

// tokio/src/runtime/time/entry.rs: 287-310
pub(crate) struct TimerEntry {
    // Arc 引用到 runtime 的调度器句柄
    driver: scheduler::Handle,

    // 共享的内部结构,参与侵入式链表
    #[pin]
    inner: Option<TimerShared>,

    // deadline:首次 poll 时才注册到时间轮
    deadline: Instant,

    // 是否已经注册
    registered: bool,
}

TimerShared 是前端(TimerEntry)和驱动端(Driver)之间共享的状态5

// tokio/src/runtime/time/entry.rs: 336-360
pub(crate) struct TimerShared {
    // 侵入式双向链表的指针
    pointers: linked_list::Pointers<TimerShared>,

    // 注册到 Wheel 时的时间(原子变量)
    registered_when: AtomicU64,

    // 当前状态:到期时间、已触发、或已注销
    state: StateCell,

    _p: PhantomPinned,
}

这几个结构的层次关系可以总结为下图:

graph TD
    A["tokio::time::sleep(duration)"] --> B["Sleep<br/>(Future)"]
    B --> C["Timer<br/>(enum)"]
    C --> D["TimerEntry<br/>(Traditional 分支)"]
    D --> E["TimerShared<br/>(共享状态)"]
    E --> F["StateCell<br/>(原子状态 + Waker)"]
    E --> G["registered_when<br/>(AtomicU64)"]
    E --> H["pointers<br/>(侵入式链表节点)"]

简单来说:用户持有的 Sleep 是一个 Future,它的内部通过 TimerEntry 管理一个 deadline,而 TimerShared 以侵入式链表节点的形式嵌入到底层的哈希时间轮(Hashed Timing Wheel)中。

二、首次 poll:注册到时间轮

Sleep 第一次被 poll 时,TimerEntry 还没有注册到时间轮。这时它会调用 reset,将自己插入到底层的 Wheel 数据结构中6

// tokio/src/runtime/time/entry.rs: 598-617
pub(crate) fn poll_elapsed(
    mut self: Pin<&mut Self>,
    cx: &mut Context<'_>,
) -> Poll<Result<(), super::Error>> {
    if !self.registered {
        let deadline = self.deadline;
        // 首次 poll:将 deadline 注册到时间轮
        self.as_mut().reset(deadline, true);
    }

    let inner = self.inner()
        .expect("inner should already be initialized by `self.reset()`");
    // 通过 StateCell 检查是否已经到期
    inner.state.poll(cx.waker())
}

reset 的核心逻辑是:

  1. Instant 格式的 deadline 通过 TimeSource 转换成 u64 类型的 tick(毫秒级)
  2. 先尝试原子操作 extend_expiration——如果新 deadline 比旧的晚,可以无锁更新
  3. 如果 extend_expiration 失败(比如 deadline 提前了,或者 timer 已经在触发队列中),则走 reregister 路径重新插入 Wheel

注意这里的类型转换和调用链。reset() 的源码中7

// tokio/src/runtime/time/entry.rs: 638-649
let inner = match self.inner() {
    Some(inner) => inner,          // inner: &TimerShared
    None => { /* 初始化 */ }
};

// extend_expiration 失败时走 reregister 路径
if reregister {
    unsafe {
        self.driver()
            .reregister(&self.driver.driver().io, tick, inner.into());
    }
}

inner&TimerShared.into() 通过标准库的 From<&T> for NonNull<T> 将其转换为 NonNull<TimerShared>——一个原始指针包装。这个 NonNull 随后被传入 Handle::reregister()8

// tokio/src/runtime/time/mod.rs: 398-435
pub(self) unsafe fn reregister(
    &self, unpark: &IoHandle, new_tick: u64,
    entry: NonNull<TimerShared>,    // ← 来自 inner.into()
) {
    let mut lock = self.inner.lock();

    // 如果 timer 还在 Wheel 中,先移除
    if unsafe { entry.as_ref().might_be_registered() } {
        lock.wheel.remove(entry);
    }

    // 创建 TimerHandle——一个 NonNull<TimerShared> 的 "唯一指针"
    let entry = entry.as_ref().handle();

    entry.set_expiration(new_tick);  // 设置新的到期时间

    // 插入 Wheel
    match unsafe { lock.wheel.insert(entry) } {
        Ok(when) => {
            // 如果新到期时间比当前最早更早 → unpark 线程
            if lock.next_wake.map(|n| when < n.get()).unwrap_or(true) {
                unpark.unpark();
            }
        }
        Err((entry, InsertError::Elapsed)) => unsafe { entry.fire(Ok(())) },
    }
}

这里 entry.as_ref().handle() 创建 TimerHandle——它的定义极其简单9

// tokio/src/runtime/time/entry.rs: 183-188
pub(crate) struct TimerHandle {
    inner: NonNull<TimerShared>,
}

// tokio/src/runtime/time/entry.rs: 308-312
impl TimerShared {
    pub(super) fn handle(&self) -> TimerHandle {
        TimerHandle { inner: NonNull::from(self) }
    }
}

TimerHandle 不拥有 TimerShared——它只是一个”唯一指针”,借用规则由 unsafe 契约保证(持有驱动锁时才能操作)。

随后 Wheel::insert() 根据到期时间计算层级和 slot 位置,调用 Level::add_entry()TimerHandle(即 NonNull<TimerShared>)push 到 slot 的侵入式链表上10

// tokio/src/runtime/time/wheel/mod.rs: 88-112
pub(crate) unsafe fn insert(
    &mut self, item: TimerHandle,
) -> Result<u64, (TimerHandle, InsertError)> {
    let when = unsafe { item.sync_when() };  // 同步到期时间
    if when <= self.elapsed {
        return Err((item, InsertError::Elapsed));  // 已过期
    }

    let level = self.level_for(when);  // 计算层级
    unsafe { self.levels[level].add_entry(item); }
    Ok(when)
}

// tokio/src/runtime/time/wheel/level.rs: 122-128
pub(crate) unsafe fn add_entry(&mut self, item: TimerHandle) {
    let slot = slot_for(unsafe { item.registered_when() }, self.level);
    self.slot[slot].push_front(item);  // push 到侵入式链表
    self.occupied |= occupied_bit(slot);  // 标记 slot 非空
}

self.slot[slot] 的类型是 EntryList,即 LinkedList<TimerShared, TimerShared>——一个侵入式双向链表。push_frontTimerHandle 中的 NonNull<TimerShared> 作为链表节点插入,TimerSharedpointers 字段就是它的 prev/next 指针。

这里有一个关键设计——两层到期时间缓存TimerShared 中有两个时间字段:

// tokio/src/runtime/time/entry.rs: 269-281
pub(crate) struct TimerShared {
    registered_when: AtomicU64,  // ① 写入 slot 时的到期时间缓存
    state: StateCell,            // ② StateCell 内部的 AtomicU64 存 true expiration
    // ...
}

sync_when() 是两层之间的桥梁——把 StateCell.state(true_when)同步到 registered_when,供 Wheel 做 slot 定位:

// Wheel::insert() 先调用 sync_when()
let when = unsafe { item.sync_when() };  // StateCell.state → registered_when
if when <= self.elapsed { return Err(Elapsed); }
let level = self.level_for(when);           // 用 sync 后的值算层级
self.levels[level].add_entry(item);         // add_entry 内部用 registered_when 算 slot

收割时,process_expiration()mark_pending(expiration.deadline) 直接读 StateCell.state 做最终比对:

// process_expiration 简化
match unsafe { item.mark_pending(expiration.deadline) } {
    Ok(()) => pending.push_front(item),  // StateCell.state ≤ deadline → 到期
    Err(expiration_tick) => {            // StateCell.state > deadline → 还没到
        // 把 true_when 写回 registered_when,重插入正确 slot
        self.levels[level].add_entry(item);
    }
}

这种两层设计是为 extend_expiration 乐观路径服务的:用户通过 CAS 无锁延后了 StateCell.state,但 registered_when 还指向旧的 slot。Driver 收割时发现 state > slot deadline,不触发 fire,而是把 state 同步回 registered_when 并重插入正确 slot——整个过程零锁争用。

所以整条注册链是:

Sleep::poll_elapsed()
    → TimerEntry::reset()
        → extend_expiration(tick)  ← 乐观路径:CAS 无锁延后
        → Handle::reregister()      ← 失败时走此路径
            → lock.wheel.insert(handle)  ← TimerHandle 入 Wheel
                → Level::add_entry()
                    → slot[slot_index].push_front(item)
                    → occupied |= (1 << slot)

现在可以更精确地描述数据流:Sleep 持有 TimerEntryTimerEntry 持有 Pin<Option<TimerShared>>TimerShared 通过 NonNull<TimerShared>(即 TimerHandle)以侵入式链表节点形式直接挂在 Level.slot[slot]。没有复制、没有中间分配——移动 TimerHandle 就是在操作 TimerSharedpointers 指针。

这个过程可以用下面的序列图看清:

sequenceDiagram
    participant Sleep as Sleep Future
    participant Entry as TimerEntry
    participant Shared as TimerShared
    participant Handle as TimerHandle
    participant Wheel as Wheel
    participant Level as Level.slot[64]

    Sleep->>Entry: poll_elapsed(cx)
    Entry->>Entry: reset(deadline)
    Entry->>Shared: extend_expiration(tick)
    Note over Shared: 如果新 deadline<br/>比旧的晚→CAS 成功<br/>跳过重新注册
    alt extend_expiration 失败
        Entry->>Shared: inner.into() → TimerHandle
        Entry->>Wheel: Handle.reregister(tick)
        Wheel->>Wheel: remove(old slot)
        Wheel->>Wheel: insert(handle)
        Wheel->>Level: level_for(when) 计算层级
        Level->>Level: slot[slot_index].push_front(item)
        level-->>Wheel: OK(when)
        Wheel-->>Entry: 注册完成
    end
    Entry->>Shared: state.poll(waker)
    Shared-->>Entry: Poll::Pending
    Entry-->>Sleep: Poll::Pending

这里有一个关键细节:首次注册不是在 sleep() 调用时发生的,而是在第一次 poll 时发生的。这也是为什么如果在 runtime 外部直接调用 sleep() 会 panic——它需要拿到当前的调度器句柄,而句柄只存在于 runtime 上下文中。如果把它包在一个 async block 里,调用就是惰性的,等到真正 poll 时已经在 runtime 内部了,就不会 panic11

三、哈希时间轮(Hashed Timing Wheel)

Timer 的核心数据结构是一个多层哈希时间轮。Tokio 的注释明确引用了 Varghese 和 Lauck 的经典论文12,该论文也因 Linux 内核的高精度定时器实现而广为人知。

时间轮的核心思想很简单:把到期的定时器散列到不同的 slot 里,时钟走动时只检查当前 slot,避免每次都扫描全部定时器

它本质上是用空间换时间:用 6 层 × 64 slot 的数组,换取每次 tick 只扫一个 slot 的能力。对比一下就能明白——如果用一个普通 Vec 或链表存储所有定时器,每次 tick 都得全量扫描一遍,O(n);而时间轮把定时器按到期时间散列,每次 tick 只检查当前 slot,O(1)。存储只是手段,快速检索到期定时器才是时间轮存在的目的。

Tokio 使用了 6 层时间轮,每层 64 个 slot:

Level 0: 64 × 1 ms      = 64 ms 覆盖范围
Level 1: 64 × 64 ms     = ~4 秒覆盖范围
Level 2: 64 × ~4 s      = ~4 分钟覆盖范围
Level 3: 64 × ~4 min    = ~4 小时覆盖范围
Level 4: 64 × ~4 hr     = ~12 天覆盖范围
Level 5: 64 × ~12 day   = ~2 年覆盖范围

下图展示了一个简化的 3 层时间轮的工作方式:

graph LR
    subgraph "Level 2 (~4 min)"
        L2["slot 0-63<br/>每 slot ~4s"]
    end
    subgraph "Level 1 (~4 sec)"
        L1["slot 0-63<br/>每 slot 64ms"]
    end
    subgraph "Level 0 (64 ms)"
        L0["slot 0-63<br/>每 slot 1ms"]
    end

    L2 -->|"降级:slot 到期<br/>重新分配"| L1
    L1 -->|"降级"| L0
    L0 -->|"到期触发"| F["fire → wake"]

定时器首先被插入到与它的到期时间匹配的层级。比如一个 10 秒后到期的定时器,会落在 Level 2 的某个 slot 里。当该 slot 到期时,里面的所有定时器会被重新分配到 Level 1;然后再降到 Level 0;最终在 Level 0 的 slot 到期时真正触发。

这种层级设计的优势在于:

Wheel 结构体的关键字段13

// tokio/src/runtime/time/wheel/mod.rs: 22-39
pub(crate) struct Wheel {
    // 时间轮启动以来经过的毫秒数
    elapsed: u64,

    // 6 层,每层 64 个 slot
    levels: Box<[Level; NUM_LEVELS]>,

    // 等待触发的定时器队列
    pending: EntryList,
}

每个 Level 包含一个 [EntryList; 64] 数组,而 EntryList 就是 LinkedList<TimerShared>14

// tokio/src/runtime/time/wheel/level.rs: 6-15
pub(crate) struct Level {
    level: usize,
    occupied: u64,              // 位图标记哪些 slot 非空
    slot: [EntryList; 64],      // 64 个 slot,每个存放一个侵入式链表
}

// tokio/src/runtime/time/entry.rs: 195
pub(super) type EntryList = LinkedList<TimerShared, TimerShared>;

occupied 是一个 u64 位图——第 i 位为 1 表示 slot[i] 非空。next_expiration()trailing_zeros() 直接在一条指令中找到最低位的 1,避免遍历 64 个 slot:

// tokio/src/runtime/time/wheel/level.rs: 60-78
pub(crate) fn next_expiration(&self, now: u64) -> Option<Expiration> {
    // occupied >> now_slot: 跳过当前 slot 之前的位
    let mask = self.occupied.rotate_right(now_slot as u32) >> 1;
    let next = mask.trailing_zeros();      // ← 一条指令找下一个非空 slot
    //                        ↑ 如果 mask = 0(无更多非空 slot),返回 64
    //                          调用者据此判断本层无到期
    if next >= 64 { return None; }

    let slot = (now_slot + next as usize + 1) % 64;
    let level_range = 64_usize.pow(self.level as u32) * 64;
    let slot_range = 64_usize.pow(self.level as u32);
    let level_start = (now / level_range as u64) * level_range as u64;
    let deadline = level_start + (slot as u64) * slot_range as u64;
    Some(Expiration { level: self.level, slot, deadline })
}

所以 Wheel::poll()loop 内部相当于:

for each level [0..5]:
    if level.occupied != 0:                    // 位图非空
        mask = (occupied >> now_slot) >> 1
        next = mask.trailing_zeros()           // 硬件指令,~1 个周期
        if next < 64: return (level, slot, deadline)
    // 否则本层无到期 → 继续查下一层
return None  // 所有层级都空

最坏情况(没有任何 timer):6 次 occupied != 0 检查 + 6 次 trailing_zeros,总开销约几十个 CPU 周期。无空轮询、无循环遍历 64 slot——这是位图优化在时间轮中的经典用法。

所以 TimerShared 和 Wheel 的关系是:

graph TD
    subgraph Wheel
        W["Wheel"]
        subgraph Levels
            L0["Level 0: [slot0, slot1, ..., slot63]"]
            L1["Level 1: [slot0, slot1, ..., slot63]"]
            L5["... Level 5"]
        end
        Pend["pending: EntryList"]
    end

    subgraph Timer
        TS1["TimerShared A<br/>(state + pointers)"]
        TS2["TimerShared B<br/>(state + pointers)"]
    end

    L0 -->|"slot[5]"| TS1
    L0 -->|"slot[5]"| TS2

    TS1 -->|"pointers.prev"| TS2
    TS2 -->|"pointers.next"| TS1

    subgraph Sleep
        S["Sleep 1"] --> TE1["TimerEntry 1"]
        TE1 --> TS1
        S2["Sleep 2"] --> TE2["TimerEntry 2"]
        TE2 --> TS2
    end

这里的关键是 TimerShared 通过侵入式链表的 pointers 字段直接挂在 Level.slot[slot_index],Wheel 不复制也不拥有 TimerShared 的所有权——它只通过指针链接。这就是”侵入式”的含义:被链接的对象自身包含链接指针,而非由容器分配独立的节点。这也意味着 TimerShared 必须被 pin 住不能移动,因为链表指针指向的是它自身的内存地址。

3.2 一个完整的例子:5 分钟 sleep 的降级之旅

关于时间参考点的说明Wheel.elapsedStateCell.stateregistered_when 都是相对于同一个 TimeSource.start_time绝对毫秒偏移量。下面的例子设 start_time = sleep 创建的时刻,此时 elapsed 在第一次 process_at_time 前尚未推进,设为例值 0。实际运行中 elapsed 一般是个小的非零值(runtime 启动后到首次 park 之间经过的毫秒数),但这一点偏移不影响层级计算逻辑,因为 state - elapsed 的差值不受影响。

把前面的概念串起来,假设你在 elapsed=0(即 runtime 刚启动时)创建了一个 tokio::time::sleep(Duration::from_secs(300)).await,看看它在 Wheel 中如何从高层降级到低层,最终被 fire。

t=0:创建和注册

StateCell.state = STATE_DEREGISTERED  // 初始值 u64::MAX
Wheel.elapsed = 0                     // Wheel 刚开始

首次 poll 时TimerEntry::reset(deadline) 被调用。TimeSource::deadline_to_tick() 将 5 分钟(300 秒)转为毫秒 tick:

tick = 300,000   // 5 min = 300,000 ms
extend_expiration(tick=300000)
  → 失败(初始状态是 STATE_DEREGISTERED,不是有效的到期时间)
  → 走 reregister 路径
     → set_expiration(300000)
        StateCell.state = 300,000    // ← true expiration
     → Wheel::insert()
        → sync_when()
            registered_when = 300,000
            StateCell.state = 300,000 (不变)
        → level_for(0, 300000) = 3
          // 计算过程:masked = 0 ^ 300000 | 63 = 300063
          // leading_zeros = 45, significant = 18, 18/6 = 3
        → Level 3, slot 1
          // slot_for(300000, 3) = (300000 >> 18) % 64 = 1

此时 Wheel 状态:

Wheel.elapsed = 0
  Level 3: slot[1] → TimerShared (registered_when=300000, state=300000)

t=0 ~ 4分钟:线程 park,等最早 deadline

park_internal()next_expiration_time(),发现 Level 3 slot 1 的 deadline 在约 4.37 分钟后(1 × 64^3 = 262,144ms),于是 park_timeout(262s)

t=262s:Level 3 slot 1 到期,降级到 Level 2

Driver 醒来,process_at_time(262144)Wheel::poll(262144)Wheel::poll() 的内部循环13先检查 next_expiration() 找到 Level 3 slot 1,发现其 deadline=262144 ≤ now,于是调用 self.process_expiration(expiration)

// tokio/src/runtime/time/wheel/mod.rs: 126-133
pub(crate) fn poll(&mut self, now: u64) -> Option<TimerHandle> {
    loop {
        if let Some(handle) = self.pending.pop_back() { return Some(handle); }
        match self.next_expiration() {
            Some(ref expiration) if expiration.deadline <= now => {
                self.process_expiration(expiration);  // ← 收割降级
                self.set_elapsed(expiration.deadline);
            }
            _ => { self.set_elapsed(now); break; }
        }
    }
    self.pending.pop_back()
}

process_expiration 先取走 slot 中所有 entry(take_entries),然后逐个调用 mark_pending(deadline)13。对每个 entry,mark_pendingStateCell.statedeadline 做对比:

// tokio/src/runtime/time/wheel/mod.rs: 171-196
pub(crate) fn process_expiration(&mut self, expiration: &Expiration) {
    let mut entries = self.take_entries(expiration);  // 取出 slot 链表
    while let Some(item) = entries.pop_back() {
        match unsafe { item.mark_pending(expiration.deadline) } {
            Ok(()) => self.pending.push_front(item),  // 到期
            Err(expiration_tick) => {        // 还没到
                let level = level_for(expiration.deadline, expiration_tick);
                unsafe { self.levels[level].add_entry(item); }  // 降级
            }
        }
    }
}

itemTimerHandle(即 NonNull<TimerShared>)。mark_pending(262144) 内部:

// tokio/src/runtime/time/entry.rs: 171-199 (StateCell::mark_pending)
let mut cur_state = self.state.load(Ordering::Relaxed);
// cur_state = 300,000(5分钟到期)
// not_after = 262,144(Level 3 slot 1 deadline)
if cur_state > not_after {  // 300,000 > 262,144 → true
    break Err(cur_state);   // 还没到点,返回 true expiration
}

返回 Err(300000) 后,TimerHandle::mark_pendingtrue_when 写回 registered_when

// tokio/src/runtime/time/entry.rs: 250-264 (TimerHandle::mark_pending)
Err(tick) => {
    unsafe { self.inner.as_ref().set_registered_when(tick); }
    Err(tick)
}

回到 process_expiration,用 level_for(262144, 300000) 重新计算层级:

masked = 262144 ^ 300000 | 63 = 37887
leading_zeros = 48, significant = 15, 15/6 = 2
→ Level 2, slot 9
  slot_for(300000, 2) = (300000 >> 12) % 64 = 9
→ add_entry: Level 2, slot[9].push_front(item)
→ timer 已从 Level 3 slot 1 移至 Level 2 slot 9
Wheel.elapsed = 262,144
  Level 2: slot[9] → TimerShared (registered_when=300000, state=300000)
  Level 3: slot[1] → (空)

t=262s ~ 298s:继续 park

next_expiration_time() 找到 Level 2 slot 9 的 deadline:

slot_range(2) = 64^2 = 4,096 ms
slot 9 × 4,096 ms = 36,864 ms
Level 2 slot 9 deadline ≈ 262,144 + 36,864 = 298,...

线程 park_timeout(~36s)

t=298s:Level 2 slot 9 到期,降级到 Level 1

process_expiration(Level 2, slot 9, deadline=298xxx):
  → mark_pending(298xxx)
     StateCell.state = 300,000 > 298,xxx → 还没到点
     → Err(300000)
  → level_for(298xxx, 300000) = 1
     → Level 1, slot 对应 300,000

Wheel.elapsed = 298,...
  Level 1: slot[...] → TimerShared
  Level 2: slot[9] → (空)

t=298s ~ 300s:Level 1 到 Level 0,最终 fire

类似逻辑继续降级到 Level 0。Level 0 slot 宽度是 1ms:

elapsed >= 300,000 时,Level 0 到期。process_expiration 再次被调用,但这次是 Level 0,mark_pending 中的判断不同了:

// tokio/src/runtime/time/entry.rs: 171-199 (StateCell::mark_pending)
let mut cur_state = self.state.load(Ordering::Relaxed);
// cur_state = 300,000
// not_after = 300,000(Level 0 slot deadline)
if cur_state > not_after {  // 300,000 > 300,000 → false
    break Err(cur_state);
}
// cur_state ≤ not_after → 到期!
compare_exchange(cur_state, STATE_PENDING_FIRE, AcqRel, Acquire)

CAS 成功将状态从 300000 置为 STATE_PENDING_FIRE,返回 Ok(()),entry 被推入 self.pending。随后 Wheel::poll()loop 下一次迭代从 pending 弹出这个 entry,process_at_time 拿到后调用 fire()

// tokio/src/runtime/time/entry.rs: 211-231 (StateCell::fire)
let cur_state = self.state.load(Ordering::Relaxed);
if cur_state == STATE_DEREGISTERED { return None; } // 不为 u64::MAX

// 写入结果
unsafe { self.result.with_mut(|p| *p = Ok(())) };
// state → STATE_DEREGISTERED(u64::MAX)
self.state.store(STATE_DEREGISTERED, Ordering::Release);

// 取出 waker 返回
self.waker.take_waker()

fire 返回 Some(waker)process_at_time 将其压入 WakeList,释放锁后批量 wake_all()——任务被放回调度队列,下次 Sleep::poll() 返回 Poll::Ready(())

如果用 reset() 延长 deadline 呢?

假设在第 1 分钟时,用户调用了 sleep.reset(new_deadline=10min)

TimerEntry::reset(10min)
  → tick = 600,000
  → extend_expiration(600000)
     StateCell.state 从 300,000 CAS 到 600,000 ✓
     → 成功,return(跳过 reregister!)

此时 timer 还在 Level 3 slot 1 里,registered_when 仍然是 300,000。没有任何链表操作——这就是”乐观延后”的零锁优势。

当 Driver 在 elapsed=262144 时收割 Level 3 slot 1:

mark_pending(262144):
  StateCell.state = 600,000 > 262,144
  → Err(600000)
  → 把 true_when(600000) 同步回 registered_when
  → 按 600,000 重新计算层级
     level_for(262144, 600000) = 更高的层级...
  → 插入到正确的 slot

整个过程零锁争用——extend_expiration 是 CAS,mark_pending 也是 CAS,没有 Mutex。

3.3 elapsed 是如何维护的?

一个关键的设计细节是:elapsed 不依赖任何外部定时器来推进。它是 Driver 在每次 process_at_time(now) 时,用 nowelapsed 的差值来”推”的。

每次 Driver 醒来(无论是被 epoll_wait 超时叫醒,还是被 I/O 事件提前 unpark),都会调用 handle.process(clock),内部是 process_at_time(now)。Wheel 收到 nowu64 tick,来自 TimeSource),和当前的 elapsed 比较差值,把落在差值内的 slot 逐个处理——收割到期 timer、降级上层 timer。

Driver 被 I/O 事件提前叫醒(本应在 5 秒后才醒):
  → process_at_time(now=1003)    // 只走了 3ms
  → Wheel: 从 elapsed=1000 到 now=1003
           第 1000 slot 收割 → 无到期 timer
           第 1001-1002 slot 无内容
           elapsed = 1003
  → 所有 timer 都还在,下次 park_timeout 重新计算

Driver 在 deadline 时被叫醒:
  → process_at_time(now=6000)
  → Wheel: 从 elapsed=1000 到 now=6000
           第 1000、1001、…、5000 slot 逐个处理
           → elapsed=5000 时有到期 timer → fire → wake
           → 继续推进到 6000
           elapsed = 6000
  → 重新计算 next_wake

所以时间轮是完全自洽的:它不和”墙上时间”绑定,只和 elapsed 这个单调递增的计数器绑定。TimeSourceclock.now() 只用来算增量——增量多大,elapsed 就推多远。即使系统被暂停(suspend-to-RAM),恢复后 clock.now() 返回更新后的时间,增量包含暂停时长,elapsed 一次推到位,所有”超期的 timer”被立即收割。

tokio 不依赖任何外部定时器来推进时间轮。外部定时器(epoll_wait 的 timeout)的唯一作用是”到点叫醒线程”——醒来后 tokio 用自己的 TimeSource 计算增量、推进 elapsed、收割到期 timer。这就是为什么 park_timeout(0) 也能工作:即使不进入内核定时器,process_at_time 也能用 now = TimeSource::now() 推进 Wheel。

四、StateCell:原子状态机

TimerShared 中最关键的部分是 StateCell——一个用原子变量实现的状态机,负责协调用户端(TimerEntry)和驱动端(Driver)之间的并发访问15

// tokio/src/runtime/time/entry.rs: 91-102
pub(super) struct StateCell {
    // 保存到期时间,或特殊标志值 u64::MAX(已注销)
    state: AtomicU64,

    // 定时器触发后的结果
    result: UnsafeCell<TimerResult>,

    // 已注册的 Waker
    waker: AtomicWaker,
}

state 字段承载了定时器的完整生命周期:

stateDiagram-v2
    [*] --> Scheduled: set_expiration(tick)
    Scheduled --> PendingFire: mark_pending() 成功
    PendingFire --> Fired: fire(Ok(()))
    Scheduled --> Fired: 直接 fire
    Fired --> [*]: STATE_DEREGISTERED
    Scheduled --> Cancelled: cancel / deregister
    Cancelled --> [*]

几个关键操作的实现:

poll——用户端检查是否到期15

// tokio/src/runtime/time/entry.rs: 142-162
fn poll(&self, waker: &Waker) -> Poll<TimerResult> {
    // 先注册 waker,确保 fire 和 poll 之间的竞争不会丢失唤醒
    self.waker.register_by_ref(waker);
    // 读取状态
    self.read_state()
}

fn read_state(&self) -> Poll<TimerResult> {
    let cur_state = self.state.load(Ordering::Acquire);
    if cur_state == STATE_DEREGISTERED {
        // 已经触发,读取 result
        Poll::Ready(unsafe { self.result.with(|p| *p) })
    } else {
        Poll::Pending
    }
}

注意 poll 里先注册 waker 再读状态的顺序——如果反过来,可能会发生在读状态之后、注册 waker 之前这个窗口期内定时器恰好触发并调用 fire,从而导致任务永久丢失唤醒信号。

mark_pending——驱动端将定时器移入待触发队列15

// tokio/src/runtime/time/entry.rs: 171-199
unsafe fn mark_pending(&self, not_after: u64) -> Result<(), u64> {
    let mut cur_state = self.state.load(Ordering::Relaxed);
    loop {
        if cur_state > not_after {
            // 实际到期时间比当前 tick 晚——不应该触发
            break Err(cur_state);
        }
        match self.state.compare_exchange_weak(
            cur_state,
            STATE_PENDING_FIRE,
            Ordering::AcqRel,
            Ordering::Acquire,
        ) {
            Ok(_) => break Ok(()),
            Err(actual_state) => cur_state = actual_state,
        }
    }
}

这里使用 CAS(Compare-And-Swap)来原子地将定时器从”已调度”状态转换到”待触发”状态。CAS 失败意味着有人并发修改了状态(比如用户端重置了定时器),需要重试。

fire——完成定时器15

// tokio/src/runtime/time/entry.rs: 211-231
unsafe fn fire(&self, result: TimerResult) -> Option<Waker> {
    let cur_state = self.state.load(Ordering::Relaxed);
    if cur_state == STATE_DEREGISTERED {
        return None;  // 已经触发过了
    }

    // 写入结果
    unsafe { self.result.with_mut(|p| *p = result) };
    // 标记为已注销
    self.state.store(STATE_DEREGISTERED, Ordering::Release);

    // 取出之前注册的 waker 以便唤醒
    self.waker.take_waker()
}

fire 返回一个 Option<Waker>,驱动端在释放锁之后调用它来唤醒等待中的任务。

4.1 为什么每个 TimerShared 独立拥有 StateCell?

理解 StateCell 之后,一个自然的问题是:为什么每个 TimerShared(即每个 timer)都独享自己的 StateCell?能不能多个 timer 共享一个?

答案是否定的。因为”一个 timer 的 poll 和另一个 timer 的 fire 之间的并发”和”同一个 timer 的 pollfire 之间的并发”性质完全不同:

如果所有 timer 共享一个 StateCell,那就是一把全局锁的缩影——task A 的 poll() 和 task B 的 poll() 也要争同一个原子变量,毫无隔离性可言。

所以模式是:每个 timer 的并发边界是单 timer 粒度的,独立 StateCell 是最自然的选择。

如果没有 StateCell 会怎样?

三个具体问题,按严重程度排列:

1. 丢失唤醒(lost wakeup)——最致命

用传统 Mutex<bool> 替代 StateCell 的话,存在一个经典的 TOCTOU(Time-Of-Check to Time-Of-Use)窗口:

时间线:
  Driver 线程:   检查到期 → 设 flag=true → waker.wake()
                    ↑
  Task 线程:     poll() → 检查 flag=false → 注册 waker → 返回 Pending

Task 线程读 flag(false)和注册 waker 之间插入了 Driver 线程:flag 被设为 true,wake 被调用——但 Task 已经检查了 flag 为 false,即将注册 waker 后返回 Pending。这个 wake 永久丢失,任务永远醒不过来。

StateCell 用 AtomicU64 + 先注册 waker 再读状态的顺序消除这个窗口:即使在写入 waker 后、读状态前 Driver 调用了 fireread_state() 读到 STATE_DEREGISTERED 后直接返回 Ready,不会丢失。

2. 重复触发(double fire)

没有原子状态区分”正在 fire”和”已被取消”——Driver 可能在一个被 cancel() 的 timer 上再次调用 waker.wake(),造成任务被错误唤醒两次。

StateCell 的 fire() 中先检查 cur_state == STATE_DEREGISTERED,是则返回 None;CAS 确保一次到期只转换为一次 wake()

3. 锁竞争放大

没有原子状态,每层操作都持锁:

StateCell 把高频路径——poll() 检查是否到期——变成一次 AtomicU64::load:免锁、无竞争、单条指令。只在真正的竞争边界(CAS 更新状态)需要一条原子指令,不持有任何锁。

每个 timer 一个 AtomicU64,遍历 N 个 timer 要 N 次 CAS,影响精度吗?

一个自然的担心:Driver 在 process_at_time 里遍历 slot 中的所有到期 timer,逐个调用 mark_pending + fire(每个都是 CAS 操作),如果 slot 里有几千个 timer,遍历开销会不会让它们”不准时”?

实际约束让这个开销无关紧要:

约束一:每个 CAS 的成本极低。一次 AtomicU64::compare_exchange 在 x86 上是一条 lock cmpxchgq 指令,无争用时约 10-20ns。1,000 个 timer 同时到期 ≈ 10-20μs 的遍历时间。

约束二:slot 粒度本身就是保护层。时间轮 level 0 的 slot 宽度是 1ms。同一 slot 内的所有 timer 本来就在同一个 1ms 窗口内到期。遍历开销占 slot 宽度的 1-2%

t=1000ms   level-0 slot 边界
    ├─ timer A (1000.1ms 到期)
    ├─ timer B (1000.5ms 到期)
    ├─ timer C (1000.7ms 到期)
    └─ ... 最多几千个 timer
t=1001ms   level-0 slot 边界
           ↑ 遍历 1,000 个 timer 耗时 ~15μs
           ↑ 不影响下一个 slot(1001ms 才需要检查)

约束三:锁零争用。CAS 只在 Driver 和 Task 同时操作同一个 timer 时才失败。大多数 timer 到期时对应的任务睡眠在 Poll::Pending——CAS 一次成功,无重试。对比 Mutex 方案:收割时 lock() + unlock() 一千次,且 Task 端的 poll() 也要抢同一把锁,争用放大。

真实的权衡是16

每个 timer 一个 AtomicU64:
  × 遍历 N 个 timer 需要 N 次 CAS
  ✓ 零锁争用、零 syscall、O(N) 的 ~10ns/timer 线性代价

替代方案——每个 slot 一把锁:
  × Task poll() 必须等 Driver 放锁(可能活锁)
  × 锁竞争随 timer 数量超线性放大
  ✓ 批量移动 slot 时只需一次锁操作

Tokio 选择了 CAS 方案。因为 timer 数量越多,CAS 的优势越明显——锁竞争是超线性放大,CAS 遍历是严格 O(N) 且常数极小。

五、运行时如何驱动定时器

前面讲的是 Sleep 的结构和状态管理,但这些定时器到底是怎么被驱动的?答案在运行时的主循环里。

Tokio 的运行时中有两个 Driver,各有自己的主循环函数:

这两个 Driver 是分别独立初始化的——在 runtime::driver::Driver::new() 中,先创建 I/O Driver,再在其上包裹 Time Driver,两者各自持有一份内部状态17

// tokio/src/runtime/driver.rs: 108-117
pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Handle)> {
    let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io, cfg.nevents)?;
    //                ↑ I/O Driver 从这里创建,持有 mio::Poll
    let (time_driver, time_handle) =
        create_time_driver(cfg.enable_time, cfg.timer_flavor, io_stack, &clock);
    //  ↑ Time Driver 从这里创建,持有 Wheel
    //  ↑ io_stack 作为 IoStack 传给 Time Driver——这就是"堆叠"
}

创建出的 Handle 包含了两个子句柄:

// tokio/src/runtime/driver.rs: 52-59
pub(crate) struct Handle {
    pub(crate) io: IoHandle,       // I/O Driver 的句柄
    pub(crate) signal: SignalHandle,
    pub(crate) time: TimeHandle,   // Time Driver 的句柄
    pub(crate) clock: Clock,
}

这个 HandleArc 包装后分发给所有 worker 线程——所有线程共享同一个 Driver 实例,没有 per-thread 的 Driver。多线程访问通过内部锁串行化:Time Driver 的 Wheel 受 Mutex<InnerState> 保护(一次只有一个线程收割),I/O Driver 的 epoll_wait&mut self 上自然串行。

两者的关系是堆叠调用——Timer 的 park_internal() 内部调用 I/O Driver 的 turn(),后者执行实际的 epoll_wait 阻塞:

Timer::park_internal()
    ├── 查时间轮 → 计算 timeout(最早 deadline - now)
    ├── IoStack::park_timeout(timeout)
    │   └── I/O Driver::turn()
    │       └── epoll_wait(events, timeout)   ← 真正的阻塞点
    ├── process_at_time()                      ← 收割到期 timer
    └── (I/O 事件已在 turn() 里处理:ScheduledIo::wake)

每个 Driver 维护自己的数据结构,醒来后各管各的:

graph TD
    subgraph Runtime::park_internal
        A["开始"] --> B["Timer::park_internal()"]
        B --> C["1. 查 Wheel → next_wake"]
        C --> D["2. IoStack::park_timeout(duration)"]
        D --> E["I/O Driver::turn()"]
        E --> F["epoll_wait(events, timeout)"]
        F -->|"唤醒"| G["分发 I/O 事件<br/>→ ScheduledIo::wake()"]
        G --> H["Timer::process_at_time()"]
        H --> I["收割到期 timer<br/>→ StateCell::fire()"]
    end
    style B fill:#e1f5fe
    style E fill:#c8e6c9
    style H fill:#e1f5fe
    style G fill:#c8e6c9

Timer Driver 决定”等多久”,I/O Driver 负责”执行等待”,醒来后各处理各的数据——Timer 收割时间轮,I/O 分发事件到 ScheduledIo。两者在同一个线程上、同一次 epoll_wait 返回后依次完成。

下面来看 Timer Driver 的 park_internal 具体实现18

// tokio/src/runtime/time/mod.rs: 213-256
fn park_internal(&mut self, rt_handle: &driver::Handle, limit: Option<Duration>) {
    let handle = rt_handle.time();
    let mut lock = handle.inner.lock();

    // 获取时间轮中最近的到期时间
    let next_wake = lock.wheel.next_expiration_time();
    lock.next_wake = next_wake.map(|t|
        NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap())
    );
    drop(lock);

    match next_wake {
        Some(when) => {
            let now = handle.time_source.now(rt_handle.clock());
            let mut duration = handle.time_source
                .tick_to_duration(when.saturating_sub(now));

            if duration > Duration::from_millis(0) {
                // 有定时器在将来才到期:带超时地 park 线程
                self.park_thread_timeout(rt_handle, duration);
            } else {
                // 有定时器已经到期或即将到期:立即返回
                self.park.park_timeout(rt_handle, Duration::from_secs(0));
            }
        }
        None => {
            // 没有定时器且有 limit:带超时 park
            // 没有定时器且没有 limit:无限期 park
        }
    }

    // 醒来后处理到期的定时器
    handle.process(rt_handle.clock());
}

流程很清晰:

sequenceDiagram
    participant RT as Runtime Worker
    participant Driver as Time Driver
    participant Wheel as Hashed Wheel
    participant OS as OS Timer

    RT->>Driver: park_internal()
    Driver->>Wheel: next_expiration_time()
    Wheel-->>Driver: Some(5000) = 5 秒后
    Driver->>OS: park_timeout(5s)
    Note over OS: 线程休眠,CPU 可服务其他任务
    OS-->>Driver: 5 秒后唤醒(或更早被 unpark)
    Driver->>Wheel: process(now)
    Wheel-->>Driver: 到期的 TimerEntry 列表
    Driver->>Driver: fire → 取出 Waker
    Driver->>RT: wake_all()
    RT->>RT: 重新 poll 对应的 Sleep Future

关键点在于:线程的休眠时间是由时间轮中最早的到期时间决定的。如果有多个 sleep 在等待,线程会以最早的 deadline 作为 park timeout;当时间到达(或被其他事件提前 unpark)后,驱动处理所有到期的定时器,批量唤醒对应的任务。

具体的处理逻辑在 Handle::process_at_time18

// tokio/src/runtime/time/mod.rs: 296-337
pub(self) fn process_at_time(&self, mut now: u64) {
    let mut waker_list = WakeList::new();
    let mut lock = self.inner.lock();

    // 从时间轮中收集所有到期(≤ now)的定时器
    while let Some(entry) = lock.wheel.poll(now) {
        if let Some(waker) = unsafe { entry.fire(Ok(())) } {
            waker_list.push(waker);
            if !waker_list.can_push() {
                // 批次满了,释放锁后批量唤醒
                drop(lock);
                waker_list.wake_all();
                lock = self.inner.lock();
            }
        }
    }

    // 更新下一次唤醒时间
    lock.next_wake = lock.wheel.poll_at()
        .map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap()));
    drop(lock);

    // 唤醒剩余 waker
    waker_list.wake_all();
}

这里使用 WakeList 来批量收集 waker,在释放驱动锁之后再统一调用 wake(),避免持有锁时调用外部代码带来死锁风险。

六、最底层:从 park_timeout 到时钟中断

前面说 park_timeout 让线程休眠到最近的 deadline,但它到底是怎么”准时醒来”的?答案在最底层的硬件时钟中断。

Tokio 的 park_timeout 在 Linux 上最终会调用 mio::Poll::poll(events, timeout),后者通过 epoll_wait(timeout_ms) 进入内核。内核内部使用高精度定时器(hrtimer)来实现这个超时:

sequenceDiagram
    participant Tokio as Tokio Worker
    participant mio as mio / epoll
    participant Kernel as Linux Kernel
    participant hrtimer as hrtimer 子系统
    participant Clock as 时钟事件设备 (APIC/HPET)

    Tokio->>mio: park_timeout(5000ms)
    mio->>Kernel: epoll_wait(timeout=5000ms)
    Kernel->>hrtimer: schedule_hrtimeout_range()
    hrtimer->>hrtimer: enqueue_hrtimer() 插入红黑树
    hrtimer->>Clock: 编程设备为 one-shot,5 秒后触发中断
    Note over Tokio: 线程休眠,CPU 可服务其他任务
    Note over Clock: ... 5 秒 ...
    Clock-->>Kernel: 硬件时钟中断
    Kernel->>hrtimer: hrtimer_interrupt()
    hrtimer->>hrtimer: __hrtimer_run_queues() 遍历红黑树
    hrtimer->>Kernel: hrtimer_wakeup() → wake_up_process()
    Kernel-->>Tokio: epoll_wait 返回
    Tokio->>Tokio: process_at_time() 处理到期定时器

6.1 C 语言的等价实现

Tokio 的 park_timeout 在 Linux 上依赖 epoll_wait 的超时参数。在 C 语言中,有两种不同的方式使用内核定时器,分别对应”主动检查”和”自动回调”两种模型。

timerfd + epoll(主动检查模型)

Tokio 底层走的是这条路径。timerfd 将定时器暴露为文件描述符,epoll 可以像监听网络事件一样监听它:

// 创建一个定时器文件描述符
int tfd = timerfd_create(CLOCK_MONOTONIC, 0);
struct itimerspec spec = {
    .it_value = { .tv_sec = 2, .tv_nsec = 0 }  // 2 秒后触发
};
timerfd_settime(tfd, 0, &spec, NULL);

// 将 timerfd 注册到 epoll
int epfd = epoll_create1(0);
struct epoll_event ev = { .events = EPOLLIN, .data.fd = tfd };
epoll_ctl(epfd, EPOLL_CTL_ADD, tfd, &ev);

// 阻塞直到定时器到期
epoll_wait(epfd, &ev, 1, -1);
uint64_t exp;
read(tfd, &exp, sizeof(exp));  // 消耗到期事件

当定时器到期时,timerfd 变为可读,epoll_wait 返回。这与 Tokio 的 park_timeout 最终调用 epoll_wait(timeout_ms) 是同一个底层机制——只不过 Tokio 把超时时间作为 epoll_wait 的参数传递,而 C 版本显式创建了一个独立的 timerfd。两种方式最终都进入内核的 hrtimer 子系统。

timer_create + SIGEV_THREAD(自动回调模型)

C 标准还提供了另一种方式——POSIX 定时器。设置 sigev_notify = SIGEV_THREAD 后,内核在定时器到期时自动创建一个新线程来运行回调函数:

#include <signal.h>
#include <time.h>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

void timer_callback(union sigval val) {
    int id = *(int *)val.sival_ptr;
    printf("定时器 %d 到期!线程 ID: %lu\n", id, pthread_self());
}

int main() {
    timer_t timer;
    int id = 42;
    struct sigevent sev = {
        .sigev_notify = SIGEV_THREAD,
        .sigev_value.sival_ptr = &id,
        .sigev_notify_function = timer_callback,
    };
    timer_create(CLOCK_MONOTONIC, &sev, &timer);

    struct itimerspec spec = {
        .it_value = { .tv_sec = 2, .tv_nsec = 0 }
    };
    timer_settime(timer, 0, &spec, NULL);

    printf("主线程继续做其他事...\n");
    pause();
    return 0;
}

这种方式看起来很像”注册回调,自动执行”的理想模型。但在高性能服务器中很少使用——从信号/线程上下文中有太多限制:不能可靠访问线程局部存储、锁的约束更严格、线程频繁创建销毁的开销不可控。因此,实际的高性能 C 程序(如 Nginx、Redis)都走 timerfd + epoll 路径,而不是 SIGEV_THREAD

Tokio 的 park_timeout 本质上更接近 timerfd 模型——它不是靠回调自动执行的,而是把超时时间传给 epoll_wait,到期后线程被内核唤醒,再由 Driver 去时间轮里收割到期定时器。

为什么 Tokio 不走 SIGEV_THREAD 或信号路径?

既然内核提供了 SIGEV_THREAD 这种”自动回调”机制,Tokio 为什么不用?原因有三。

1. 锁竞争爆炸。 如果几十个定时器同时到期,每个都起一个线程去跑回调,这些线程会同时争 Wheel 的锁和任务的运行队列锁。而 Tokio 现在的设计是只有一个 Driver 线程在 park/unpark 循环里处理定时器——处理完一把锁释放、批量唤醒,零锁争用。

2. 没有批处理。 epoll_wait 返回后,Driver 在 process_at_time 里一次性收割所有到期定时器,把 waker 收集到 WakeList 里批量调用(参见第五节)。如果每个定时器单独起一个线程或信号,就失去了批处理的机会——每个回调都得单独走一遍上下文切换。

3. 信号的安全黑洞与合作式调度冲突。 信号处理函数只能调用 async-signal-safe 的函数,不能锁 mutex、不能分配内存,几乎什么也干不了。SIGEV_THREAD 虽然避开了信号限制,但它的抢占式特性会破坏 Tokio 的合作式调度模型。而现在的 epoll_wait → Driver 处理 → Waker::wake() → 任务入队 → 调度器 poll,每一步都在运行时掌控之中。

本质上,Tokio 已经有了回调机制——Waker。问题不是”怎么通知”,而是”怎么高效地通知”。epoll_wait 不是轮询,而是被内核挂起;醒来后批量收割时间轮,本质是把 n 个独立的”自动回调”合并成 1 个可预测的批处理循环。

6.2 红黑树 vs 哈希时间轮

有趣的是,内核的 hrtimer 和 Tokio 用了不同的数据结构。Tokio 的 time driver 使用哈希时间轮(本文第三节),而 Linux 的 hrtimer 使用红黑树(rbtree)来组织定时器19

// include/linux/hrtimer_defs.h: 27-35
struct hrtimer_clock_base {
    struct hrtimer_cpu_base    *cpu_base;
    clockid_t                  clockid;
    unsigned int               seq;
    // 按到期时间排序的红黑树根节点
    struct timerqueue_linked_head active;
    ktime_t                    (*get_time)(void);
    ktime_t                    offset;
};

enqueue_hrtimer() 将定时器插入红黑树,时间复杂度 O(log n)20

// kernel/time/hrtimer.c: 1096-1104
/*
 * enqueue_hrtimer - internal function to (re)start a timer
 *
 * The timer is inserted in expiry order. Insertion into the
 * red black tree is O(log(n)).
 */
static bool enqueue_hrtimer(struct hrtimer *timer,
                            struct hrtimer_clock_base *base,
                            enum hrtimer_mode mode, bool was_armed)

这并不是说哪种数据结构更优——它们只是面向不同的约束。内核的 hrtimer 需要纳秒级精度和动态的到期时间分布,红黑树的 O(log n) 插入更为稳妥。Tokio 的哈希时间轮只需要毫秒级精度,但需要管理海量定时器(成千上万个 Sleep 并发),O(1) 插入和批量降级更适合这种场景。

有意思的是,Linux 内核也有一套低精度定时器(timer wheel),代码路径在 kernel/time/timer.c,它用的正是和 Tokio 类似的层级哈希轮结构——8 到 9 层,每层 64 个 bucket(LVL_SIZE = 64LVL_DEPTH = 8/9),粒度和覆盖范围的表21

// kernel/time/timer.c: 104-115 (HZ=1000 时)
//
// HZ 1000 steps
// Level Offset  Granularity            Range
//  0      0         1 ms                0 ms -         63 ms
//  1     64         8 ms               64 ms -        511 ms
//  2    128        64 ms              512 ms -       4095 ms
//  3    192       512 ms             4096 ms -      32767 ms
//  4    256      4096 ms (~4s)      32768 ms -     262143 ms
//  5    320     32768 ms (~32s)    262144 ms -    2097151 ms
//  6    384    262144 ms (~4m)    2097152 ms -   16777215 ms
//  7    448   2097152 ms (~34m)  16777216 ms -  134217727 ms
//  8    512  16777216 ms (~4h)  134217728 ms - 1073741822 ms

对比 Tokio 的 6 层 64 slot 设计,两者的核心思想同源——都源自哈希时间轮的经典设计。

6.3 从时钟中断到进程唤醒

当硬件时钟事件设备(Local APIC Timer 或 HPET)在 deadline 到达时产生中断,内核进入 hrtimer_interrupt()。这个函数做了几件关键的事20

  1. 更新时间基准hrtimer_update_base() 读取硬件计数器和系统时间
  2. 处理到期定时器__hrtimer_run_queues() 遍历红黑树中所有到期时间 ≤ now 的定时器
  3. 调用定时器回调:对于 sleep 类定时器,回调是 hrtimer_wakeup()

hrtimer_wakeup 的实现非常直接——找到等待中的进程,把它叫醒20

// kernel/time/hrtimer.c: 2184-2193
static enum hrtimer_restart hrtimer_wakeup(struct hrtimer *timer)
{
    struct hrtimer_sleeper *t =
        container_of(timer, struct hrtimer_sleeper, timer);
    struct task_struct *task = t->task;

    t->task = NULL;
    if (task)
        wake_up_process(task);

    return HRTIMER_NORESTART;
}

wake_up_process() 将进程状态设为 TASK_RUNNING 并放入就绪队列。随后内核的调度器在合适的时机把它调度回 CPU。Tokio 的 worker 线程从 epoll_wait 返回,继续执行 process_at_time(),完成定时器的处理。

6.4 关键细节:这不是周期性的 tick

需要注意,现代 Linux 在 NOHZ(dynticks)模式下并不是靠周期性的时钟中断来驱动定时器的,而是把时钟事件设备编程为一次性(one-shot)触发hrtimer_interrupt() 在处理完当前到期的定时器后,会重新编程时钟设备,让它在下一次最早到期的 deadline 时再产生中断20

这意味着如果系统上没有定时器在等待(或者最近的定时器在 5 秒后),CPU 可以进入深度休眠状态,期间不产生任何时钟中断。这也是为什么 Tokio 的 park_timeout 在”没有定时器且没有 limit”时可以直接无限期 park——内核不会为它浪费任何 CPU 周期。

这条从 tokio::time::sleep 到硬件时钟中断的链路也就清楚了:

tokio::time::sleep(5s).await
    → Sleep::poll() 返回 Pending,注册到哈希时间轮
    → Driver::park_timeout(5s)
    → mio → epoll_wait(timeout=5000ms)
    → 内核 schedule_hrtimeout_range() → 插入红黑树
    → 编程时钟设备 one-shot 5 秒后触发
    → [CPU 休眠或服务其他任务]
    → 5 秒后时钟中断 → hrtimer_interrupt()
    → __hrtimer_run_queues() → hrtimer_wakeup() → wake_up_process()
    → epoll_wait 返回 → 线程醒来
    → process_at_time() → fire → wake_all()
    → Sleep::poll() 返回 Ready

6.5 定时器管理的存放位置:用户态 vs 内核态

如果把这几种方式放在一起对比,一个更本质的区别浮现出来——定时器到底存放在哪一层管理:

方案 定时器存储位置 数据结构 操作复杂度 内核中管理的定时器数量
C timerfd 内核 hrtimer 红黑树 O(log n) n 个
C timer_create(SIGEV_THREAD) 内核 hrtimer 红黑树 O(log n) n 个
Tokio 时间轮 用户态哈希时间轮 多层时间轮 O(1) + 1 × O(log 1) 1 个

Tokio 在用户态维护时间轮,只向内核注册一个”最近 deadline”的唤醒点。这意味着几万个定时器的插入和删除都在用户态完成,O(1),没有系统调用;内核只需要管理 1 个定时器(而不是 n 个),红黑树的 O(log 1) ≈ O(1)。精度降低到毫秒级,但吞吐量大幅提升。

graph LR
    subgraph "C timerfd(n 个定时器全部进内核)"
        A1["用户态 app"] -->|"timerfd_settime × n"| K1["内核 hrtimer<br/>红黑树<br/>O(log n) 管理 n 个"]
    end

    subgraph "Tokio(1 个定时器进内核)"
        A2["用户态 app"] -->|"O(1) 插入"| W["用户态哈希时间轮<br/>6 层 × 64 slot"]
        W -->|"只注册最早 deadline"| K2["内核 hrtimer<br/>红黑树<br/>O(log 1) 管理 1 个"]
    end

这正是 Varghese & Lauck 论文12思想在工程中的体现:用用户态的时间轮 + 少量内核辅助,替代全部在内核红黑树中管理。Tokio 不是在”绕过”内核机制,而是在它之上加了一层更高效的用户态调度。

6.6 epoll_wait 在这条链路中的真实角色

前面说了那么多”内核 hrtimer”和”epoll_wait”,容易产生一个误解:epoll_wait 在管理 tokio 的定时器吗?

答案是 epoll_wait 完全不知道 tokio 时间轮的存在。epoll_wait 的 timeout 参数只是一个”线程睡眠闹钟”。

本质上,epoll_wait 内部是两个唤醒源的 OR 组合:

epoll_wait(epfd, events, maxevents, timeout)
                ↑                       ↑
           I/O 事件就绪              hrtimer 到期
          (网卡中断 →               (时钟中断 →
           协议栈 → epoll)            LAPIC → hrtimer)

任何一个条件满足,epoll_wait 就返回。内核里大致是这样的循环22

// 内核 ep_poll() 简化伪代码
int epoll_wait(..., int timeout_ms) {
    if (timeout_ms > 0)
        schedule_hrtimeout_range(timeout_ms);  // 设一个 hrtimer

    while (1) {
        if (events_ready)  return;        // I/O 事件来了
        if (timeout_expired) return 0;    // hrtimer 到了
        schedule();  // 让出 CPU,等任一条件满足
    }
}

tokio 利用的正是这个”谁先到就醒谁”的 OR 语义——I/O 事件先到就提前回来顺便收割 timer,hrtimer 先到就准时处理到期 timer 并回到 epoll 继续等 I/O。

tokio 时间轮:管理 1,000,000 个 Sleep
    ↓ 算出最早 deadline = now + 5s
    ↓ 传给 epoll_wait 作为 timeout
epoll_wait:设 1 个内核 hrtimer,5s 后叫醒线程
    ↓ epoll_wait 不知道 tokio 的时间轮
    ↓ epoll_wait 不知道 StateCell
    ↓ epoll_wait 不知道 Waker
    ↓ 它只是一个准确的线程睡眠定时器
    ↓ 5s 后
epoll_wait 返回 → 回到 tokio Driver
    ↓
tokio process_at_time():用自己维护的 elapsed 推进时间轮
    ↓ 收割到期 timer
    ↓ fire → waker.wake()

如果把 epoll_wait(timeout) 换成 nanosleep(timeout) + epoll_wait(-1),效果等价22。Tokio 用 epoll_wait(timeout) 只是因为方便——一个系统调用同时等两件事(I/O 事件和定时器到期),比分开用 nanosleep + epoll_wait 少一次上下文切换。

所以 epoll_wait 的角色是:tokio 的”定时闹钟”,不是 tokio 的”定时器驱动”。tokio 的定时器驱动完全在用户态完成——elapsed 自维护、时间轮自推进、StateCell 自管理。epoll_wait 只是”到点把线程叫醒”的门铃。

这个角色区分是整个体系的精妙之处:内核做它最擅长的事——准确到毫秒地叫醒线程;tokio 做它最擅长的事——在海量定时器中高效找出哪些已经到期

七、TimeSource:时间与 tick 的桥梁

TimerEntry 使用 Instant 作为对外的时间抽象,而 Wheel 内部使用 u64 毫秒级 tick。TimeSource 负责两者之间的转换23

// tokio/src/runtime/time/source.rs: 6-38
pub(crate) struct TimeSource {
    start_time: Instant,
}

impl TimeSource {
    pub(crate) fn deadline_to_tick(&self, t: Instant) -> u64 {
        // 向上取整到最近的毫秒
        self.instant_to_tick(t + Duration::from_nanos(999_999))
    }

    pub(crate) fn instant_to_tick(&self, t: Instant) -> u64 {
        let dur: Duration = t.saturating_duration_since(self.start_time);
        let ms = dur.as_millis().try_into()
            .unwrap_or(MAX_SAFE_MILLIS_DURATION);
        ms.min(MAX_SAFE_MILLIS_DURATION)
    }

    pub(crate) fn now(&self, clock: &Clock) -> u64 {
        self.instant_to_tick(clock.now())
    }
}

展开来看两个转换函数。instant_to_tick 是核心23

pub(crate) fn instant_to_tick(&self, t: Instant) -> u64 {
    let dur: Duration = t.saturating_duration_since(self.start_time);
    //                    ↑ 核心公式:t - start_time
    let ms = dur.as_millis().try_into()...;  // u128 → u64
    ms.min(MAX_SAFE_MILLIS_DURATION)
}

deadline_to_tick 在它之上加了向上取整:

pub(crate) fn deadline_to_tick(&self, t: Instant) -> u64 {
    self.instant_to_tick(t + Duration::from_nanos(999_999))
    //                    ↑ 加 0.999999ms,任何不足 1ms 都被进位
}

关系是:Wheel.elapsedStateCell.state 都来自 TimeSource 的同一个参考原点 start_time

reset(deadline) 为例,tick 是通过 deadline_to_tick(deadline) 算出的:

tick = deadline_to_tick(deadline)
     = instant_to_tick(deadline + 999_999ns)
     = ((deadline + 999_999ns) - start_time).as_millis()
     = (deadline - start_time + 999_999ns).as_millis()

如果此时 now - start_time = 5000ms,而 deadline = now + 5min

tick = ((now + 5min + 999_999ns) - start_time).as_millis()
     = ((start_time + 5000 + 300000 + 0.999999 - start_time)).as_millis()
     = (300000 + 5000 + 0.999999).as_millis()
     = 305,000  // as_millis() 丢纳米,等价于向下取整

elapsedstate 的差值就是真正的剩余时间。因为两者都是相对于 start_time 的毫秒偏移量:

state - elapsed = (deadline - start_time) - (now - start_time)
                = deadline - now
                = 剩余等待时间(毫秒)

三个关键数值的关系可以总结为:

数值 保存在哪 本质 用途
Wheel.elapsed Wheel 坐标原点偏移量:当前 now - start_time 的毫秒值 推进时间轮(elapsed += delta),判定 timer 是否到期(state <= elapsed
registered_when TimerShared slot 定位缓存:上次插入时的 state 快照,持 Relaxed 顺序 算 slot 索引(slot_for)、从 slot 链表移除(remove
StateCell.state StateCell 真实到期时间deadline - start_time,更新需 CAS(AcqRel mark_pending() 中做最终到期判断(state > not_after ?)

registered_whenstate 的只读缓存——两者绝大多数时候相同,唯一的例外是 extend_expiration 无锁延后了 state 但未更新 registered_when 的间隙。此时 slot 位置暂时”不准”,但 Driver 在收割时通过 mark_pending 返回的 Err 检测到这个偏差,用 sync_when()state 同步回 registered_when 并重插入正确 slot。

所以决定”到没到期”的只有一行判断——state > not_afterregistered_when 只是加速 slot 定位的缓存,elapsed 只是标识”现在几点”的指针。state 是唯一的 source of truth

这就验证了第 3.2 节例子中的核心假设:elapsed=0 时,tick=300000 等价于 deadline - start_time = 300000ms。如果 elapsed=5000(已经运行了 5 秒),tick 就是 305000——state 永远比 elapsed 大一个固定差值。

一个具体的数值对比能说清”绝对 vs 相对”的区别。假设 elapsed=10000(runtime 已运行 10 秒),此时创建 sleep(500ms)

sleep(500ms).await
  → deadline = now + 500ms
  → deadline_to_tick(deadline)
     = (deadline + 999_999ns - start_time).as_millis()
     = (now + 500ms - start_time).as_millis()
     = (10000 + 500).as_millis()
     = 10500                          ← 绝对毫秒偏移量

state - elapsed = 10500 - 10000 = 500  ← 差值才等于注册时长

state绝对毫秒偏移量,不是相对值。如果存相对值(state=500),那 state - elapsed = 500 - 10000 就会包装溢出。elapsed ≥ state 的到期判断也变成一句加法——选绝对值省了这条指令,还保持了 u64 减法的一致性。

deadline_to_tick999_999ns 加法是向上取整的关键:假设用户调用了 sleep(Duration::from_micros(1))deadlinestart_time 只差 1μs,直接 as_millis() 得到 0,timer 立刻到期,根本不等待。加上 999_999nsas_millis() 得到 1——最小等待 1ms。这正是 1ms 精度约束在代码中的直接体现。

MAX_SAFE_MILLIS_DURATIONu64::MAX - 1,约为 5.84 亿年,远超实际需要。

u64 溢出保护

你可能会担心 elapsedstate 都是 u64 毫秒值——如果运行足够久会溢出吗?

三层保护防止溢出问题:

第一层:STATE_DEREGISTEREDSTATE_PENDING_FIRE 占用了 u64 的顶值StateCell.stateu64::MAX 被用作 STATE_DEREGISTERED(已注销),u64::MAX - 1 被用作 STATE_PENDING_FIRE(待触发)。所以有效的最大 tick 是 u64::MAX - 2。如果 TimeSource::instant_to_tick 算出来的值超过了这个范围,会被 ms.min(MAX_SAFE_MILLIS_DURATION) 钳住。

第二层:时间轮的 MAX_DURATION 限制。6 层 × 64 slot 的时间轮能表达的最大范围是 2^36 - 1 ≈ 2.18 年MAX_DURATION)。超过这个范围的 timer 会被自动放入顶层(Level 5)的合适 slot,不会因为到期时间远大于 wheel 范围而出错——这是哈希时间轮论文描述的”wrap-around”处理12

第三层:纯数值比较Wheel::insert()when <= self.elapsed 是直接的 u64 比较。即使 elapsedstate 都接近 u64::MAX,差值 state - elapsed(用于计算 park_timeout)是正确的无符号结果——因为 Rust 的 saturating_sub 保证了安全性。

实际上,elapsed 从 0 以毫秒为单位递增到溢出需要约 5.84 亿年。在那之前 runtime 早就被重启无数次了。MAX_SAFE_MILLIS_DURATION1.8 × 10^14 天,也远超任何实际运行时长。

八、把整条链路串起来

现在可以把 tokio::time::sleep 的完整执行路径画出来:

tokio::time::sleep(Duration::from_secs(5))
    │
    ├─ 创建 Sleep { entry: TimerEntry { deadline: now + 5s, registered: false } }
    │
    ├─ 首次 poll:
    │   ├─ TimerEntry::reset(deadline, true)
    │   │   ├─ 初始化 TimerShared(如果还没有)
    │   │   ├─ 将 deadline 转为 tick → 插入 Wheel(哈希时间轮)
    │   │   └─ 设置 registered = true
    │   ├─ StateCell::poll(cx.waker())
    │   │   ├─ 注册 waker 到 AtomicWaker
    │   │   └─ 加载 state → 还不是 STATE_DEREGISTERED
    │   └─ 返回 Poll::Pending
    │
    ├─ Runtime::park_internal:
    │   ├─ Wheel::next_expiration_time() → Some(5000)
    │   ├─ 计算 duration = 5000ms
    │   └─ OS::park_timeout(5000ms)  ← 线程休眠 5 秒
    │
    ├─ 5 秒后 OS 唤醒线程:
    │   ├─ Wheel::poll(now) → 找到到期的 entry
    │   ├─ entry.fire(Ok(()))
    │   │   ├─ result ← Ok(())
    │   │   ├─ state ← STATE_DEREGISTERED
    │   │   └─ 取出 waker
    │   └─ waker_list.wake_all()  ← 把任务放回调度队列
    │
    └─ 任务被再次 poll:
        ├─ StateCell::read_state()
        │   └─ cur_state == STATE_DEREGISTERED → 读取 result = Ok(())
        └─ 返回 Poll::Ready(())
sequenceDiagram
    participant User as User Task
    participant Sleep as Sleep Future
    participant Entry as TimerEntry
    participant Cell as StateCell
    participant Wheel as Hashed Wheel
    participant Driver as Time Driver
    participant OS as OS

    User->>Sleep: .await → poll(cx)
    Sleep->>Entry: poll_elapsed(cx)
    Entry->>Wheel: reset(deadline) → insert to Wheel
    Entry->>Cell: poll(waker) → register waker
    Cell-->>Entry: Poll::Pending
    Entry-->>Sleep: Poll::Pending
    Sleep-->>User: Poll::Pending

    Note over User: Worker 线程继续服务其他任务

    Driver->>Wheel: next_expiration_time() → 5000ms
    Driver->>OS: park_timeout(5000ms)
    Note over OS: ... 5 秒 ...
    OS-->>Driver: 唤醒

    Driver->>Wheel: poll(now) → 到期 entry
    Driver->>Cell: fire(Ok(()))
    Cell-->>Driver: Some(waker)
    Driver->>Driver: waker.wake()

    Note over User: 任务被重新调度
    User->>Sleep: poll(cx)
    Sleep->>Cell: read_state()
    Cell-->>Sleep: Poll::Ready(Ok(()))
    Sleep-->>User: Poll::Ready(())

九、取消、重置与交互细节

Sleep 的设计还支持两个重要的交互场景:

取消:直接 drop 掉 Sleep 实例即可。TimerEntryPinnedDrop 实现会调用 cancel(),将定时器从时间轮中移除4。没有额外的清理工作。

重置:可以通过 Pin<&mut Sleep>::reset(new_deadline) 来改变 Sleep 的到期时间,而无需创建新的 Future2。内部实现会先尝试原子地延后到期时间(extend_expiration),如果 CAS 成功则无需操作时间轮;如果提前了到期时间,则会重新走 reregister 路径。

这种”乐观延后”的优化利用了定时器一个常见的使用模式——超时通常在操作开始时设置,然后在操作结束前被取消。延后触发时,旧的 slot 位置依然是”安全的”(不会过早触发),所以可以用 CAS 无锁更新。

总结

tokio::time::sleep 的实现体现了异步系统设计的核心分层:

  1. 接口层sleep() / Sleep):对用户暴露一个 .await 即可等待的 Future
  2. 运行时层TimerEntry / TimerShared / StateCell):用一个原子状态机管理定时器的生命周期,协调用户端和驱动端的并发访问
  3. 数据结构层Wheel):用 6 层哈希时间轮实现 O(1) 插入和高效的到期检测
  4. 驱动层Driver::park_internal):通过操作系统的定时唤醒(park_timeout)驱动整个机制运转

最终的效果是:一个单线程可以同时管理成千上万个定时器,线程只会在最近的 deadline 时休眠,期间 CPU 可以服务其他任务或进入省电状态tokio::time::sleep 的高效并发能力不是来自魔法,而是来自一条从 Future 到 OS 的精心设计的链路。

References

  1. 之前的文章:《Rust async/await 的底层契约:从 Future::poll 到 Tokio 运行时》,2026-04-30。详细阐述了 Future::pollContextWaker 组成的核心协议。 

  2. Tokio 源码,Sleep 结构定义,tokio/src/time/sleep.rs。包含 Sleep 的完整 Future 实现,sleep()sleep_until() 的构造逻辑。  2

  3. Tokio 源码,Timer enum 定义,tokio/src/runtime/mod.rsTimerTraditional(TimerEntry)Alternative(time_alt::Timer) 的枚举封装。 

  4. Tokio 源码,TimerEntry 结构定义,tokio/src/runtime/time/entry.rs。包含 TimerEntry 的关键方法:newresetpoll_elapsedcancel。  2

  5. Tokio 源码,TimerShared 结构定义,tokio/src/runtime/time/entry.rs。前端与驱动端共享的状态,包含侵入式链表指针、registered_whenStateCell。 

  6. Tokio 源码,TimerEntry::poll_elapsed 方法,tokio/src/runtime/time/entry.rs。首次 poll 时注册到时间轮,后续 poll 时检查状态。 

  7. Tokio 源码,TimerEntry::reset 方法,tokio/src/runtime/time/entry.rsreset() 先尝试 extend_expiration 乐观路径,失败后通过 inner.into()&TimerShared 转为 NonNull<TimerShared> 并调用 Handle::reregister()。 

  8. Tokio 源码,Handle::reregister 方法,tokio/src/runtime/time/mod.rs。接收 NonNull<TimerShared>,通过 entry.as_ref().handle() 创建 TimerHandle,调用 lock.wheel.insert(entry) 重新插入时间轮。 

  9. Tokio 源码,TimerHandle 结构定义和 TimerShared::handle() 方法,tokio/src/runtime/time/entry.rsTimerHandleNonNull<TimerShared> 的包装,不拥有数据,由 unsafe 契约确保安全使用。 

  10. Tokio 源码,Wheel::insertLevel::add_entrytokio/src/runtime/time/wheel/mod.rs 以及 tokio/src/runtime/time/wheel/level.rsWheel::insert() 计算层级后调用 Level::add_entry(),后者将 TimerHandle push 到对应 slot 的侵入式链表中。 

  11. Tokio 文档,sleep() 的 panic 说明,tokio/src/time/sleep.rs。解释了为什么 rt.block_on(sleep(...)) 会 panic 而 rt.block_on(async {sleep(...).await}) 不会。 

  12. George Varghese 和 Tony Lauck,《Hashed and Hierarchical Timing Wheels: Data Structures for the Efficient Implementation of a Timer Facility》,1996。论文链接:Hashed and Hierarchical Timing Wheels。Tokio 的哈希时间轮数据结构直接引用了此论文的设计。  2 3

  13. Tokio 源码,Wheel 结构定义与文档,tokio/src/runtime/time/wheel/mod.rs。6 层、每层 64 slot 的哈希时间轮实现,以及 NUM_LEVELSMAX_DURATION 常量。  2 3

  14. Tokio 源码,LevelEntryList 定义,tokio/src/runtime/time/wheel/level.rsLevel 包含 occupied 位图和 slot: [EntryList; 64]EntryListLinkedList<TimerShared, TimerShared>,定义在 tokio/src/runtime/time/entry.rs。TimerShared 通过侵入式链表的 pointers 字段直接链接在 slot 的链表中。 

  15. Tokio 源码,StateCell 结构及其方法,tokio/src/runtime/time/entry.rs。包含 pollmark_pending(CAS 循环)、fireset_expirationextend_expiration 等原子操作。  2 3 4

  16. 关于 per-timer StateCell 的权衡分析:每个 timer 独立持有 AtomicU64,Driver 遍历 slot 时逐 timer CAS。对比替代方案(per-slot Mutex),CAS 方案在零锁争用和 O(N) 线性代价上占据优势,且 slot 的 1ms 宽度提供了天然的保护层,使遍历时间(~10ns/timer)不会影响精度。此分析基于 x86-64 上 lock cmpxchgq 指令的理论延迟约 10-20ns(无 cache miss 时),以及 level-0 slot 宽度 1ms 的事实。 

  17. Tokio 源码,runtime::driver::Driver::newHandle 结构定义,tokio/src/runtime/driver.rs 以及 tokio/src/runtime/driver.rs。Time Driver 和 I/O Driver 分别独立初始化,通过 create_io_stackcreate_time_driver 堆叠。Handle 包含 iotimesignal 三个子句柄,通过 Arc 分发给所有 worker 线程。 

  18. Tokio 源码,Driver::park_internalHandle::process_at_timetokio/src/runtime/time/mod.rs。运行时驱动定时器的核心逻辑:计算 next_wake、带超时 park、处理到期定时器、批量唤醒。  2

  19. Linux 内核源码,hrtimer_clock_base 结构定义,include/linux/hrtimer_defs.h。hrtimer 使用红黑树(timerqueue_linked_head)而非哈希时间轮来组织定时器。 

  20. Linux 内核源码,enqueue_hrtimerhrtimer_wakeuphrtimer_interruptkernel/time/hrtimer.cenqueue_hrtimer()(行 1096-1104)将定时器插入红黑树;hrtimer_wakeup()(行 2184-2193)在定时器触发时唤醒进程;hrtimer_interrupt()(行 2083-2114)是时钟中断的处理入口。  2 3 4

  21. Linux 内核源码,timer wheel 实现,kernel/time/timer.c。行 64-150 的注释详细描述了低精度定时器的层级哈希轮设计:8-9 层、每层 64 bucket(LVL_SIZE=64LVL_DEPTH=8/9),与 Tokio 的实现思想同源。 

  22. epoll_wait(timeout) 在本场景中等价于 nanosleep(timeout) + epoll_wait(-1)——都是”先睡 timeout 时间,醒来后再 epoll 检查 I/O 事件”。Tokio 选前者只是因为一个系统调用比两个高效,功能上没有区别。  2

  23. Tokio 源码,TimeSource 实现,tokio/src/runtime/time/source.rs。负责 Instantu64 tick 之间的转换,向上取整到毫秒精度。  2

My Github Page: https://github.com/liweinan

B站视频: https://space.bilibili.com/21947620

Powered by Jekyll and Theme by solid

If you have any question want to ask or find bugs regarding with my blog posts, please report it here:
https://github.com/liweinan/liweinan.github.io/issues