从 tokio::time::sleep 看异步 Timer 的实现:一次从 Future::poll 到哈希时间轮的源码之旅
tokio::time::sleep看起来和thread::sleep只差一个.await,但背后连接的是一整套哈希时间轮、原子状态机和操作系统的定时唤醒机制。这篇文章从源码出发,把这条链路拆开来看。
引言:两个 sleep 的对比
在 Rust 里让程序”等一下”有两种常见方式:
// 方式一:标准库的阻塞 sleep
std::thread::sleep(Duration::from_secs(5));
// 方式二:Tokio 的异步 sleep
tokio::time::sleep(Duration::from_secs(5)).await;
从表面看,两者似乎差不多——都是在某个地方停了 5 秒。但它们在运行时的行为有本质区别。用一个简单的对比实验就能看出来。
如果把下面三个场景跑一遍:
| 场景 | 任务1 | 任务2 | 总耗时 |
|---|---|---|---|
| 原生线程(2 个线程) | 同步 sleep 5s | 同步 sleep 2s | 5s |
| Tokio 单线程 + 同步阻塞 | thread::sleep 5s |
tokio::time::sleep 2s |
7s |
| Tokio 单线程 + 全异步 | tokio::time::sleep 5s |
tokio::time::sleep 2s |
5s |
先看场景一——在 async 任务里混入同步阻塞的版本:
// tokio_block_bad.rs
use tokio::time;
use std::time::Duration;
#[tokio::main(flavor = "current_thread")]
async fn main() {
let start = std::time::Instant::now();
let task1 = tokio::spawn(async move {
println!("[任务1] 开始阻塞, 时间: {:?}", start.elapsed());
std::thread::sleep(Duration::from_secs(5));
println!("[任务1] 结束阻塞, 时间: {:?}", start.elapsed());
});
let task2 = tokio::spawn(async move {
println!("[任务2] 开始异步等待, 时间: {:?}", start.elapsed());
time::sleep(Duration::from_secs(2)).await;
println!("[任务2] 结束异步等待, 时间: {:?}", start.elapsed());
});
let _ = tokio::join!(task1, task2);
}
运行结果:
[任务1] 开始阻塞, 时间: 32.1µs
... 卡住 5 秒,task2 根本没机会开始 ...
[任务1] 结束阻塞, 时间: 5.001s
[任务2] 开始异步等待, 时间: 5.001s
[任务2] 结束异步等待, 时间: 7.002s
task2 被 task1 的同步阻塞卡了整整 5 秒,总耗时 7 秒。
再看场景二——两个任务都用异步 sleep:
// tokio_async_good.rs
use tokio::time;
use std::time::Duration;
#[tokio::main(flavor = "current_thread")]
async fn main() {
let start = std::time::Instant::now();
let task1 = tokio::spawn(async move {
println!("[任务1] 开始异步等待5秒, 时间: {:?}", start.elapsed());
time::sleep(Duration::from_secs(5)).await;
println!("[任务1] 结束异步等待, 时间: {:?}", start.elapsed());
});
let task2 = tokio::spawn(async move {
println!("[任务2] 开始异步等待2秒, 时间: {:?}", start.elapsed());
time::sleep(Duration::from_secs(2)).await;
println!("[任务2] 结束异步等待, 时间: {:?}", start.elapsed());
});
let _ = tokio::join!(task1, task2);
}
运行结果:
[任务1] 开始异步等待5秒, 时间: 41.5µs
[任务2] 开始异步等待2秒, 时间: 43.2µs
[任务2] 结束异步等待, 时间: 2.001s
[任务1] 结束异步等待, 时间: 5.002s
总耗时 5 秒,两个任务在单线程上并发执行,互不阻塞。
场景一的 7 秒暴露了问题:当 task1 在 Tokio 的单线程 runtime 里直接调用 thread::sleep,整个工作线程都被阻塞,task2 根本得不到执行机会。而场景二的两个异步 sleep 却在单线程上实现了并发——task2 在 2 秒后准时完成。
关键不在于”等待”本身,而在于用什么方式等待。thread::sleep 把线程卡住;tokio::time::sleep 只让任务挂起,线程依然可以服务其他任务。
这篇文章的目的就是拆开 tokio::time::sleep 的内部实现,看看它到底是怎么做到的。
在进入细节之前,可以先回顾一下异步系统的基础契约。在之前的文章《Rust async/await 的底层契约:从 Future::poll 到 Tokio 运行时》1中,我们梳理了 Future::poll、Context、Waker 组成的核心协议:任务被 poll,如果未就绪就返回 Poll::Pending 并保存 Waker,事件就绪后通过 wake() 通知运行时重新调度。sleep 正是这套协议的一个具体应用——它用”时间到达”作为就绪条件。
概念先行:从伪代码理解核心思路
在深入源码之前,先建立一个直觉。tokio::time::sleep 的秘密可以用一句话概括:它不会让线程去空等,而是注册一个”闹钟”后立刻把线程还给调度器,到时间了再由闹钟叫醒它。
具体来说,下面这行代码实际触发了 6 个步骤:
tokio::time::sleep(Duration::from_secs(5)).await;
sequenceDiagram
participant Task as 你的任务
participant Exec as Tokio 调度器
participant Timer as Tokio 定时器(全局)
Task->>Timer: 1. 注册:"5秒后唤醒我"
Task->>Exec: 2. 返回 Poll::Pending
Exec->>Exec: 3. 切换去执行其他任务
Note over Timer: 时钟滴答滴答...
Timer->>Task: 4. 5秒到了,调用 Waker.wake()
Task->>Exec: 5. 重新排队等待执行
Exec->>Task: 6. 再次 poll,返回 Poll::Ready
如果把这个过程写成极简的伪代码,大致是这样的:
// 简化的 Sleep Future(概念示意)
struct Sleep {
deadline: Instant,
registered: bool,
}
impl Future for Sleep {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
if Instant::now() >= self.deadline {
// 时间到了,完成
return Poll::Ready(());
}
if !self.registered {
// 时间没到:把 waker 注册到全局定时器
// 定时器会在 deadline 时调用 waker.wake()
let waker = cx.waker().clone();
TOKIO_TIMER.insert(self.deadline, waker);
self.registered = true;
}
// 关键:返回 Pending,交出执行权
Poll::Pending
}
}
核心行为只有三个:
- 首次 poll:发现时间没到 → 把
Waker(闹钟)注册到全局定时器 → 返回Pending,线程继续服务其他任务 - 等待期间:零轮询、零浪费——不需要每秒检查 1000 次”时间到了吗”,定时器在精确时间点触发唤醒
- 定时器触发:
Waker::wake()通知调度器把任务放回就绪队列,下次 poll 时返回Ready
这和张嘴就来的 thread::sleep 有着本质区别:
| 方面 | thread::sleep |
tokio::time::sleep |
|---|---|---|
| 线程状态 | OS 挂起线程 | 线程继续执行其他任务 |
| CPU 利用 | 0%(但线程资源被占) | 100%(有效利用) |
| 可并发量 | ≈ 线程数(几千上限) | 百万级任务 |
| 响应方式 | 时间到自动唤醒 | 通过 Waker 通知调度器 |
有了这个心理模型,接下来就可以拆开源代码,看 Tokio 怎么把伪代码里的 TOKIO_TIMER 变成真正的数据结构。
一、Sleep 的结构:一个 Future 的解剖
tokio::time::sleep(duration) 的签名返回一个 Sleep 结构体:
pub fn sleep(duration: Duration) -> Sleep {
// 计算 deadline = now + duration
// 然后调用 Sleep::new_timeout(deadline, location)
}
它实现了 Future trait,所以可以 .await。但 Sleep 里面到底放了什么?核心字段如下2:
// tokio/src/time/sleep.rs: 220-232
pin_project! {
pub struct Sleep {
// 内部字段(tracing 相关,非关键路径)
inner: Inner,
// 连接 Sleep 实例和驱动它的 Timer 的关键纽带
#[pin]
entry: Timer,
}
}
entry 字段的类型是 Timer,它在 tokio::runtime 内部被定义为一个 enum3:
// tokio/src/runtime/mod.rs: 437-442
pub(crate) enum Timer {
Traditional(time::TimerEntry),
Alternative(time_alt::Timer), // 仅 tokio_unstable + rt-multi-thread
}
本文聚焦在 Traditional 分支——也就是默认的 Timer 实现。TimerEntry 的结构如下4:
// tokio/src/runtime/time/entry.rs: 287-310
pub(crate) struct TimerEntry {
// Arc 引用到 runtime 的调度器句柄
driver: scheduler::Handle,
// 共享的内部结构,参与侵入式链表
#[pin]
inner: Option<TimerShared>,
// deadline:首次 poll 时才注册到时间轮
deadline: Instant,
// 是否已经注册
registered: bool,
}
而 TimerShared 是前端(TimerEntry)和驱动端(Driver)之间共享的状态5:
// tokio/src/runtime/time/entry.rs: 336-360
pub(crate) struct TimerShared {
// 侵入式双向链表的指针
pointers: linked_list::Pointers<TimerShared>,
// 注册到 Wheel 时的时间(原子变量)
registered_when: AtomicU64,
// 当前状态:到期时间、已触发、或已注销
state: StateCell,
_p: PhantomPinned,
}
这几个结构的层次关系可以总结为下图:
graph TD
A["tokio::time::sleep(duration)"] --> B["Sleep<br/>(Future)"]
B --> C["Timer<br/>(enum)"]
C --> D["TimerEntry<br/>(Traditional 分支)"]
D --> E["TimerShared<br/>(共享状态)"]
E --> F["StateCell<br/>(原子状态 + Waker)"]
E --> G["registered_when<br/>(AtomicU64)"]
E --> H["pointers<br/>(侵入式链表节点)"]
这个图展示的是数据归属关系——从用户 API(sleep())一层层往下走到 StateCell。但 TimerEntry 持有的 driver: scheduler::Handle 不在这条垂直链上——它是横向的通道,把每个 TimerEntry 连接到全局唯一的 Driver。这里 TimerEntry 持有 scheduler::Handle,但真正处理 StateCell 的是 time::Handle。它们之间隔着三层导航,容易混淆。下面是完整的 Handle 类型层次和职责划分:
TimerEntry.driver : scheduler::Handle ← 运行时全貌(task 调度 + blocking pool + 所有 driver)
│
└─ .driver() → driver::Handle ← 各 driver 的聚合体
├─ .io: IoHandle ← I/O driver(mio)
├─ .signal: SignalHandle
├─ .clock: Clock
└─ .time() → time::Handle ← Time Driver 专用句柄
├─ time_source ← Instant ↔ tick
└─ inner ← Mutex<InnerState { wheel }>
└─ 通过 wheel.poll → fire → StateCell
scheduler::Handle是”门卡”——TimerEntry存它是因为创建时只拿得到这个(Handle::current()返回的就是它)。它上面挂载了完整的运行时能力(spawn task、blocking pool、所有 driver),不只是 timerdriver::Handle是”楼层卡”——I/O、time、signal、clock 的聚合体。reset()需要走self.driver.driver().io来 unpark,所以TimerEntry不能只存time::Handletime::Handle才是真正操作StateCell的”钥匙”——它持有Mutex<InnerState { wheel }>,执行process(clock)→Wheel::poll→entry.fire()→StateCell::fire()。但TimerEntry从不在字段里直接存它,而是每次通过.driver().time()导航获取
graph TD
A["scheduler::Handle<br/>运行时全貌"] -->|".driver()"| B["driver::Handle<br/>所有 driver 聚合"]
B -->|".io"| C["IoHandle<br/>I/O driver"]
B -->|".time()"| D["time::Handle<br/>Time Driver 专用"]
B -->|".clock"| E["Clock"]
D -->|"持有"| F["Mutex<InnerState>"]
F -->|"包含"| G["Wheel<br/>哈希时间轮"]
F -->|"包含"| H["next_wake"]
I["TimerEntry<br/>用户侧句柄"] -->|".driver 字段"| A
I -->|"持有"| J["TimerShared<br/>共享状态"]
J -->|"包含"| K["StateCell<br/>原子状态机"]
J -->|"包含"| L["registered_when"]
G -->|"poll 返回"| J
D -->|"process → fire"| K
style A fill:#e1f5fe
style B fill:#fff3e0
style D fill:#c8e6c9
style I fill:#f3e5f5
style K fill:#ffcdd2
TimerEntry 用 scheduler::Handle 当通行证,需要 timer 就往下走到 time(),需要 I/O 就走到 io。但 StateCell 只归 time::Handle 的 process() 链操作——I/O driver 和 signal driver 永远不会碰它。
下面的调用链展示了从 TimerEntry.driver(scheduler::Handle)导航到 time::Handle 再到 process() 的完整路径——这就是上文类图和数据流图的代码对应:
// tokio/src/runtime/time/entry.rs: TimerEntry::poll_elapsed 及相关方法
impl TimerEntry {
// TimerEntry 拿到 time::Handle 的唯一途径
fn driver(&self) -> &super::Handle {
self.driver // ① scheduler::Handle (timer 创建时存入)
.driver() // ② → driver::Handle (调度器句柄上的 .driver() 方法)
.time() // ③ → time::Handle (driver 聚合体上的 .time() 方法)
}
pub(crate) fn poll_elapsed(&self, cx: &mut Context<'_>) -> Poll<...> {
// self.driver() 返回 time::Handle,但 poll 不需要它——
// StateCell 就在自己内部的 TimerShared 里
self.inner.state.poll(cx.waker())
}
}
三个 Handle 的精简结构定义——只列出与本链相关的字段和方法:
// === tokio/src/runtime/scheduler/mod.rs ===
pub(crate) enum Handle { // ① scheduler::Handle
CurrentThread(Arc<current_thread::Handle>),
MultiThread(Arc<multi_thread::Handle>),
}
impl Handle {
pub(crate) fn driver(&self) -> &driver::Handle { ... } // → ②
}
// === tokio/src/runtime/driver.rs ===
pub(crate) struct Handle { // ② driver::Handle
pub(crate) io: IoHandle, // I/O driver 句柄
pub(crate) signal: SignalHandle, // 信号 driver 句柄
pub(crate) time: TimeHandle, // TimeHandle = Option<time::Handle>
pub(crate) clock: Clock, // 时钟源
}
impl Handle {
pub(crate) fn time(&self) -> &time::Handle { ... } // → ③
}
// === tokio/src/runtime/time/handle.rs ===
pub(crate) struct Handle { // ③ time::Handle
pub(super) time_source: TimeSource, // Instant ↔ tick 转换
pub(super) inner: super::Inner, // Inner::Traditional { state: Mutex<InnerState>, ... }
}
// InnerState = { next_wake: Option<NonZeroU64>, wheel: wheel::Wheel }
导航链路的三个关键跃迁点:
scheduler::Handle::driver()— 从运行时全貌进入 driver 聚合层driver::Handle::time()— 从 driver 聚合中提取 Time Driver 句柄(若未 enable_time 则 panic)time::Handle持有Mutex<InnerState{ wheel }>— 从这里开始process()→Wheel::poll()→fire()→StateCell
简单来说:用户持有的 Sleep 是一个 Future,它的内部通过 TimerEntry 管理一个 deadline,而 TimerShared 以侵入式链表节点的形式嵌入到底层的哈希时间轮(Hashed Timing Wheel)中。
二、首次 poll:注册到时间轮
当 Sleep 第一次被 poll 时,TimerEntry 还没有注册到时间轮。这时它会调用 reset,将自己插入到底层的 Wheel 数据结构中6:
// tokio/src/runtime/time/entry.rs: 598-617
pub(crate) fn poll_elapsed(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), super::Error>> {
if !self.registered {
let deadline = self.deadline;
// 首次 poll:将 deadline 注册到时间轮
self.as_mut().reset(deadline, true);
}
let inner = self.inner()
.expect("inner should already be initialized by `self.reset()`");
// 通过 StateCell 检查是否已经到期
inner.state.poll(cx.waker())
}
reset 的核心逻辑是:
- 把
Instant格式的 deadline 通过TimeSource转换成u64类型的 tick(毫秒级) - 先尝试原子操作
extend_expiration——如果新 deadline 比旧的晚,可以无锁更新 - 如果
extend_expiration失败(比如 deadline 提前了,或者 timer 已经在触发队列中),则走reregister路径重新插入 Wheel
注意这里的类型转换和调用链。reset() 的源码中7:
// tokio/src/runtime/time/entry.rs: 638-649
let inner = match self.inner() {
Some(inner) => inner, // inner: &TimerShared
None => { /* 初始化 */ }
};
// extend_expiration 失败时走 reregister 路径
if reregister {
unsafe {
self.driver()
.reregister(&self.driver.driver().io, tick, inner.into());
}
}
inner 是 &TimerShared,.into() 通过标准库的 From<&T> for NonNull<T> 将其转换为 NonNull<TimerShared>——一个原始指针包装。这个 NonNull 随后被传入 Handle::reregister()8:
// tokio/src/runtime/time/mod.rs: 398-435
pub(self) unsafe fn reregister(
&self, unpark: &IoHandle, new_tick: u64,
entry: NonNull<TimerShared>, // ← 来自 inner.into()
) {
let mut lock = self.inner.lock();
// 如果 timer 还在 Wheel 中,先移除
if unsafe { entry.as_ref().might_be_registered() } {
lock.wheel.remove(entry);
}
// 创建 TimerHandle——一个 NonNull<TimerShared> 的 "唯一指针"
let entry = entry.as_ref().handle();
entry.set_expiration(new_tick); // 设置新的到期时间
// 插入 Wheel
match unsafe { lock.wheel.insert(entry) } {
Ok(when) => {
// 如果新到期时间比当前最早更早 → unpark 线程
if lock.next_wake.map(|n| when < n.get()).unwrap_or(true) {
unpark.unpark();
}
}
Err((entry, InsertError::Elapsed)) => unsafe { entry.fire(Ok(())) },
}
}
这里 entry.as_ref().handle() 创建 TimerHandle——它的定义极其简单9:
// tokio/src/runtime/time/entry.rs: 183-188
pub(crate) struct TimerHandle {
inner: NonNull<TimerShared>,
}
// tokio/src/runtime/time/entry.rs: 308-312
impl TimerShared {
pub(super) fn handle(&self) -> TimerHandle {
TimerHandle { inner: NonNull::from(self) }
}
}
TimerHandle 不拥有 TimerShared——它只是一个”唯一指针”,借用规则由 unsafe 契约保证(持有驱动锁时才能操作)。
随后 Wheel::insert() 根据到期时间计算层级和 slot 位置,调用 Level::add_entry() 将 TimerHandle(即 NonNull<TimerShared>)push 到 slot 的侵入式链表上10:
// tokio/src/runtime/time/wheel/mod.rs: 88-112
pub(crate) unsafe fn insert(
&mut self, item: TimerHandle,
) -> Result<u64, (TimerHandle, InsertError)> {
let when = unsafe { item.sync_when() }; // 同步到期时间
if when <= self.elapsed {
return Err((item, InsertError::Elapsed)); // 已过期
}
let level = self.level_for(when); // 计算层级
unsafe { self.levels[level].add_entry(item); }
Ok(when)
}
// tokio/src/runtime/time/wheel/level.rs: 122-128
pub(crate) unsafe fn add_entry(&mut self, item: TimerHandle) {
let slot = slot_for(unsafe { item.registered_when() }, self.level);
self.slot[slot].push_front(item); // push 到侵入式链表
self.occupied |= occupied_bit(slot); // 标记 slot 非空
}
self.slot[slot] 的类型是 EntryList,即 LinkedList<TimerShared, TimerShared>——一个侵入式双向链表。push_front 将 TimerHandle 中的 NonNull<TimerShared> 作为链表节点插入,TimerShared 的 pointers 字段就是它的 prev/next 指针。
这里有一个关键设计——两层到期时间缓存。TimerShared 中有两个时间字段:
// tokio/src/runtime/time/entry.rs: 269-281
pub(crate) struct TimerShared {
registered_when: AtomicU64, // ① 写入 slot 时的到期时间缓存
state: StateCell, // ② StateCell 内部的 AtomicU64 存 true expiration
// ...
}
registered_when:timer 插入 slot 时的到期时间。Wheel 用它计算 slot 索引(slot_for()),在remove()时定位 slot。这个值只在插入和移动时更新——它可能过期。StateCell.state:真正的到期时间。它由TimerEntry::reset()写入,可以通过extend_expiration()在无锁的情况下延后(CAS 更新),Driver 在收割时用它做最终判断。
sync_when() 是两层之间的桥梁——把 StateCell.state(true_when)同步到 registered_when,供 Wheel 做 slot 定位:
// Wheel::insert() 先调用 sync_when()
let when = unsafe { item.sync_when() }; // StateCell.state → registered_when
if when <= self.elapsed { return Err(Elapsed); }
let level = self.level_for(when); // 用 sync 后的值算层级
self.levels[level].add_entry(item); // add_entry 内部用 registered_when 算 slot
收割时,process_expiration() 用 mark_pending(expiration.deadline) 直接读 StateCell.state 做最终比对:
// process_expiration 简化
match unsafe { item.mark_pending(expiration.deadline) } {
Ok(()) => pending.push_front(item), // StateCell.state ≤ deadline → 到期
Err(expiration_tick) => { // StateCell.state > deadline → 还没到
// 把 true_when 写回 registered_when,重插入正确 slot
self.levels[level].add_entry(item);
}
}
这种两层设计是为 extend_expiration 乐观路径服务的:用户通过 CAS 无锁延后了 StateCell.state,但 registered_when 还指向旧的 slot。Driver 收割时发现 state > slot deadline,不触发 fire,而是把 state 同步回 registered_when 并重插入正确 slot——整个过程零锁争用。
所以整条注册链是:
Sleep::poll_elapsed()
→ TimerEntry::reset()
→ extend_expiration(tick) ← 乐观路径:CAS 无锁延后
→ Handle::reregister() ← 失败时走此路径
→ lock.wheel.insert(handle) ← TimerHandle 入 Wheel
→ Level::add_entry()
→ slot[slot_index].push_front(item)
→ occupied |= (1 << slot)
现在可以更精确地描述数据流:Sleep 持有 TimerEntry,TimerEntry 持有 Pin<Option<TimerShared>>,TimerShared 通过 NonNull<TimerShared>(即 TimerHandle)以侵入式链表节点形式直接挂在 Level.slot[slot] 上。没有复制、没有中间分配——移动 TimerHandle 就是在操作 TimerShared 的 pointers 指针。
这个过程可以用下面的序列图看清:
sequenceDiagram
participant Sleep as Sleep Future
participant Entry as TimerEntry
participant Shared as TimerShared
participant Handle as TimerHandle
participant Wheel as Wheel
participant Level as Level.slot[64]
Sleep->>Entry: poll_elapsed(cx)
Entry->>Entry: reset(deadline)
Entry->>Shared: extend_expiration(tick)
Note over Shared: 如果新 deadline<br/>比旧的晚→CAS 成功<br/>跳过重新注册
alt extend_expiration 失败
Entry->>Shared: inner.into() → TimerHandle
Entry->>Wheel: Handle.reregister(tick)
Wheel->>Wheel: remove(old slot)
Wheel->>Wheel: insert(handle)
Wheel->>Level: level_for(when) 计算层级
Level->>Level: slot[slot_index].push_front(item)
level-->>Wheel: OK(when)
Wheel-->>Entry: 注册完成
end
Entry->>Shared: state.poll(waker)
Shared-->>Entry: Poll::Pending
Entry-->>Sleep: Poll::Pending
这里有一个关键细节:首次注册不是在 sleep() 调用时发生的,而是在第一次 poll 时发生的。这也是为什么如果在 runtime 外部直接调用 sleep() 会 panic——它需要拿到当前的调度器句柄,而句柄只存在于 runtime 上下文中。如果把它包在一个 async block 里,调用就是惰性的,等到真正 poll 时已经在 runtime 内部了,就不会 panic11。
具体来说,scheduler::Handle(内含 time::Handle)的注入分两步:
① sleep(duration) 被调用时 ── Handle 存入 Sleep,但 TimerEntry 还未创建
② 首次 .await 触发 poll 时 ── Handle 从 Sleep 传递给 TimerEntry::new()
// ① tokio/src/time/sleep.rs: Sleep::new_timeout —— sleep() 调用时
impl Sleep {
fn new_timeout(deadline: Instant, ..) -> Self {
Self {
driver: Handle::current(), // ← 从 thread-local 取出 scheduler::Handle
timer: None, // ← 这里还是 None,TimerEntry 懒创建
deadline,
..
}
}
}
// ② tokio/src/time/sleep.rs: Sleep::poll_elapsed —— 首次 poll 时
fn poll_elapsed(self: Pin<&mut Self>, cx: &mut Context<'_>) -> .. {
let timer = match this.timer.as_mut().as_pin_mut() {
Some(timer) => timer,
None => {
let handle = this.driver; // ← 步骤①存入的 scheduler::Handle
let timer = Timer::new(handle.clone(), *this.deadline);
// ↑ Timer::new() → TimerEntry::new() → TimerEntry { driver: scheduler::Handle }
this.timer.set(Some(timer));
..
}
};
timer.poll_elapsed(cx)
}
Timer::new()内部调用TimerEntry::new(),后者将scheduler::Handle存入TimerEntry.driver字段。这个 Handle 贯穿 timer 的整个生命周期——poll、reset、reregister都用它来导航到time::Handle或io::Handle。
由此可以画出两条独立的调用路径——它们不直接调用对方,而是在 StateCell 处交汇:
═══════════ Task 侧(poll 路径)═══════════ ═══════════ Driver 侧(park 路径)═══════════
park_internal(rt_handle: &driver::Handle)
Sleep::poll(cx) │
│ ├─ let handle = rt_handle.time()
└─ poll_elapsed(cx) │ // → &time::Handle
│ │
└─ TimerEntry::poll_elapsed(cx) ├─ handle.process(clock)
│ │ │
└─ self.inner.state │ └─ process_at_time(now)
.poll(cx.waker()) │ │
│ │ └─ lock.wheel.poll(now)
│ StateCell::poll(): │ │
├─ store waker │ └─ entry.fire(Ok(()))
└─ load state │ │
└─ Poll::Pending │ └─ StateCell::fire()
↑ ├─ store DEREGISTERED
交汇点 ──────────────┘ └─ take waker
│
waker.wake()
Task 侧写入 Waker(poll),Driver 侧取出 Waker(fire)。两者操作的是同一个 TimerShared 内的同一个 StateCell,但 Task 侧不需要 time::Handle——它直接操作自己持有的 TimerShared.state;Driver 侧也不需要知道 TimerEntry 的存在——它通过 TimerHandle(一个 NonNull<TimerShared> 裸指针)间接操作。
三、哈希时间轮(Hashed Timing Wheel)
Timer 的核心数据结构是一个多层哈希时间轮。Tokio 的注释明确引用了 Varghese 和 Lauck 的经典论文12,该论文也因 Linux 内核的高精度定时器实现而广为人知。
时间轮的核心思想很简单:把到期的定时器散列到不同的 slot 里,时钟走动时只检查当前 slot,避免每次都扫描全部定时器。
它本质上是用空间换时间:用 6 层 × 64 slot 的数组,换取每次 tick 只扫一个 slot 的能力。对比一下就能明白——如果用一个普通 Vec 或链表存储所有定时器,每次 tick 都得全量扫描一遍,O(n);而时间轮把定时器按到期时间散列,每次 tick 只检查当前 slot,O(1)。存储只是手段,快速检索到期定时器才是时间轮存在的目的。
Tokio 使用了 6 层时间轮,每层 64 个 slot:
Level 0: 64 × 1 ms = 64 ms 覆盖范围
Level 1: 64 × 64 ms = ~4 秒覆盖范围
Level 2: 64 × ~4 s = ~4 分钟覆盖范围
Level 3: 64 × ~4 min = ~4 小时覆盖范围
Level 4: 64 × ~4 hr = ~12 天覆盖范围
Level 5: 64 × ~12 day = ~2 年覆盖范围
下图展示了一个简化的 3 层时间轮的工作方式:
graph LR
subgraph "Level 2 (~4 min)"
L2["slot 0-63<br/>每 slot ~4s"]
end
subgraph "Level 1 (~4 sec)"
L1["slot 0-63<br/>每 slot 64ms"]
end
subgraph "Level 0 (64 ms)"
L0["slot 0-63<br/>每 slot 1ms"]
end
L2 -->|"降级:slot 到期<br/>重新分配"| L1
L1 -->|"降级"| L0
L0 -->|"到期触发"| F["fire → wake"]
定时器首先被插入到与它的到期时间匹配的层级。比如一个 10 秒后到期的定时器,会落在 Level 2 的某个 slot 里。当该 slot 到期时,里面的所有定时器会被重新分配到 Level 1;然后再降到 Level 0;最终在 Level 0 的 slot 到期时真正触发。
这种层级设计的优势在于:
- 插入 O(1):只需要根据到期时间算出 slot 位置
- 每 tick 的检查量很小:只处理当前 slot 内的定时器
- 覆盖范围大:6 层 64 slot 的结构可以覆盖约 2 年的时间范围,精度 1 毫秒
同一 Level 内的 64 个 slot 共享同一个精度(slot_range),但覆盖不同的时间窗口。例如 Level 2 的 slot[0] 覆盖 0~4s,slot[1] 覆盖 4~8s,以此类推——精度始终是 ~4s,只是窗口在平移。
Wheel 结构体的关键字段13:
// tokio/src/runtime/time/wheel/mod.rs: 22-39
pub(crate) struct Wheel {
// 时间轮启动以来经过的毫秒数
elapsed: u64,
// 6 层,每层 64 个 slot
levels: Box<[Level; NUM_LEVELS]>,
// 等待触发的定时器队列
pending: EntryList,
}
每个 Level 包含一个 [EntryList; 64] 数组,而 EntryList 就是 LinkedList<TimerShared>14:
// tokio/src/runtime/time/wheel/level.rs: 6-15
pub(crate) struct Level {
level: usize,
occupied: u64, // 位图标记哪些 slot 非空
slot: [EntryList; 64], // 64 个 slot,每个存放一个侵入式链表
}
// tokio/src/runtime/time/entry.rs: 195
pub(super) type EntryList = LinkedList<TimerShared, TimerShared>;
occupied 是一个 u64 位图——第 i 位为 1 表示 slot[i] 非空。next_expiration() 用 trailing_zeros() 直接在一条指令中找到最低位的 1,避免遍历 64 个 slot:
// tokio/src/runtime/time/wheel/level.rs: 60-78
pub(crate) fn next_expiration(&self, now: u64) -> Option<Expiration> {
// occupied >> now_slot: 跳过当前 slot 之前的位
let mask = self.occupied.rotate_right(now_slot as u32) >> 1;
let next = mask.trailing_zeros(); // ← 一条指令找下一个非空 slot
// ↑ 如果 mask = 0(无更多非空 slot),返回 64
// 调用者据此判断本层无到期
if next >= 64 { return None; }
let slot = (now_slot + next as usize + 1) % 64;
let level_range = 64_usize.pow(self.level as u32) * 64;
let slot_range = 64_usize.pow(self.level as u32);
let level_start = (now / level_range as u64) * level_range as u64;
let deadline = level_start + (slot as u64) * slot_range as u64;
Some(Expiration { level: self.level, slot, deadline })
}
所以 Wheel::poll() 的 loop 内部相当于:
for each level [0..5]:
if level.occupied != 0: // 位图非空
mask = (occupied >> now_slot) >> 1
next = mask.trailing_zeros() // 硬件指令,~1 个周期
if next < 64: return (level, slot, deadline)
// 否则本层无到期 → 继续查下一层
return None // 所有层级都空
最坏情况(没有任何 timer):6 次 occupied != 0 检查 + 6 次 trailing_zeros,总开销约几十个 CPU 周期。无空轮询、无循环遍历 64 slot——这是位图优化在时间轮中的经典用法。
每层只检查 1 个 slot——trailing_zeros() 找到最低位的 1 后直接返回,每层最多输出一个 Expiration。所以 next_expiration() 扫描 6 层最多做 6 次 trailing_zeros、返回 1 个 (level, slot, deadline),不继续找第二个 slot。
这里涉及三个 next_expiration 方法,分散在两个 struct 中,各管一级:
Wheel::next_expiration_time() ← mod.rs:194,pub(super)
└→ Wheel::next_expiration() ← mod.rs:168,private
└→ for each level [0..5]:
Level::next_expiration(elapsed) ← level.rs:51,pub(crate)
→ occupied.trailing_zeros()
→ 返回该层最早的 Expiration { level, slot, deadline }
Level::next_expiration(now):查一个层的occupied位图,返回该层最早非空 slot 的Expiration。now参数用于跳过当前已处理过的 slot。Wheel::next_expiration():遍历 6 层,每层调Level::next_expiration(elapsed),返回全局最早的Expiration。for (level_num, level) in self.levels.iter().enumerate()从levels[0](Level 0,1ms/slot,最精细)往levels[5](Level 5,~12天/slot,最粗)扫——找到立刻返回,不继续查更高层。Wheel::next_expiration_time():调next_expiration().map(|ex| ex.deadline),只取deadline(u64tick)暴露给park_internal()做 timeout 计算。
收割与降级:process_expiration 的决策
Wheel::poll(now) 找到 expiration.deadline ≤ now 的 slot 后,调用 process_expiration(expiration) 处理该 slot 中所有 entry。对每个 entry,处理逻辑不区分层级——完全由 mark_pending() 的返回值决定:
process_expiration(在 Wheel::poll 内,持 driver 锁)
│
mark_pending(deadline)
│
┌─────┴─────┐
│ │
state ≤ deadline state > deadline
│ │
Ok(()) Err(tick)
│ │
pending level_for() + add_entry()
入队 (降级,仍留在轮子上)
─── 同一轮 Wheel::poll 的下一次迭代,或 process_at_time 的 while 循环 ───
pending.pop() → Handle::process_at_time 里 entry.fire(Ok(()))
→ WakeList → 释放锁后 waker.wake()
关键的认识:
- 到期与触发是两步,不在同一函数里完成:
process_expiration只做mark_pending+ 入pending队列(或降级);真正的fire()在Handle::process_at_time里,由wheel.poll(now)从pending弹出 handle 后调用。不要把「进 pending」和「调用 fire」画成同一步。 - 不是 Level 0 才 fire,Level > 0 才降级——判据是
mark_pending的返回值,不是level == 0。Level 0 只是 slot 宽度仅 1ms,所以绝大多数 entry 在 Level 0 收割时会进入pending。但用户通过extend_expiration把state延后到当前 slot 的deadline之外时,即使在 Level 0 收割,也会Err并降级。 - 降级的目标层级不一定是当前层级减 1——
level_for()用deadline和expiration_tick重新计算,可能跳过多层直接回到 Level 3(如果 timer 还剩好几分钟)。降级是 O(1) 的:一次level_for计算 + 一次add_entry,无需重排链表。 - 收割一个 slot 时,slot 内的 entry 链表被整组取出(
take_entries)后逐个处理,每个 entry 独立决策入pending还是降级。同一个 slot 里可能有部分到期、部分被延后的 entry——tokio 的这个设计正确处理了两者混存的情况。
所以整个 Level 层级的职责是统一的:每层只负责按自己的 slot 粒度找到最早的到期 slot,把整组 entry 取出来;mark_pending 裁決入队还是降级,fire 则由上层的 process_at_time 统一处理。 没有 per-level 特化代码,6 个 Level 实例(self.levels: Box<[Level; 6]>)共享同一份算法。
用一句话区分三个关键方法:next_expiration_time() 扫描 6 层找最早 deadline(算睡多久);process_expiration() 逐 entry 裁決入 pending 还是降级;process_at_time() 从 pending 取出并 fire、批量唤醒任务。
next_wake 的值是所有非空层级中最早的 deadline,不一定是 Level 0 的 slot:
Level 0: empty (occupied = 0)
Level 1: empty
Level 2: slot[9] → deadline = elapsed + 4096×9 = elapsed + 36,864ms
Level 3: slot[1] → deadline = elapsed + 262144×1 = elapsed + 262,144ms
next_expiration(): 从 Level 0 开始逐层检查
→ Level 0: occupied=0 → 跳过
→ Level 1: occupied=0 → 跳过
→ Level 2: slot[9] occupied → 返回 deadline ≈ 当前 + 36s
→ 不再检查 Level 3+(因为已有更早的结果)
next_wake = 36,864 ← 来自 Level 2 而非 Level 0
next_expiration() 从 Level 0 到 Level 5 逐层扫描,只要低层有非空 slot 就立即返回——不再继续查更高层。所以 next_wake 永远是最小粒度的那个到期 slot,可能是 Level 2 的 slot 9(≈36s 后),也可能是 Level 0 的 slot 3(≈3ms 后),取决于当前时间轮中的 timer 分布。
所以 TimerShared 和 Wheel 的关系是:
graph TD
subgraph Wheel
W["Wheel"]
subgraph Levels
L0["Level 0: [slot0, slot1, ..., slot63]"]
L1["Level 1: [slot0, slot1, ..., slot63]"]
L5["... Level 5"]
end
Pend["pending: EntryList"]
end
subgraph Timer
TS1["TimerShared A<br/>(state + pointers)"]
TS2["TimerShared B<br/>(state + pointers)"]
end
L0 -->|"slot[5]"| TS1
L0 -->|"slot[5]"| TS2
TS1 -->|"pointers.prev"| TS2
TS2 -->|"pointers.next"| TS1
subgraph Sleep
S["Sleep 1"] --> TE1["TimerEntry 1"]
TE1 --> TS1
S2["Sleep 2"] --> TE2["TimerEntry 2"]
TE2 --> TS2
end
这里的关键是 TimerShared 通过侵入式链表的 pointers 字段直接挂在 Level.slot[slot_index] 上,Wheel 不复制也不拥有 TimerShared 的所有权——它只通过指针链接。这就是”侵入式”的含义:被链接的对象自身包含链接指针,而非由容器分配独立的节点。这也意味着 TimerShared 必须被 pin 住不能移动,因为链表指针指向的是它自身的内存地址。
3.2 一个完整的例子:5 分钟 sleep 的降级之旅
关于时间参考点的说明:
Wheel.elapsed、StateCell.state和registered_when都是相对于同一个TimeSource.start_time的 绝对毫秒偏移量。下面的例子设start_time = sleep 创建的时刻,此时elapsed在第一次process_at_time前尚未推进,设为例值0。实际运行中elapsed一般是个小的非零值(runtime 启动后到首次 park 之间经过的毫秒数),但这一点偏移不影响层级计算逻辑,因为state - elapsed的差值不受影响。
把前面的概念串起来,假设你在 elapsed=0(即 runtime 刚启动时)创建了一个 tokio::time::sleep(Duration::from_secs(300)).await,看看它在 Wheel 中如何从高层降级到低层,最终被 fire。
t=0:创建和注册
StateCell.state = STATE_DEREGISTERED // 初始值 u64::MAX
Wheel.elapsed = 0 // Wheel 刚开始
首次 poll 时:TimerEntry::reset(deadline) 被调用。TimeSource::deadline_to_tick() 将 5 分钟(300 秒)转为毫秒 tick:
tick = 300,000 // 5 min = 300,000 ms
extend_expiration(tick=300000)
→ 失败(初始状态是 STATE_DEREGISTERED,不是有效的到期时间)
→ 走 reregister 路径
→ set_expiration(300000)
StateCell.state = 300,000 // ← true expiration
→ Wheel::insert()
→ sync_when()
registered_when = 300,000
StateCell.state = 300,000 (不变)
→ level_for(0, 300000) = 3
// 计算过程:masked = 0 ^ 300000 | 63 = 300063
// leading_zeros = 45, significant = 18, 18/6 = 3
→ Level 3, slot 1
// slot_for(300000, 3) = (300000 >> 18) % 64 = 1
此时 Wheel 状态:
Wheel.elapsed = 0
Level 3: slot[1] → TimerShared (registered_when=300000, state=300000)
t=0 ~ 4分钟:线程 park,等最早 deadline
park_internal() 查 next_expiration_time(),发现 Level 3 slot 1 的 deadline 在约 4.37 分钟后(1 × 64^3 = 262,144ms),于是 park_timeout(262s)。
t=262s:Level 3 slot 1 到期,降级到 Level 2
Driver 醒来,process_at_time(262144) → Wheel::poll(262144)。Wheel::poll() 的内部循环13先检查 next_expiration() 找到 Level 3 slot 1,发现其 deadline=262144 ≤ now,于是调用 self.process_expiration(expiration):
// tokio/src/runtime/time/wheel/mod.rs: 126-133
pub(crate) fn poll(&mut self, now: u64) -> Option<TimerHandle> {
loop {
if let Some(handle) = self.pending.pop_back() { return Some(handle); }
match self.next_expiration() {
Some(ref expiration) if expiration.deadline <= now => {
self.process_expiration(expiration); // ← 收割降级
self.set_elapsed(expiration.deadline);
}
_ => { self.set_elapsed(now); break; }
}
}
self.pending.pop_back()
}
process_expiration 先取走 slot 中所有 entry(take_entries),然后逐个调用 mark_pending(deadline)13。对每个 entry,mark_pending 用 StateCell.state 和 deadline 做对比:
// tokio/src/runtime/time/wheel/mod.rs: 171-196
pub(crate) fn process_expiration(&mut self, expiration: &Expiration) {
let mut entries = self.take_entries(expiration); // 取出 slot 链表
while let Some(item) = entries.pop_back() {
match unsafe { item.mark_pending(expiration.deadline) } {
Ok(()) => self.pending.push_front(item), // 到期
Err(expiration_tick) => { // 还没到
let level = level_for(expiration.deadline, expiration_tick);
unsafe { self.levels[level].add_entry(item); } // 降级
}
}
}
}
item 是 TimerHandle(即 NonNull<TimerShared>)。mark_pending(262144) 内部:
// tokio/src/runtime/time/entry.rs: 171-199 (StateCell::mark_pending)
let mut cur_state = self.state.load(Ordering::Relaxed);
// cur_state = 300,000(5分钟到期)
// not_after = 262,144(Level 3 slot 1 deadline)
if cur_state > not_after { // 300,000 > 262,144 → true
break Err(cur_state); // 还没到点,返回 true expiration
}
返回 Err(300000) 后,TimerHandle::mark_pending 把 true_when 写回 registered_when:
// tokio/src/runtime/time/entry.rs: 250-264 (TimerHandle::mark_pending)
Err(tick) => {
unsafe { self.inner.as_ref().set_registered_when(tick); }
Err(tick)
}
回到 process_expiration,用 level_for(262144, 300000) 重新计算层级:
masked = 262144 ^ 300000 | 63 = 37887
leading_zeros = 48, significant = 15, 15/6 = 2
→ Level 2, slot 9
slot_for(300000, 2) = (300000 >> 12) % 64 = 9
→ add_entry: Level 2, slot[9].push_front(item)
→ timer 已从 Level 3 slot 1 移至 Level 2 slot 9
Wheel.elapsed = 262,144
Level 2: slot[9] → TimerShared (registered_when=300000, state=300000)
Level 3: slot[1] → (空)
t=262s ~ 298s:继续 park
next_expiration_time() 找到 Level 2 slot 9 的 deadline:
slot_range(2) = 64^2 = 4,096 ms
slot 9 × 4,096 ms = 36,864 ms
Level 2 slot 9 deadline ≈ 262,144 + 36,864 = 298,...
线程 park_timeout(~36s)。
t=298s:Level 2 slot 9 到期,降级到 Level 1
process_expiration(Level 2, slot 9, deadline=298xxx):
→ mark_pending(298xxx)
StateCell.state = 300,000 > 298,xxx → 还没到点
→ Err(300000)
→ level_for(298xxx, 300000) = 1
→ Level 1, slot 对应 300,000
Wheel.elapsed = 298,...
Level 1: slot[...] → TimerShared
Level 2: slot[9] → (空)
t=298s ~ 300s:Level 1 到 Level 0,最终 fire
类似逻辑继续降级到 Level 0。Level 0 slot 宽度是 1ms:
当 elapsed >= 300,000 时,Level 0 到期。process_expiration 再次被调用,但这次是 Level 0,mark_pending 中的判断不同了:
// tokio/src/runtime/time/entry.rs: 171-199 (StateCell::mark_pending)
let mut cur_state = self.state.load(Ordering::Relaxed);
// cur_state = 300,000
// not_after = 300,000(Level 0 slot deadline)
if cur_state > not_after { // 300,000 > 300,000 → false
break Err(cur_state);
}
// cur_state ≤ not_after → 到期!
compare_exchange(cur_state, STATE_PENDING_FIRE, AcqRel, Acquire)
CAS 成功将状态从 300000 置为 STATE_PENDING_FIRE,返回 Ok(()),entry 被推入 self.pending。随后 Wheel::poll() 的 loop 下一次迭代从 pending 弹出这个 entry,process_at_time 拿到后调用 fire():
// tokio/src/runtime/time/entry.rs: 211-231 (StateCell::fire)
let cur_state = self.state.load(Ordering::Relaxed);
if cur_state == STATE_DEREGISTERED { return None; } // 不为 u64::MAX
// 写入结果
unsafe { self.result.with_mut(|p| *p = Ok(())) };
// state → STATE_DEREGISTERED(u64::MAX)
self.state.store(STATE_DEREGISTERED, Ordering::Release);
// 取出 waker 返回
self.waker.take_waker()
fire 返回 Some(waker),process_at_time 将其压入 WakeList,释放锁后批量 wake_all()——任务被放回调度队列,下次 Sleep::poll() 返回 Poll::Ready(())。
如果用 reset() 延长 deadline 呢?
假设在第 1 分钟时,用户调用了 sleep.reset(new_deadline=10min):
TimerEntry::reset(10min)
→ tick = 600,000
→ extend_expiration(600000)
StateCell.state 从 300,000 CAS 到 600,000 ✓
→ 成功,return(跳过 reregister!)
此时 timer 还在 Level 3 slot 1 里,registered_when 仍然是 300,000。没有任何链表操作——这就是”乐观延后”的零锁优势。
当 Driver 在 elapsed=262144 时收割 Level 3 slot 1:
mark_pending(262144):
StateCell.state = 600,000 > 262,144
→ Err(600000)
→ 把 true_when(600000) 同步回 registered_when
→ 按 600,000 重新计算层级
level_for(262144, 600000) = 更高的层级...
→ 插入到正确的 slot
整个过程零锁争用——extend_expiration 是 CAS,mark_pending 也是 CAS,没有 Mutex。
3.3 elapsed 是如何维护的?
一个关键的设计细节是:elapsed 不依赖任何外部定时器来推进。它是 Driver 在每次 process_at_time(now) 时,用 now 和 elapsed 的差值来”推”的。
每次 Driver 醒来(无论是被 epoll_wait 超时叫醒,还是被 I/O 事件提前 unpark),都会调用 handle.process(clock),内部是 process_at_time(now)。Wheel 收到 now(u64 tick,来自 TimeSource),和当前的 elapsed 比较差值,把落在差值内的 slot 逐个处理——收割到期 timer、降级上层 timer。
Driver 被 I/O 事件提前叫醒(本应在 5 秒后才醒):
→ process_at_time(now=1003) // 只走了 3ms
→ Wheel: 从 elapsed=1000 到 now=1003
第 1000 slot 收割 → 无到期 timer
第 1001-1002 slot 无内容
elapsed = 1003
→ 所有 timer 都还在,下次 park_timeout 重新计算
Driver 在 deadline 时被叫醒:
→ process_at_time(now=6000)
→ Wheel: 从 elapsed=1000 到 now=6000
第 1000、1001、…、5000 slot 逐个处理
→ elapsed=5000 时有到期 timer → fire → wake
→ 继续推进到 6000
elapsed = 6000
→ 重新计算 next_wake
所以时间轮是完全自洽的:它不和”墙上时间”绑定,只和 elapsed 这个单调递增的计数器绑定。TimeSource 的 clock.now() 只用来算增量——增量多大,elapsed 就推多远。即使系统被暂停(suspend-to-RAM),恢复后 clock.now() 返回更新后的时间,增量包含暂停时长,elapsed 一次推到位,所有”超期的 timer”被立即收割。
tokio 不依赖任何外部定时器来推进时间轮。外部定时器(epoll_wait 的 timeout)的唯一作用是”到点叫醒线程”——醒来后 tokio 用自己的 TimeSource 计算增量、推进 elapsed、收割到期 timer。这就是为什么 park_timeout(0) 也能工作:即使不进入内核定时器,process_at_time 也能用 now = TimeSource::now() 推进 Wheel。
四、StateCell:原子状态机
TimerShared 中最关键的部分是 StateCell——一个用原子变量实现的状态机,负责协调用户端(TimerEntry)和驱动端(Driver)之间的并发访问15:
// tokio/src/runtime/time/entry.rs: 91-102
pub(super) struct StateCell {
// 保存到期时间,或特殊标志值 u64::MAX(已注销)
state: AtomicU64,
// 定时器触发后的结果
result: UnsafeCell<TimerResult>,
// 已注册的 Waker
waker: AtomicWaker,
}
state 字段用单个 AtomicU64 同时承载”到期时间”和”状态标记”——它是一个 tagged union,利用值域范围区分含义:
stateDiagram-v2
[*] --> Deregistered
Deregistered --> Scheduled: set_expiration(tick)
Scheduled --> PendingFire: mark_pending() 成功
PendingFire --> Deregistered: fire(result)
Scheduled --> Deregistered: 直接 fire (注)
Deregistered --> Scheduled: 重新注册 (set_expiration)
(注)包括
clear_entry(cancel/drop)→fire、reregister检测到InsertError::Elapsed时直接 fire、runtime shutdown 时fire(Err(…))等路径。所有路径最终都通过fire()进入Deregistered——StateCell层面不存在独立的 “Cancelled” 状态,取消和正常到期共享同一个终端状态。
三个离散状态的编码定义在 entry.rs 顶部(L72-78):
| 状态 | state 的 u64 取值 |
含义 |
|---|---|---|
| Scheduled(已调度) | [0, MAX_SAFE_MILLIS_DURATION],即 < STATE_MIN_VALUE 的任意值 |
定时器在时间轮中等待;state 存储的就是具体的到期 tick |
| PendingFire(待触发) | STATE_PENDING_FIRE = u64::MAX - 1 |
已被驱动从时间轮取出,放入 pending 链表,即将触发 |
| Deregistered(已触发/已注销) | STATE_DEREGISTERED = u64::MAX |
已触发或已注销;结果已写入 result 字段,poll() 返回 Ready(result) |
其中 Scheduled 是”胖状态”——state 可以是 [0, MAX_SAFE_MILLIS_DURATION] 之间的任意值,编码了具体的到期时间。而 PendingFire 和 Deregistered 是纯标记,不携带额外信息。这种设计的精妙之处在于:
- 原子性——
compare_exchange_weak可以一次原子操作、”检查到期时间 + 标记状态转换”。例如mark_pending()中的 CAS 同时验证state ≤ not_after并将值替换为STATE_PENDING_FIRE。 - 无锁乐观更新——
extend_expiration()用 CAS 尝试把到期时间往后推;如果state已被驱动改成STATE_DEREGISTERED,CAS 失败,调用者就知道需要走完整的重新注册路径,无需先取锁。 - 隐式 Release-Acquire 同步——
fire()先写result再以Release顺序写state = STATE_DEREGISTERED;read_state()以Acquire顺序读state,保证读到STATE_DEREGISTERED时result一定已写入完毕,零额外栅栏。
当前状态标记只占用了 u64 顶部 2 个值:
| 取值 | 状态 |
|---|---|
u64::MAX - 1 = 0xFFFF_FFFF_FFFF_FFFE |
STATE_PENDING_FIRE |
u64::MAX = 0xFFFF_FFFF_FFFF_FFFF |
STATE_DEREGISTERED |
但预留边界的定义很灵活:STATE_MIN_VALUE 被定义为 STATE_PENDING_FIRE,加上注释 // This value should be updated if any other signal values are added above. 表明,只需把 STATE_MIN_VALUE 减 1,就能让 u64::MAX - 2 成为第 3 个可用信号值。整个 u64 值域中大于 MAX_SAFE_MILLIS_DURATION(0xFFFF_FFFF_FFFF_FFFD)的部分都是预留空间,目前只占用了顶部 2 个,扩展空间充裕。
事实上 MAX_SAFE_MILLIS_DURATION ≈ 18.4 × 10¹⁸ 个 tick,按 tokio 默认的毫秒粒度约 5.8 亿年,时间表达范围远超实际需求,划顶部几个值给状态标记几乎不影响可用 tick 空间。
驱动端的两段式处理:
PendingFire由process_expiration里的mark_pending通过 CAS 写入;Deregistered由park_internal醒来后的process_at_time→wheel.poll→fire写入。下文分述三个原子操作,注意mark_pending与fire不在同一调用栈。
几个关键操作的实现:
poll——用户端检查是否到期15:
// tokio/src/runtime/time/entry.rs: 142-162
fn poll(&self, waker: &Waker) -> Poll<TimerResult> {
// 先注册 waker,确保 fire 和 poll 之间的竞争不会丢失唤醒
self.waker.register_by_ref(waker);
// 读取状态
self.read_state()
}
fn read_state(&self) -> Poll<TimerResult> {
let cur_state = self.state.load(Ordering::Acquire);
if cur_state == STATE_DEREGISTERED {
// 已经触发,读取 result
Poll::Ready(unsafe { self.result.with(|p| *p) })
} else {
Poll::Pending
}
}
注意 poll 里先注册 waker 再读状态的顺序——如果反过来,可能会发生在读状态之后、注册 waker 之前这个窗口期内定时器恰好触发并调用 fire,从而导致任务永久丢失唤醒信号。
mark_pending——驱动端将定时器移入待触发队列15:
// tokio/src/runtime/time/entry.rs: 171-199
unsafe fn mark_pending(&self, not_after: u64) -> Result<(), u64> {
let mut cur_state = self.state.load(Ordering::Relaxed);
loop {
if cur_state > not_after {
// 实际到期时间比当前 tick 晚——不应该触发
break Err(cur_state);
}
match self.state.compare_exchange_weak(
cur_state,
STATE_PENDING_FIRE,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => break Ok(()),
Err(actual_state) => cur_state = actual_state,
}
}
}
这里使用 CAS(Compare-And-Swap)来原子地将定时器从”已调度”状态转换到”待触发”状态。CAS 失败意味着有人并发修改了状态(比如用户端重置了定时器),需要重试。
TimerHandle::mark_pending 在成功时还会把 registered_when 标成 STATE_DEREGISTERED,表示该 entry 已脱离轮子链表、挂在 Wheel.pending 上——此时尚未写入 result,也尚未取出 waker。调用方(process_expiration)负责 pending.push_front(item);此处不会调用 fire。
fire——完成定时器(由 Handle::process_at_time 调用)15:
// tokio/src/runtime/time/entry.rs: 211-231
unsafe fn fire(&self, result: TimerResult) -> Option<Waker> {
let cur_state = self.state.load(Ordering::Relaxed);
if cur_state == STATE_DEREGISTERED {
return None; // 已经触发过了
}
// 写入结果
unsafe { self.result.with_mut(|p| *p = result) };
// 标记为已注销
self.state.store(STATE_DEREGISTERED, Ordering::Release);
// 取出之前注册的 waker 以便唤醒
self.waker.take_waker()
}
fire 本身只写 result、置 STATE_DEREGISTERED 并取出 waker;谁在何时调用它由驱动层的 process_at_time 决定:
为什么 fire() 用普通 store 而不是 CAS?
你可能会注意到,fire() 修改 state 时用的是普通的 store(Release),而不是 CAS。这和其他方法(如 mark_pending()、extend_expiration())形成对比——为什么呢?
关键原因:fire() 被调用时,已经没有并发写入者了。
定时器触发的完整路径是两阶段的:
mark_pending() fire()
│ │
├─ CAS: tick → STATE_PENDING_FIRE ├─ store: PENDING_FIRE → DEREGISTERED
│ (条件更新:只有 state 还是原 tick 才改) │ (确定性更新:state 一定是 PENDING_FIRE)
│ │
└─ 入 pending 链表 └─ 写 result、取 waker
mark_pending() 已经用 CAS 把 state 从具体的 tick 值原子地转换成了 STATE_PENDING_FIRE(u64::MAX - 1,即 ≥ STATE_MIN_VALUE)。而 extend_expiration()(用户线程调用的无锁乐观更新)的保护逻辑是:
fn extend_expiration(&self, new_timestamp: u64) -> Result<(), ()> {
let mut prior = self.state.load(Ordering::Relaxed);
loop {
if new_timestamp < prior || prior >= STATE_MIN_VALUE {
return Err(()); // state 已被驱动改成了 PENDING_FIRE → 走慢路径
}
// CAS 尝试更新
}
}
一旦 state 变成 STATE_PENDING_FIRE(≥ STATE_MIN_VALUE),extend_expiration() 会因 prior >= STATE_MIN_VALUE 立即返回 Err,转而走需要获取驱动锁的重新注册路径。所以当 fire() 执行时,可能的并发写入者已经全部退出了——fire() 对 state 拥有独占访问权,一个普通的 store(Release) 就足够了。
| 场景 | mark_pending / extend_expiration |
fire |
|---|---|---|
| 是否有并发写入者 | ✅ 是(用户线程可能同时 extend_expiration) |
❌ 否(extend_expiration 已因 state ≠ tick 而退出) |
| 需要的语义 | 条件性更新:”只有 state 还是我期望的值才改” | 确定性更新:”state 一定是 PENDING_FIRE,直接写 DEREGISTERED” |
| 使用的原子操作 | compare_exchange_weak |
store(Release) |
行首的防御性检查 if cur_state == STATE_DEREGISTERED { return None; } 只是防止 fire() 被意外重复调用的安全网——正常情况下走到这里的 state 一定是 STATE_PENDING_FIRE。
总结:mark_pending() 的 CAS 相当于”加锁获取所有权”,fire() 的 store 相当于”持有所有权时直接写入”。两阶段设计让每个操作使用最合适的原子原语,既保证正确性又避免不必要的 CAS 开销。
process_at_time——从 pending 弹出并 fire16:
// tokio/src/runtime/time/mod.rs: 296-337(核心循环,略去时钟回拨处理)
pub(self) fn process_at_time(&self, mut now: u64) {
let mut waker_list = WakeList::new();
let mut lock = self.inner.lock();
while let Some(entry) = lock.wheel.poll(now) {
debug_assert!(unsafe { entry.is_pending() });
// 持 driver 锁;entry 已从轮子链表 / pending 中移除
if let Some(waker) = unsafe { entry.fire(Ok(())) } {
waker_list.push(waker);
if !waker_list.can_push() {
drop(lock); // 避免持锁 wake 死锁
waker_list.wake_all();
lock = self.inner.lock();
}
}
}
lock.next_wake = lock.wheel.poll_at().map(/* ... */);
drop(lock);
waker_list.wake_all();
}
wheel.poll(now) 每次迭代优先 pending.pop_back();若 pending 为空,才在内部调 process_expiration(其中的 mark_pending 可能 newly 推入 pending,下一轮循环即弹出)。因此 mark_pending 与 entry.fire 同属一次 process_at_time 调用,但分属 Wheel 与 Handle 两层——上文的 StateCell::fire 正是被这里的 entry.fire(Ok(())) 调用。
触发入口:Driver::park_internal 在 park / park_timeout 返回后执行 handle.process(clock) → process_at_time(now)(§3.2 走读、§5 有更完整的 park_internal 上下文)。
少数路径会跳过 mark_pending、在持锁下直接 fire:例如 reregister 时 insert 返回 InsertError::Elapsed,或 clear_entry 取消已注册 timer。
4.1 为什么每个 TimerShared 独立拥有 StateCell?
理解 StateCell 之后,一个自然的问题是:为什么每个 TimerShared(即每个 timer)都独享自己的 StateCell?能不能多个 timer 共享一个?
答案是否定的。因为”一个 timer 的 poll 和另一个 timer 的 fire 之间的并发”和”同一个 timer 的 poll 和 fire 之间的并发”性质完全不同:
- 不同 timer 之间:没有状态依赖——task A 的到期不影响 task B
- 同一 timer 之内:
poll(用户端)和fire(驱动端)同时对同一个状态 CAS——一旦共享,就会把隔离开的并发变成全局争用
如果所有 timer 共享一个 StateCell,那就是一把全局锁的缩影——task A 的 poll() 和 task B 的 poll() 也要争同一个原子变量,毫无隔离性可言。
所以模式是:每个 timer 的并发边界是单 timer 粒度的,独立 StateCell 是最自然的选择。
如果没有 StateCell 会怎样?
三个具体问题,按严重程度排列:
1. 丢失唤醒(lost wakeup)——最致命
用传统 Mutex<bool> 替代 StateCell 的话,存在一个经典的 TOCTOU(Time-Of-Check to Time-Of-Use)窗口:
时间线:
Driver 线程: 检查到期 → 设 flag=true → waker.wake()
↑
Task 线程: poll() → 检查 flag=false → 注册 waker → 返回 Pending
Task 线程读 flag(false)和注册 waker 之间插入了 Driver 线程:flag 被设为 true,wake 被调用——但 Task 已经检查了 flag 为 false,即将注册 waker 后返回 Pending。这个 wake 永久丢失,任务永远醒不过来。
StateCell 用 AtomicU64 + 先注册 waker 再读状态的顺序消除这个窗口:即使在写入 waker 后、读状态前 Driver 调用了 fire,read_state() 读到 STATE_DEREGISTERED 后直接返回 Ready,不会丢失。
2. 重复触发(double fire)
没有原子状态区分”正在 fire”和”已被取消”——Driver 可能在一个被 cancel() 的 timer 上再次调用 waker.wake(),造成任务被错误唤醒两次。
StateCell 的 fire() 中先检查 cur_state == STATE_DEREGISTERED,是则返回 None;CAS 确保一次到期只转换为一次 wake()。
3. 锁竞争放大
没有原子状态,每层操作都持锁:
- 用
Mutex<Wheel>整层加锁:Driver 收割时锁住所有 timer,Task 的poll()全得等 - 用
PerTimer Mutex<bool>:Driver 批量收割上千 timer,每个取锁放锁,锁开销随连接数线性增长
StateCell 把高频路径——poll() 检查是否到期——变成一次 AtomicU64::load:免锁、无竞争、单条指令。只在真正的竞争边界(CAS 更新状态)需要一条原子指令,不持有任何锁。
每个 timer 一个 AtomicU64,遍历 N 个 timer 要 N 次 CAS,影响精度吗?
一个自然的担心:Driver 在 process_at_time 的一次 while wheel.poll(now) 里可能要处理大量到期 timer(process_expiration 里多次 mark_pending,弹出后再 fire,每个 entry 至少两次 CAS),如果同一 slot 里有几千个 timer,遍历开销会不会让它们”不准时”?
实际约束让这个开销无关紧要:
约束一:每个 CAS 的成本极低。一次 AtomicU64::compare_exchange 在 x86 上是一条 lock cmpxchgq 指令,无争用时约 10-20ns。1,000 个 timer 同时到期 ≈ 10-20μs 的遍历时间。
约束二:slot 粒度本身就是保护层。时间轮 level 0 的 slot 宽度是 1ms。同一 slot 内的所有 timer 本来就在同一个 1ms 窗口内到期。遍历开销占 slot 宽度的 1-2%。
t=1000ms level-0 slot 边界
├─ timer A (1000.1ms 到期)
├─ timer B (1000.5ms 到期)
├─ timer C (1000.7ms 到期)
└─ ... 最多几千个 timer
t=1001ms level-0 slot 边界
↑ 遍历 1,000 个 timer 耗时 ~15μs
↑ 不影响下一个 slot(1001ms 才需要检查)
约束三:锁零争用。CAS 只在 Driver 和 Task 同时操作同一个 timer 时才失败。大多数 timer 到期时对应的任务睡眠在 Poll::Pending——CAS 一次成功,无重试。对比 Mutex 方案:收割时 lock() + unlock() 一千次,且 Task 端的 poll() 也要抢同一把锁,争用放大。
真实的权衡是17:
每个 timer 一个 AtomicU64:
× 遍历 N 个 timer 需要 N 次 CAS
✓ 零锁争用、零 syscall、O(N) 的 ~10ns/timer 线性代价
替代方案——每个 slot 一把锁:
× Task poll() 必须等 Driver 放锁(可能活锁)
× 锁竞争随 timer 数量超线性放大
✓ 批量移动 slot 时只需一次锁操作
Tokio 选择了 CAS 方案。因为 timer 数量越多,CAS 的优势越明显——锁竞争是超线性放大,CAS 遍历是严格 O(N) 且常数极小。
五、运行时如何驱动定时器
前面讲的是 Sleep 的结构和状态管理,但这些定时器到底是怎么被驱动的?答案在运行时的主循环里。
驱动 park_internal() 的不是某个特殊事件,而是调度器 worker 线程的空闲检测——”没活干了就得等,等的同时顺便把到期的 timer 处理掉”。两种 runtime 的触发逻辑一致,但 park 机制不同:
multi_thread(每个 worker 线程的主循环):
loop {
从本地队列取 task → poll 它
如果本地空 → 偷其他 worker 的 task
如果全都空 → park_timeout()
│
└─ try_lock(Driver) 抢到?
├─ 是 → driver.park_timeout()
│ └─ time::Driver::park_internal()
│ ├─ next_wake = wheel.next_expiration
│ ├─ epoll_wait(timeout) ← 线程休眠
│ └─ handle.process(clock) ← 处理到期 timer
│
└─ 否 → Condvar.wait()(其他 worker 已在当 driver)
}
current_thread(单线程主循环):
block_on(future) {
loop {
从本地队列取 task → poll 它
如果本地空 → 查 inject 队列
如果全空 → park()
└─ time::Driver::park_internal()
├─ (同上:查 next_wake → epoll_wait → process)
└─ handle.process(clock)
}
}
所以 worker 线程的时间线是 poll task 和 park driver 交替进行:
worker 线程时间线:
poll task A → poll task B → 队列空了 → park(driver) → process() → 有 task 被唤醒 → poll task C → ...
↑ 这里触发 ↑ 这里处理到期 timer,唤醒等待 task
multi_thread 里哪个 worker 抢到 TryLock 哪个就当 driver,其余用 Condvar 退化等待。这就解释了为什么 StateCell 的并发窗口存在:worker-A 在 poll Sleep 的同时,worker-B 可能正作为 driver 执行 process()。
在这个主循环的驱动下,Tokio 的运行时中有两个 Driver,各有自己的核心函数:
- Timer Driver(
tokio/src/runtime/time/mod.rs):核心函数是park_internal()。它查时间轮得最早 deadline,然后委托 I/O Driver 执行阻塞等待,醒来后收割时间轮。 - I/O Driver(
tokio/src/runtime/io/driver.rs):核心函数是turn()。它持有mio::Poll,执行epoll_wait(events, timeout),醒来后分发 I/O 事件到ScheduledIo。
这两个 Driver 是分别独立初始化的——在 runtime::driver::Driver::new() 中,先创建 I/O Driver,再在其上包裹 Time Driver,两者各自持有一份内部状态18:
// tokio/src/runtime/driver.rs: 108-117
pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Handle)> {
let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io, cfg.nevents)?;
// ↑ I/O Driver 从这里创建,持有 mio::Poll
let (time_driver, time_handle) =
create_time_driver(cfg.enable_time, cfg.timer_flavor, io_stack, &clock);
// ↑ Time Driver 从这里创建,持有 Wheel
// ↑ io_stack 作为 IoStack 传给 Time Driver——这就是"堆叠"
}
创建出的 Handle 包含了两个子句柄:
// tokio/src/runtime/driver.rs: 52-59
pub(crate) struct Handle {
pub(crate) io: IoHandle, // I/O Driver 的句柄
pub(crate) signal: SignalHandle,
pub(crate) time: TimeHandle, // Time Driver 的句柄
pub(crate) clock: Clock,
}
这个 Handle 被 Arc 包装后分发给所有 worker 线程——所有线程共享同一个 Driver 实例,没有 per-thread 的 Driver。多线程访问通过内部锁串行化:Time Driver 的 Wheel 受 Mutex<InnerState> 保护(一次只有一个线程收割),I/O Driver 的 epoll_wait 在 &mut self 上自然串行。
两者的关系是堆叠调用——Timer 的 park_internal() 内部调用 I/O Driver 的 turn(),后者执行实际的 epoll_wait 阻塞:
Timer::park_internal()
├── 查时间轮 → 计算 timeout(最早 deadline - now)
├── IoStack::park_timeout(timeout)
│ └── I/O Driver::turn()
│ └── epoll_wait(events, timeout) ← 真正的阻塞点
├── process_at_time() ← 收割到期 timer
└── (I/O 事件已在 turn() 里处理:ScheduledIo::wake)
每个 Driver 维护自己的数据结构,醒来后各管各的:
graph TD
subgraph Runtime::park_internal
A["开始"] --> B["Timer::park_internal()"]
B --> C["1. 查 Wheel → next_wake"]
C --> D["2. IoStack::park_timeout(duration)"]
D --> E["I/O Driver::turn()"]
E --> F["epoll_wait(events, timeout)"]
F -->|"唤醒"| G["分发 I/O 事件<br/>→ ScheduledIo::wake()"]
G --> H["Timer::process_at_time()"]
H --> I["收割到期 timer<br/>→ StateCell::fire()"]
end
style B fill:#e1f5fe
style E fill:#c8e6c9
style H fill:#e1f5fe
style G fill:#c8e6c9
Timer Driver 决定”等多久”,I/O Driver 负责”执行等待”,醒来后各处理各的数据——Timer 收割时间轮,I/O 分发事件到 ScheduledIo。两者在同一个线程上、同一次 epoll_wait 返回后依次完成。
下面来看 Timer Driver 的 park_internal 具体实现16:
// tokio/src/runtime/time/mod.rs: 213-256
fn park_internal(&mut self, rt_handle: &driver::Handle, limit: Option<Duration>) {
let handle = rt_handle.time();
let mut lock = handle.inner.lock();
// 获取时间轮中最近的到期时间
let next_wake = lock.wheel.next_expiration_time();
lock.next_wake = next_wake.map(|t|
NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap())
);
drop(lock);
match next_wake {
Some(when) => {
let now = handle.time_source.now(rt_handle.clock());
let mut duration = handle.time_source
.tick_to_duration(when.saturating_sub(now));
if duration > Duration::from_millis(0) {
// 有定时器在将来才到期:带超时地 park 线程
self.park_thread_timeout(rt_handle, duration);
} else {
// 有定时器已经到期或即将到期:立即返回
self.park.park_timeout(rt_handle, Duration::from_secs(0));
// ↑ 不能留空。park_timeout(0s) 走完整 Driver 链但每层只做
// 非阻塞检查——epoll_wait(0)、release_pending_registrations、
// 信号处理、子进程清理——不阻塞,但每层的事都办了。
}
}
None => {
// 没有定时器且有 limit:带超时 park
// 没有定时器且没有 limit:无限期 park
}
}
// 醒来后处理到期的定时器
handle.process(rt_handle.clock());
}
handle.process(clock) 是一条从驱动层通往时间轮的完整调用链16:
time::Handle::process(clock) → time/mod.rs
└→ time::Handle::process_at_time(now) → time/mod.rs
└→ Inner::lock() → MutexGuard<InnerState> ← 持 driver 锁
while InnerState.wheel: Wheel::poll(now) → wheel/mod.rs
├─ Wheel.pending.pop_back() → 返回 TimerHandle ─┐
└─ 或 Wheel::poll 内 loop: │ │
Wheel::next_expiration() │ │
Wheel::process_expiration(exp) │ │
TimerHandle::mark_pending(d) │ ← 不 fire │
└→ StateCell::mark_pending │ │
Ok → Wheel.pending.push_front│ │
Err → level_for() │ │
Level::add_entry(h) │ │
Wheel::set_elapsed(deadline) │ │
└─ 返回 TimerHandle ──────────────────┘ │
TimerHandle::fire(Ok(())) → entry.rs(consume)│
└→ StateCell::fire() → 写 result、取 waker
InnerState.next_wake ← Wheel::poll_at()
drop(MutexGuard)
WakeList::wake_all()
类型分层:time::Handle 持锁并驱动循环;wheel::Wheel 推进时间、收割 slot(process_expiration 是 Wheel 的方法,不是 Handle 的);TimerHandle 是对 TimerShared 的短命句柄,mark_pending / fire 定义在 TimerHandle 上,内部再委托 StateCell 做原子操作;降级时 level_for()(模块级函数)配合 wheel::Level::add_entry 把 handle 挂回更低层 slot。
流程很清晰:
sequenceDiagram
participant RT as Runtime Worker
participant Driver as Time Driver
participant Wheel as Hashed Wheel
participant OS as OS Timer
RT->>Driver: park_internal()
Driver->>Wheel: next_expiration_time()
Wheel-->>Driver: Some(5000) = 5 秒后
Driver->>OS: park_timeout(5s)
Note over OS: 线程休眠,CPU 可服务其他任务
OS-->>Driver: 5 秒后唤醒(或更早被 unpark)
Driver->>Wheel: process_at_time → wheel.poll(now)
Note over Wheel: 收割时 mark_pending 入 pending
Wheel-->>Driver: 弹出 TimerHandle
Driver->>Driver: entry.fire → WakeList
Driver->>RT: wake_all()
RT->>RT: 重新 poll 对应的 Sleep Future
关键点在于:线程的休眠时间是由时间轮中最早的到期时间决定的。如果有多个 sleep 在等待,线程会以最早的 deadline 作为 park timeout;当时间到达(或被其他事件提前 unpark)后,驱动处理所有到期的定时器,批量唤醒对应的任务。
具体的处理逻辑在 Handle::process_at_time16:
// tokio/src/runtime/time/mod.rs: 296-337
pub(self) fn process_at_time(&self, mut now: u64) {
let mut waker_list = WakeList::new();
let mut lock = self.inner.lock();
// 从时间轮中收集所有到期(≤ now)的定时器
while let Some(entry) = lock.wheel.poll(now) {
if let Some(waker) = unsafe { entry.fire(Ok(())) } {
waker_list.push(waker);
if !waker_list.can_push() {
// 批次满了,释放锁后批量唤醒
drop(lock);
waker_list.wake_all();
lock = self.inner.lock();
}
}
}
// 更新下一次唤醒时间
lock.next_wake = lock.wheel.poll_at()
.map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap()));
drop(lock);
// 唤醒剩余 waker
waker_list.wake_all();
}
Wheel::poll(now) 内部的 loop 是这条链的核心——它每次调 next_expiration() 找到最早到期的 slot,处理完再找下一个,直到没有 deadline ≤ now 的 slot 为止。所以如果线程睡了 5 秒后醒来、now - elapsed = 5000ms,这个 loop 会把这 5000ms 内跨过的所有 slot(从 Level 0 到 Level 5)逐个处理完:
Wheel::poll(now=6000), elapsed=1000
→ next_expiration → Level 0 slot 1000 (deadline=1000) → process → set_elapsed(1000)
→ next_expiration → Level 0 slot 1001 (deadline=1001) → process → set_elapsed(1001)
...
→ next_expiration → Level 1 slot 0 (deadline=4096) → process → set_elapsed(4096)
→ next_expiration → Level 1 slot 1 (deadline=8192) → 8192 > 6000 → break
→ set_elapsed(6000)
next_expiration_time()(在 park_internal 中用于算睡多久)只找一个最早 deadline;而 Wheel::poll(now)(在 process_at_time 中用于醒来后收割)循环收割全部 deadline ≤ now 的 slot,不限层级不限数量。前者说”最早什么时候醒”,后者处理”该醒时哪些到了”。
这里使用 WakeList 来批量收集 waker,在释放驱动锁之后再统一调用 wake(),避免持有锁时调用外部代码带来死锁风险。
5.x 三者关系梳理:Handle、StateCell 与 Waker
前面四、五两节分别详细拆解了 StateCell 的原子状态机和 Driver 的驱动流程。这里用一张结构图把两者的交汇点——Handle(驱动入口)、StateCell(状态机)、Waker(唤醒令牌)三者之间的协作关系集中梳理清楚,便于形成整体 mental model。
从 Sleep 到 Waker 的三层封装关系如下:
Sleep (用户层面的 Future)
└── timer: TimerEntry → 拥有 timer 的所有权
├── driver: scheduler::Handle → 共享的调度器句柄(Arc 引用)
│ │
│ └── .time(): &Handle → 获取 Time Driver 句柄
│ ├── time_source → Instant ↔ tick 转换
│ └── inner → Mutex<InnerState { wheel: Wheel }>
│
└── inner: TimerShared → 侵入式节点,同时存在于 Sleep 和 Wheel 中
├── state: StateCell → 原子状态机,存 deadline 和 Waker
├── registered_when → wheel 中的槽位位置
└── pointers → 侵入式双向链表指针
这里的关键不对称性:TimerEntry 引用 Handle(多对一,所有 timer 共享同一个 driver);而 TimerShared 作为侵入式节点链入 Wheel(一对一,每个 timer 恰好占据时间轮中的一个槽位)。StateCell 是这两者的交汇点——它被 TimerShared 持有,同时被 Driver 和 task 两侧并发访问。
StateCell 的多线程并发是真实存在的。 在 multi-thread runtime 中,worker 线程 A 上运行的任务调用
Sleep::poll()→StateCell::poll()(无锁),与此同时 worker 线程 B 正在执行park_internal()→process()→StateCell::fire()(持有Mutex<InnerState>保护 Wheel,但 StateCell 本身在此锁之外)。Mutex<InnerState>只串行化了 Driver 端对 Wheel 的访问,Task 端对 StateCell 的访问完全不受此锁约束——这正是StateCell必须用 CAS 和AtomicU64的原因。如果是current_threadruntime 则所有操作在同一线程上,原子操作虽无争用但仍正确执行。
下面用两条线程路径的具体代码来展示这个并发窗口——不是伪代码,而是真实的 Tokio 源码调用链,标注了每条路径上当前持有哪些锁、线程 ID 如何不同。
工作线程 A(执行 User Task) 工作线程 B(执行 Driver)
══════════════════════════════ ═══════════════════════════
park_internal()
│
tokio::spawn(async { ├─ lock = handle.inner.lock()
// poll() 触发注册: │ ↓ 持有 Mutex<InnerState>
Sleep::poll(cx) │ 保护的是 Wheel(链表操作),
│ │ 不是 StateCell
└→ Sleep::poll_elapsed(cx) │
│ ├─ next_wake = wheel
└→ TimerEntry::poll_elapsed(cx) │ .next_expiration_time()
│ ├─ drop(lock) ← 释放锁
│ // 无锁! │
└→ self.inner.state ├─ self.park.park_timeout(
.poll(cx.waker()) │ duration)
│ │ ↓ epoll_wait, 线程休眠
│ StateCell::poll(): │
├─ waker.register(waker) │ 醒来!
│ // AtomicWaker:: │
│ // register_by_ref ├─ handle.process(clock)
│ │
├─ state.load(Acquire) └→ process_at_time(now)
│ // 读 AtomicU64 │
│ ├─ lock = inner.lock()
├─ 若 != DEREGISTERED │ ↓ 再次持有 Mutex
│ └→ Poll::Pending │
│ ├─ while entry =
│ lock.wheel.poll(now)
// Task 让出执行权,线程 A 继续跑 │
// 其他 task... │ 对每个到期的 entry:
│
│ └→ entry.fire(Ok(()))
│ │
│ │ TimerHandle::fire()
│ │ └→ StateCell::fire()
│ │ ├─ state.load(Relaxed)
│ │ ├─ state.store(
│ │ │ DEREGISTERED,
│ │ │ Release)
│ │ └→ waker.take_waker()
│ │ // AtomicWaker::take
│ │ // → Option<Waker>
│ │
├─ drop(lock)
└→ waker_list.wake_all()
└→ Waker::wake()
↑ 唤醒线程 A 上的 task
两条路径在 StateCell 上汇合,但走的是不同的原子操作:
| 路径 | 操作 | 原子指令 | 持有的锁 |
|---|---|---|---|
| Worker A(task) | poll(cx.waker()) |
AtomicWaker::register_by_ref + AtomicU64::load(Acquire) |
无 |
| Worker B(driver) | fire(Ok(())) |
AtomicU64::load(Relaxed) + AtomicU64::store(Release) + AtomicWaker::take |
Mutex<InnerState>(仅保护 Wheel,不保护 StateCell) |
关键观察:Worker B 的 Mutex<InnerState> 保护的是 lock.wheel.poll(now)(遍历侵入式链表、从 slot 移除 entry),但 StateCell::fire() 内部的原子操作完全在此锁的保护范围之外——锁只保证 entry 被正确从 Wheel 移除并拿到 TimerHandle,之后 fire() 对 StateCell 的写入是 lock-free 的。Worker A 的 poll() 全程无锁,只靠原子操作与 Worker B 协调。
这个并发窗口真实存在的原因是:Tokio 的 multi-thread runtime 中,Driver 并不是一个独立的后台线程,而是由当前恰好空闲、需要 park 的那个 worker 线程来执行的。所以在任意时刻:
- 某些 worker 在执行用户 task(可能正在 poll Sleep)
- 某个 worker 在执行 Driver 代码(正在 park 并处理到期 timer)
两者可以同时操作不同的 StateCell(各自处理各自的 timer),也可以同时操作同一个 StateCell(一个在 poll,另一个在 fire)。后者正是 CAS 要处理的竞争窗口。
用一个最简单的 Tokio 程序就可以创造出这个场景:
// worker_threads >= 2,让 Driver 和 task 跑在不同的 worker 上
#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() {
// 任务 A:一个即将到期的 sleep,会被 Driver 快速 fire
let task_a = tokio::spawn(async {
// poll() 在 worker-1 上注册 Waker 到 StateCell
tokio::time::sleep(Duration::from_millis(1)).await;
println!("task A woken");
});
// 任务 B:让另一个 worker 保持忙碌,迫使 Driver 跑在 worker-2 上
let task_b = tokio::spawn(async {
// 这个 sleep 较长,worker-2 最终会 park,期间执行 Driver 逻辑
tokio::time::sleep(Duration::from_millis(100)).await;
});
// task A 的 sleep(1ms) 几乎立即到期:
// worker-1: 正在 poll task A 的 Sleep → StateCell::poll() 注册 Waker
// worker-2: park 中 → process() 收割到期 timer → 同一个 StateCell::fire()
// ↑ 这就是并发窗口
let _ = tokio::join!(task_a, task_b);
}
严格来说,这个示例不能保证一定会触发竞争窗口——线程调度由 OS 决定,CAS 也可能一次成功。但它展示了并发窗口存在的架构条件:
worker_threads >= 2、有 task 在 poll Sleep 的同时有 worker 在执行 Driver。
为什么 multi_thread 有并发而 current_thread 没有?
上面的并发场景只在 flavor = "multi_thread" 下存在。两种 runtime 的本质差异在于 Driver 的共享方式和 park 机制不同。
在 current_thread 中,Driver 被 Core 独占持有19:
// tokio/src/runtime/scheduler/current_thread/mod.rs: Core 结构体
struct Core {
tasks: VecDeque<Notified>,
driver: Option<Driver>, // ← Driver 归 Core 独占,Option 取出/放回
// ...
}
唯一的线程 park 时直接从 Core 取出 Driver、执行 park_internal→process()、用完放回。task 的 Sleep::poll() 和 Driver 的 process() 在同一线程上严格串行,StateCell 的 CAS 不会遇到真正的多核竞争。
current_thread(单线程):
poll task A → poll task B → 无活 → park(driver) → process() → 继续 poll
↑ 同一线程,所有操作串行
在 multi_thread 中,Driver 被包在 Arc<Shared> 里,多个 worker 通过 TryLock 竞争20:
// tokio/src/runtime/scheduler/multi_thread/park.rs: Shared
struct Shared {
driver: TryLock<Driver>, // ← 多 worker 共享,谁抢到谁当 driver
}
pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) -> HadDriver {
if let Some(mut driver) = self.inner.shared.driver.try_lock() {
// 抢到了 → 成为 driver 线程,执行 park_internal → process()
self.inner.park_driver(&mut driver, handle, Some(duration))
} else {
// 没抢到 → 用 Condvar 退化等待,不做 driver 工作
self.inner.park_condvar(Some(duration));
}
}
TryLock 只保护”谁有资格当 driver”——抢到的 worker 独占 park_internal 和 process() 的执行权。但另一个 worker 仍然可以无锁地执行 task 并调用 StateCell::poll(),两者在 StateCell 上形成真正的多核并发。
multi_thread(双 worker):
worker-0: poll task A → StateCell::poll(waker) → Pending ──→ poll 其他 task
worker-1: poll task B ──→ 无活 → try_lock(driver) → 抢到!
↓
park_internal()
↓
process() → StateCell::fire() → waker.wake()
↑
同一时刻 worker-0 也在操作 StateCell(poll 其他 task)
两种 runtime 的 Driver 共享模式对比:
current_thread |
multi_thread |
|
|---|---|---|
| Driver 持有 | Core 独占 Option<Driver> |
Arc<Shared { TryLock<Driver> }> 共享 |
| Park 机制 | 直接从 Core 取 | try_lock() 竞争,抢不到退化 Condvar |
| Task poll vs Driver process | 同一线程,串行 | 可能在不同线程,并发 |
| StateCell 并发 | 不存在(原子操作无实际争用) | 存在(CAS 多核竞争) |
classDiagram
class Sleep {
+deadline: Instant
+entry: Timer
+poll(cx) Poll
}
class TimerEntry {
-driver: scheduler~Handle
-inner: TimerShared
-deadline: Instant
+poll_elapsed(cx) Poll
+reset(deadline)
}
class TimerShared {
+state: StateCell
+registered_when: AtomicU64
+pointers: linked_list.Pointers
}
class StateCell {
-state: AtomicU64
-waker: AtomicWaker
+poll(waker) Poll
+fire(result) Option~Waker~
+mark_pending(not_after) Result
}
class Handle {
+time_source: TimeSource
+inner: Inner
+process(clock)
+process_at_time(tick)
}
class Waker {
+wake()
}
class Wheel {
+poll(now) Option~TimerHandle~
+insert(entry) Result
+next_expiration_time() Option~u64~
}
Sleep *--> TimerEntry
TimerEntry *--> TimerShared
TimerShared *--> StateCell
TimerEntry --> Handle : 引用(Arc)
Handle --> Wheel : 持有(Mutex 保护)
Wheel --> TimerShared : 存储(侵入式链表)
StateCell ..> Waker : fire() 时取出
这张图的三个关键连接:
-
StateCell←→Waker:StateCell::poll()存入 Waker(来自cx.waker()),StateCell::fire()返回 Waker。这是 task 线程与 driver 线程的唯一”数据交换点”——driver 通过fire()拿到 waker 后调用wake(),通知 executor 重新调度 task。 -
Handle→Wheel→TimerShared→StateCell:Driver 的process()是驱动链的起点,它获取锁、驱动Wheel::poll()收割到期 slot、对每个到期的TimerShared调用fire()取出 Waker、释放锁后批量wake_all()。整个过程Handle完全不需要知道具体 task 的存在——它只和 waker 打交道。 -
TimerEntry→Handle:多个TimerEntry共享同一个Handle(通过scheduler::Handle中的Arc),这使得 driver 是全局单例,避免每个 timer 各自维护一套时间轮的重复开销。
三者分工如下:
| 组件 | 角色 | 所属关系 |
|---|---|---|
| Handle | 驱动入口,持有 TimeSource 和被 Mutex 保护的 Wheel |
所有 timer 共享一个(Arc-like) |
| StateCell | 每个 timer 的原子状态机;存储 deadline/状态和注册的 Waker | 每个 Sleep 一个(在 TimerShared 内) |
| Waker | 来自 executor 的不透明唤醒令牌;wake() 将任务放回就绪队列 |
每次 poll() 一个(克隆到 AtomicWaker 中) |
上文描述了数据关系,下面把实际代码的调用链逐层追踪,看清楚 Handle → Wheel → TimerShared → StateCell → Waker 在代码层面是怎么串联起来的。
实际代码调用链
整条链从 park_internal 末尾的 handle.process(clock) 出发16:
// tokio/src/runtime/time/mod.rs: park_internal 末尾
// 线程从 park 醒来后,处理所有到期的定时器
handle.process(rt_handle.clock());
① 第一层:Handle::process → process_at_time
// tokio/src/runtime/time/mod.rs: Handle::process
pub(crate) fn process(&self, clock: &Clock) {
// TimeSource 将 Clock::now() 转为驱动启动以来的毫秒 tick
let now = self.time_source.now(clock);
// 根据定时器类型分派:
// Traditional → self.process_at_time(now)
// Alternative → self.process_at_time_alt(...)
self.process_at_time(now);
}
process() 是公开入口(被 park_internal 调用),它只做两件事:获取当前时间、委托给 process_at_time。
② 第二层:process_at_time → Wheel::poll
// tokio/src/runtime/time/mod.rs: Handle::process_at_time
pub(self) fn process_at_time(&self, mut now: u64) {
let mut waker_list = WakeList::new();
let mut lock = self.inner.lock(); // ← 获取 Mutex<InnerState>
// Wheel::poll 循环:收割所有 deadline ≤ now 的 slot
while let Some(entry) = lock.wheel.poll(now) {
// ↓ entry 类型是 TimerHandle(NonNull<TimerShared>)
if let Some(waker) = unsafe { entry.fire(Ok(())) } {
waker_list.push(waker); // ← 收集 Waker,延迟唤醒
// 批次满了就释放锁、批量唤醒、再获取锁
}
}
// 更新下次唤醒时间
lock.next_wake = lock.wheel.poll_at()...;
drop(lock); // ← 释放锁
waker_list.wake_all(); // ← 批量调用 Waker::wake()
}
这里 lock.wheel.poll(now) 返回的 entry 是 TimerHandle——一个 NonNull<TimerShared> 包装。它被从 Wheel 的侵入式链表中取出后,所有权交给了这段循环。
③ 第三层:entry.fire() → StateCell::fire → Option<Waker>
// tokio/src/runtime/time/entry.rs: TimerHandle::fire
pub(super) unsafe fn fire(self, completed_state: TimerResult) -> Option<Waker> {
unsafe { self.inner.as_ref().state.fire(completed_state) }
// ↑ NonNull<TimerShared> ↑ StateCell::fire()
}
TimerHandle::fire 是一层薄薄的代理,真正的逻辑在 StateCell::fire15:
// tokio/src/runtime/time/entry.rs: StateCell::fire
unsafe fn fire(&self, result: TimerResult) -> Option<Waker> {
// 原子检查:是否已触发(STATE_DEREGISTERED)
let cur_state = self.state.load(Ordering::Relaxed);
if cur_state == STATE_DEREGISTERED {
return None; // 已触发过,返回 None(避免重复唤醒)
}
// 写入结果
self.result.with_mut(|p| *p = result);
// 原子标记为已注销
self.state.store(STATE_DEREGISTERED, Ordering::Release);
// 取出之前 poll() 时存入的 Waker 并返回
self.waker.take_waker()
// ↑ AtomicWaker::take() → Option<Waker>
}
这是整条链的核心跃迁点:StateCell 的 AtomicU64 状态从任意非终止状态(PendingFire 或 Scheduled)切换到 STATE_DEREGISTERED,同时取出 AtomicWaker 中存储的 Waker。这个 Waker 正是任务首次 poll 时通过 StateCell::poll(waker) 注册进去的——一进一出,任务唤醒的闭环完成。
④ 第四层:回到 process_at_time → waker_list.wake_all()
// tokio/src/runtime/time/mod.rs: 回到 process_at_time
drop(lock); // 释放 driver 锁
waker_list.wake_all(); // 批量调用所有收集到的 Waker::wake()
WakeList 将收集到的 Waker 批量调用 wake(),通知 executor 将对应 task 放回就绪队列。在释放锁之后再唤醒是刻意设计——如果在持有 Mutex<InnerState> 时调用 wake(),被唤醒的任务可能立即被调度并尝试获取同一把锁,导致死锁。
整条链的数据流图
graph LR
A["TimerEntry<br/>.driver 字段"] -->|"scheduler::Handle"| B["scheduler::Handle<br/>运行时全貌"]
B -->|".driver()"| C["driver::Handle<br/>各 driver 聚合"]
C -->|".time()"| D["time::Handle<br/>Time Driver 专用"]
D -->|".process(clock)"| E["time::Handle::process()"]
E -->|"time_source.now(clock)"| F["now: u64"]
E -->|"委托"| G["time::Handle::process_at_time(now)"]
G -->|"time::Handle.inner.lock()"| H["time::InnerState<br/>(under Mutex)"]
H -->|"lock.wheel.poll(now)"| I["time::wheel::Wheel::poll()"]
I -->|"循环返回"| J["time::entry::TimerHandle"]
J -->|"entry.fire(Ok(()))"| K["time::entry::TimerHandle::fire()"]
K -->|"代理"| L["time::entry::StateCell::fire()"]
L -->|"CAS: store DEREGISTERED"| M["AtomicU64::store<br/>(Release)"]
L -->|"提取"| N["Option<Waker>"]
N -->|"push"| O["crate::util::WakeList"]
O -->|"drop(lock) 后"| P["waker_list.wake_all()"]
P -->|"调用"| Q["std::task::Waker::wake()"]
Q -->|"通知"| R["Executor 就绪队列"]
style A fill:#f3e5f5
style B fill:#e1f5fe
style C fill:#fff3e0
style D fill:#c8e6c9
style L fill:#c8e6c9
style Q fill:#f3e5f5
style R fill:#f3e5f5
从 TimerEntry.driver(scheduler::Handle)到 std::task::Waker::wake(),整条链先经过三层 Handle 导航(scheduler::Handle → driver::Handle → time::Handle),再进入四个 crate 私有模块(runtime/time/mod.rs → runtime/time/wheel/mod.rs → runtime/time/entry.rs → StateCell),最终收敛到标准库的 Waker::wake()。每一步都是纯粹的数据流动:time::Handle 持有锁驱动 wheel,Wheel 返回 time::entry::TimerHandle,TimerHandle::fire() 触发 time::entry::StateCell 状态迁移,StateCell 吐出 Waker。链上没有任何地方持有 task 的引用或了解 task 的身份——这也是为什么一套 Driver 可以驱动百万级 Sleep 实例的根本原因。
协作方向是单向的:Handle::process() 驱动 Wheel → Wheel 返回到期的 TimerShared → 调用 StateCell::fire() 提取 Waker → 调用 Waker::wake() 通知 executor。
这种解耦设计使得 Time Driver 与 executor 完全无关——driver 只需要知道”有 waker 需要调用”,而不需要知道 waker 背后是哪个 task、哪个 scheduler。StateCell 充当纯桥梁:poll() 接收 Waker,fire() 返回 Waker。而原子状态机(已在前一节详细分析)保证了即使 task 线程和 driver 线程并发访问同一个 StateCell,也能通过 CAS 正确协调,无需额外的锁。
六、最底层:从 park_timeout 到时钟中断
前面说 park_timeout 让线程休眠到最近的 deadline,但它到底是怎么”准时醒来”的?答案在最底层的硬件时钟中断。
Tokio 的 park_timeout 在 Linux 上最终会调用 mio::Poll::poll(events, timeout),后者通过 epoll_wait(timeout_ms) 进入内核。内核内部使用高精度定时器(hrtimer)来实现这个超时21:
// fs/eventpoll.c: 2017-2031 (Linux 6.x, ep_poll)
if (!eavail)
timed_out = !ep_schedule_timeout(to) ||
!schedule_hrtimeout_range(to, slack,
HRTIMER_MODE_ABS);
epoll_wait 的 int timeout_ms 参数先被转为 timespec64,再转为 ktime_t 绝对到期时间,最后通过 schedule_hrtimeout_range() 注册到内核的 hrtimer 红黑树中:
epoll_wait(epfd, events, maxevents, timeout_ms)
→ do_epoll_wait()
→ ep_poll()
→ schedule_hrtimeout_range(&expires, slack, HRTIMER_MODE_ABS)
→ hrtimer_start_range_ns() ← 插入 hrtimer 红黑树
→ schedule() ← 让出 CPU
sequenceDiagram
participant Tokio as Tokio Worker
participant mio as mio / epoll
participant Kernel as Linux Kernel
participant hrtimer as hrtimer 子系统
participant Clock as 时钟事件设备 (APIC/HPET)
Tokio->>mio: park_timeout(5000ms)
mio->>Kernel: epoll_wait(timeout=5000ms)
Kernel->>hrtimer: schedule_hrtimeout_range()
hrtimer->>hrtimer: enqueue_hrtimer() 插入红黑树
hrtimer->>Clock: 编程设备为 one-shot,5 秒后触发中断
Note over Tokio: 线程休眠,CPU 可服务其他任务
Note over Clock: ... 5 秒 ...
Clock-->>Kernel: 硬件时钟中断
Kernel->>hrtimer: hrtimer_interrupt()
hrtimer->>hrtimer: __hrtimer_run_queues() 遍历红黑树
hrtimer->>Kernel: hrtimer_wakeup() → wake_up_process()
Kernel-->>Tokio: epoll_wait 返回
Tokio->>Tokio: process_at_time() 处理到期定时器
6.1 C 语言的等价实现
Tokio 的 park_timeout 在 Linux 上依赖 epoll_wait 的超时参数。在 C 语言中,有两种不同的方式使用内核定时器,分别对应”主动检查”和”自动回调”两种模型。
timerfd + epoll(主动检查模型)
Tokio 底层走的是这条路径。timerfd 将定时器暴露为文件描述符,epoll 可以像监听网络事件一样监听它:
// 创建一个定时器文件描述符
int tfd = timerfd_create(CLOCK_MONOTONIC, 0);
struct itimerspec spec = {
.it_value = { .tv_sec = 2, .tv_nsec = 0 } // 2 秒后触发
};
timerfd_settime(tfd, 0, &spec, NULL);
// 将 timerfd 注册到 epoll
int epfd = epoll_create1(0);
struct epoll_event ev = { .events = EPOLLIN, .data.fd = tfd };
epoll_ctl(epfd, EPOLL_CTL_ADD, tfd, &ev);
// 阻塞直到定时器到期
epoll_wait(epfd, &ev, 1, -1);
uint64_t exp;
read(tfd, &exp, sizeof(exp)); // 消耗到期事件
当定时器到期时,timerfd 变为可读,epoll_wait 返回。这与 Tokio 的 park_timeout 最终调用 epoll_wait(timeout_ms) 是同一个底层机制——只不过 Tokio 把超时时间作为 epoll_wait 的参数传递,而 C 版本显式创建了一个独立的 timerfd。两种方式最终都进入内核的 hrtimer 子系统。
timer_create + SIGEV_THREAD(自动回调模型)
C 标准还提供了另一种方式——POSIX 定时器。设置 sigev_notify = SIGEV_THREAD 后,内核在定时器到期时自动创建一个新线程来运行回调函数:
#include <signal.h>
#include <time.h>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
void timer_callback(union sigval val) {
int id = *(int *)val.sival_ptr;
printf("定时器 %d 到期!线程 ID: %lu\n", id, pthread_self());
}
int main() {
timer_t timer;
int id = 42;
struct sigevent sev = {
.sigev_notify = SIGEV_THREAD,
.sigev_value.sival_ptr = &id,
.sigev_notify_function = timer_callback,
};
timer_create(CLOCK_MONOTONIC, &sev, &timer);
struct itimerspec spec = {
.it_value = { .tv_sec = 2, .tv_nsec = 0 }
};
timer_settime(timer, 0, &spec, NULL);
printf("主线程继续做其他事...\n");
pause();
return 0;
}
这种方式看起来很像”注册回调,自动执行”的理想模型。但在高性能服务器中很少使用——从信号/线程上下文中有太多限制:不能可靠访问线程局部存储、锁的约束更严格、线程频繁创建销毁的开销不可控。因此,实际的高性能 C 程序(如 Nginx、Redis)都走 timerfd + epoll 路径,而不是 SIGEV_THREAD。
Tokio 的 park_timeout 本质上更接近 timerfd 模型——它不是靠回调自动执行的,而是把超时时间传给 epoll_wait,到期后线程被内核唤醒,再由 Driver 去时间轮里收割到期定时器。
为什么 Tokio 不走 SIGEV_THREAD 或信号路径?
既然内核提供了 SIGEV_THREAD 这种”自动回调”机制,Tokio 为什么不用?原因有三。
1. 锁竞争爆炸。 如果几十个定时器同时到期,每个都起一个线程去跑回调,这些线程会同时争 Wheel 的锁和任务的运行队列锁。而 Tokio 现在的设计是只有一个 Driver 线程在 park/unpark 循环里处理定时器——处理完一把锁释放、批量唤醒,零锁争用。
2. 没有批处理。 epoll_wait 返回后,Driver 在 process_at_time 里一次性收割所有到期定时器,把 waker 收集到 WakeList 里批量调用(参见第五节)。如果每个定时器单独起一个线程或信号,就失去了批处理的机会——每个回调都得单独走一遍上下文切换。
3. 信号的安全黑洞与合作式调度冲突。 信号处理函数只能调用 async-signal-safe 的函数,不能锁 mutex、不能分配内存,几乎什么也干不了。SIGEV_THREAD 虽然避开了信号限制,但它的抢占式特性会破坏 Tokio 的合作式调度模型。而现在的 epoll_wait → Driver 处理 → Waker::wake() → 任务入队 → 调度器 poll,每一步都在运行时掌控之中。
本质上,Tokio 已经有了回调机制——Waker。问题不是”怎么通知”,而是”怎么高效地通知”。epoll_wait 不是轮询,而是被内核挂起;醒来后批量收割时间轮,本质是把 n 个独立的”自动回调”合并成 1 个可预测的批处理循环。
6.2 红黑树 vs 哈希时间轮
有趣的是,内核的 hrtimer 和 Tokio 用了不同的数据结构。Tokio 的 time driver 使用哈希时间轮(本文第三节),而 Linux 的 hrtimer 使用红黑树(rbtree)来组织定时器22:
// include/linux/hrtimer_defs.h: 27-35
struct hrtimer_clock_base {
struct hrtimer_cpu_base *cpu_base;
clockid_t clockid;
unsigned int seq;
// 按到期时间排序的红黑树根节点
struct timerqueue_linked_head active;
ktime_t (*get_time)(void);
ktime_t offset;
};
enqueue_hrtimer() 将定时器插入红黑树,时间复杂度 O(log n)23:
// kernel/time/hrtimer.c: 1096-1104
/*
* enqueue_hrtimer - internal function to (re)start a timer
*
* The timer is inserted in expiry order. Insertion into the
* red black tree is O(log(n)).
*/
static bool enqueue_hrtimer(struct hrtimer *timer,
struct hrtimer_clock_base *base,
enum hrtimer_mode mode, bool was_armed)
这并不是说哪种数据结构更优——它们只是面向不同的约束。内核的 hrtimer 需要纳秒级精度和动态的到期时间分布,红黑树的 O(log n) 插入更为稳妥。Tokio 的哈希时间轮只需要毫秒级精度,但需要管理海量定时器(成千上万个 Sleep 并发),O(1) 插入和批量降级更适合这种场景。
有意思的是,Linux 内核也有一套低精度定时器(timer wheel),代码路径在 kernel/time/timer.c,它用的正是和 Tokio 类似的层级哈希轮结构——8 到 9 层,每层 64 个 bucket(LVL_SIZE = 64,LVL_DEPTH = 8/9),粒度和覆盖范围的表24:
// kernel/time/timer.c: 104-115 (HZ=1000 时)
//
// HZ 1000 steps
// Level Offset Granularity Range
// 0 0 1 ms 0 ms - 63 ms
// 1 64 8 ms 64 ms - 511 ms
// 2 128 64 ms 512 ms - 4095 ms
// 3 192 512 ms 4096 ms - 32767 ms
// 4 256 4096 ms (~4s) 32768 ms - 262143 ms
// 5 320 32768 ms (~32s) 262144 ms - 2097151 ms
// 6 384 262144 ms (~4m) 2097152 ms - 16777215 ms
// 7 448 2097152 ms (~34m) 16777216 ms - 134217727 ms
// 8 512 16777216 ms (~4h) 134217728 ms - 1073741822 ms
对比 Tokio 的 6 层 64 slot 设计,两者的核心思想同源——都源自哈希时间轮的经典设计。
6.3 从时钟中断到进程唤醒
当硬件时钟事件设备(Local APIC Timer 或 HPET)在 deadline 到达时产生中断,内核进入 hrtimer_interrupt()。这个函数做了几件关键的事23:
- 更新时间基准:
hrtimer_update_base()读取硬件计数器和系统时间 - 处理到期定时器:
__hrtimer_run_queues()遍历红黑树中所有到期时间 ≤ now 的定时器 - 调用定时器回调:对于 sleep 类定时器,回调是
hrtimer_wakeup()
hrtimer_wakeup 的实现非常直接——找到等待中的进程,把它叫醒23:
// kernel/time/hrtimer.c: 2184-2193
static enum hrtimer_restart hrtimer_wakeup(struct hrtimer *timer)
{
struct hrtimer_sleeper *t =
container_of(timer, struct hrtimer_sleeper, timer);
struct task_struct *task = t->task;
t->task = NULL;
if (task)
wake_up_process(task);
return HRTIMER_NORESTART;
}
wake_up_process() 将进程状态设为 TASK_RUNNING 并放入就绪队列。随后内核的调度器在合适的时机把它调度回 CPU。Tokio 的 worker 线程从 epoll_wait 返回,继续执行 process_at_time(),完成定时器的处理。
6.4 关键细节:这不是周期性的 tick
需要注意,现代 Linux 在 NOHZ(dynticks)模式下并不是靠周期性的时钟中断来驱动定时器的,而是把时钟事件设备编程为一次性(one-shot)触发。hrtimer_interrupt() 在处理完当前到期的定时器后,会重新编程时钟设备,让它在下一次最早到期的 deadline 时再产生中断23。
这意味着如果系统上没有定时器在等待(或者最近的定时器在 5 秒后),CPU 可以进入深度休眠状态,期间不产生任何时钟中断。这也是为什么 Tokio 的 park_timeout 在”没有定时器且没有 limit”时可以直接无限期 park——内核不会为它浪费任何 CPU 周期。
这条从 tokio::time::sleep 到硬件时钟中断的链路也就清楚了:
tokio::time::sleep(5s).await
→ Sleep::poll() 返回 Pending,注册到哈希时间轮
→ Driver::park_timeout(5s)
→ mio → epoll_wait(timeout=5000ms)
→ 内核 schedule_hrtimeout_range() → 插入红黑树
→ 编程时钟设备 one-shot 5 秒后触发
→ [CPU 休眠或服务其他任务]
→ 5 秒后时钟中断 → hrtimer_interrupt()
→ __hrtimer_run_queues() → hrtimer_wakeup() → wake_up_process()
→ epoll_wait 返回 → 线程醒来
→ process_at_time() → fire → wake_all()
→ Sleep::poll() 返回 Ready
6.5 定时器管理的存放位置:用户态 vs 内核态
如果把这几种方式放在一起对比,一个更本质的区别浮现出来——定时器到底存放在哪一层管理:
| 方案 | 定时器存储位置 | 数据结构 | 操作复杂度 | 内核中管理的定时器数量 |
|---|---|---|---|---|
C timerfd |
内核 hrtimer | 红黑树 | O(log n) | n 个 |
C timer_create(SIGEV_THREAD) |
内核 hrtimer | 红黑树 | O(log n) | n 个 |
| Tokio 时间轮 | 用户态哈希时间轮 | 多层时间轮 | O(1) + 1 × O(log 1) | 1 个 |
Tokio 在用户态维护时间轮,只向内核注册一个”最近 deadline”的唤醒点。这意味着几万个定时器的插入和删除都在用户态完成,O(1),没有系统调用;内核只需要管理 1 个定时器(而不是 n 个),红黑树的 O(log 1) ≈ O(1)。精度降低到毫秒级,但吞吐量大幅提升。
graph LR
subgraph "C timerfd(n 个定时器全部进内核)"
A1["用户态 app"] -->|"timerfd_settime × n"| K1["内核 hrtimer<br/>红黑树<br/>O(log n) 管理 n 个"]
end
subgraph "Tokio(1 个定时器进内核)"
A2["用户态 app"] -->|"O(1) 插入"| W["用户态哈希时间轮<br/>6 层 × 64 slot"]
W -->|"只注册最早 deadline"| K2["内核 hrtimer<br/>红黑树<br/>O(log 1) 管理 1 个"]
end
这正是 Varghese & Lauck 论文12思想在工程中的体现:用用户态的时间轮 + 少量内核辅助,替代全部在内核红黑树中管理。Tokio 不是在”绕过”内核机制,而是在它之上加了一层更高效的用户态调度。
6.6 epoll_wait 在这条链路中的真实角色
前面说了那么多”内核 hrtimer”和”epoll_wait”,容易产生一个误解:epoll_wait 在管理 tokio 的定时器吗?
答案是 epoll_wait 完全不知道 tokio 时间轮的存在。epoll_wait 的 timeout 参数只是一个”线程睡眠闹钟”。
本质上,epoll_wait 内部是两个唤醒源的 OR 组合:
epoll_wait(epfd, events, maxevents, timeout)
↑ ↑
I/O 事件就绪 hrtimer 到期
(网卡中断 → (时钟中断 →
协议栈 → epoll) LAPIC → hrtimer)
任何一个条件满足,epoll_wait 就返回。内核里大致是这样的循环25:
// 内核 ep_poll() 简化伪代码
int epoll_wait(..., int timeout_ms) {
if (timeout_ms > 0)
schedule_hrtimeout_range(timeout_ms); // 设一个 hrtimer
while (1) {
if (events_ready) return; // I/O 事件来了
if (timeout_expired) return 0; // hrtimer 到了
schedule(); // 让出 CPU,等任一条件满足
}
}
tokio 利用的正是这个”谁先到就醒谁”的 OR 语义——I/O 事件先到就提前回来顺便收割 timer,hrtimer 先到就准时处理到期 timer 并回到 epoll 继续等 I/O。
tokio 时间轮:管理 1,000,000 个 Sleep
↓ 算出最早 deadline = now + 5s
↓ 传给 epoll_wait 作为 timeout
epoll_wait:设 1 个内核 hrtimer,5s 后叫醒线程
↓ epoll_wait 不知道 tokio 的时间轮
↓ epoll_wait 不知道 StateCell
↓ epoll_wait 不知道 Waker
↓ 它只是一个准确的线程睡眠定时器
↓ 5s 后
epoll_wait 返回 → 回到 tokio Driver
↓
tokio process_at_time():用自己维护的 elapsed 推进时间轮
↓ 收割到期 timer
↓ fire → waker.wake()
如果把 epoll_wait(timeout) 换成 nanosleep(timeout) + epoll_wait(-1),效果等价25。Tokio 用 epoll_wait(timeout) 只是因为方便——一个系统调用同时等两件事(I/O 事件和定时器到期),比分开用 nanosleep + epoll_wait 少一次上下文切换。
所以 epoll_wait 的角色是:tokio 的”定时闹钟”,不是 tokio 的”定时器驱动”。tokio 的定时器驱动完全在用户态完成——elapsed 自维护、时间轮自推进、StateCell 自管理。epoll_wait 只是”到点把线程叫醒”的门铃。
这个角色区分是整个体系的精妙之处:内核做它最擅长的事——准确到毫秒地叫醒线程;tokio 做它最擅长的事——在海量定时器中高效找出哪些已经到期。
七、TimeSource:时间与 tick 的桥梁
TimerEntry 使用 Instant 作为对外的时间抽象,而 Wheel 内部使用 u64 毫秒级 tick。TimeSource 负责两者之间的转换26:
// tokio/src/runtime/time/source.rs: 6-38
pub(crate) struct TimeSource {
start_time: Instant,
}
impl TimeSource {
pub(crate) fn deadline_to_tick(&self, t: Instant) -> u64 {
// 向上取整到最近的毫秒
self.instant_to_tick(t + Duration::from_nanos(999_999))
}
pub(crate) fn instant_to_tick(&self, t: Instant) -> u64 {
let dur: Duration = t.saturating_duration_since(self.start_time);
let ms = dur.as_millis().try_into()
.unwrap_or(MAX_SAFE_MILLIS_DURATION);
ms.min(MAX_SAFE_MILLIS_DURATION)
}
pub(crate) fn now(&self, clock: &Clock) -> u64 {
self.instant_to_tick(clock.now())
}
}
展开来看两个转换函数。instant_to_tick 是核心26:
pub(crate) fn instant_to_tick(&self, t: Instant) -> u64 {
let dur: Duration = t.saturating_duration_since(self.start_time);
// ↑ 核心公式:t - start_time
let ms = dur.as_millis().try_into()...; // u128 → u64
ms.min(MAX_SAFE_MILLIS_DURATION)
}
deadline_to_tick 在它之上加了向上取整:
pub(crate) fn deadline_to_tick(&self, t: Instant) -> u64 {
self.instant_to_tick(t + Duration::from_nanos(999_999))
// ↑ 加 0.999999ms,任何不足 1ms 都被进位
}
关系是:Wheel.elapsed 和 StateCell.state 都来自 TimeSource 的同一个参考原点 start_time。
以 reset(deadline) 为例,tick 是通过 deadline_to_tick(deadline) 算出的:
tick = deadline_to_tick(deadline)
= instant_to_tick(deadline + 999_999ns)
= ((deadline + 999_999ns) - start_time).as_millis()
= (deadline - start_time + 999_999ns).as_millis()
如果此时 now - start_time = 5000ms,而 deadline = now + 5min:
tick = ((now + 5min + 999_999ns) - start_time).as_millis()
= ((start_time + 5000 + 300000 + 0.999999 - start_time)).as_millis()
= (300000 + 5000 + 0.999999).as_millis()
= 305,000 // as_millis() 丢纳米,等价于向下取整
elapsed 和 state 的差值就是真正的剩余时间。因为两者都是相对于 start_time 的毫秒偏移量:
state - elapsed = (deadline - start_time) - (now - start_time)
= deadline - now
= 剩余等待时间(毫秒)
三个关键数值的关系可以总结为:
| 数值 | 保存在哪 | 本质 | 用途 |
|---|---|---|---|
Wheel.elapsed |
Wheel |
坐标原点偏移量:当前 now - start_time 的毫秒值 |
推进时间轮(elapsed += delta),判定 timer 是否到期(state <= elapsed) |
registered_when |
TimerShared |
slot 定位缓存:上次插入时的 state 快照,持 Relaxed 顺序 |
算 slot 索引(slot_for)、从 slot 链表移除(remove) |
StateCell.state |
StateCell |
真实到期时间:deadline - start_time,更新需 CAS(AcqRel) |
mark_pending() 中做最终到期判断(state > not_after ?) |
registered_when 是 state 的只读缓存——两者绝大多数时候相同,唯一的例外是 extend_expiration 无锁延后了 state 但未更新 registered_when 的间隙。此时 slot 位置暂时”不准”,但 Driver 在收割时通过 mark_pending 返回的 Err 检测到这个偏差,用 sync_when() 把 state 同步回 registered_when 并重插入正确 slot。
所以决定”到没到期”的只有一行判断——state > not_after。registered_when 只是加速 slot 定位的缓存,elapsed 只是标识”现在几点”的指针。state 是唯一的 source of truth。
这就验证了第 3.2 节例子中的核心假设:elapsed=0 时,tick=300000 等价于 deadline - start_time = 300000ms。如果 elapsed=5000(已经运行了 5 秒),tick 就是 305000——state 永远比 elapsed 大一个固定差值。
一个具体的数值对比能说清”绝对 vs 相对”的区别。假设 elapsed=10000(runtime 已运行 10 秒),此时创建 sleep(500ms):
sleep(500ms).await
→ deadline = now + 500ms
→ deadline_to_tick(deadline)
= (deadline + 999_999ns - start_time).as_millis()
= (now + 500ms - start_time).as_millis()
= (10000 + 500).as_millis()
= 10500 ← 绝对毫秒偏移量
state - elapsed = 10500 - 10000 = 500 ← 差值才等于注册时长
state 是绝对毫秒偏移量,不是相对值。如果存相对值(state=500),那 state - elapsed = 500 - 10000 就会包装溢出。elapsed ≥ state 的到期判断也变成一句加法——选绝对值省了这条指令,还保持了 u64 减法的一致性。
deadline_to_tick 的 999_999ns 加法是向上取整的关键:假设用户调用了 sleep(Duration::from_micros(1)),deadline 和 start_time 只差 1μs,直接 as_millis() 得到 0,timer 立刻到期,根本不等待。加上 999_999ns 后 as_millis() 得到 1——最小等待 1ms。这正是 1ms 精度约束在代码中的直接体现。
MAX_SAFE_MILLIS_DURATION 是 u64::MAX - 1,约为 5.84 亿年,远超实际需要。
u64 溢出保护
你可能会担心 elapsed 和 state 都是 u64 毫秒值——如果运行足够久会溢出吗?
有三层保护防止溢出问题:
第一层:STATE_DEREGISTERED 和 STATE_PENDING_FIRE 占用了 u64 的顶值。StateCell.state 中 u64::MAX 被用作 STATE_DEREGISTERED(已注销),u64::MAX - 1 被用作 STATE_PENDING_FIRE(待触发)。所以有效的最大 tick 是 u64::MAX - 2。如果 TimeSource::instant_to_tick 算出来的值超过了这个范围,会被 ms.min(MAX_SAFE_MILLIS_DURATION) 钳住。
第二层:时间轮的 MAX_DURATION 限制。6 层 × 64 slot 的时间轮能表达的最大范围是 2^36 - 1 ≈ 2.18 年(MAX_DURATION)。超过这个范围的 timer 会被自动放入顶层(Level 5)的合适 slot,不会因为到期时间远大于 wheel 范围而出错——这是哈希时间轮论文描述的”wrap-around”处理12。
第三层:纯数值比较。Wheel::insert() 的 when <= self.elapsed 是直接的 u64 比较。即使 elapsed 和 state 都接近 u64::MAX,差值 state - elapsed(用于计算 park_timeout)是正确的无符号结果——因为 Rust 的 saturating_sub 保证了安全性。
实际上,elapsed 从 0 以毫秒为单位递增到溢出需要约 5.84 亿年。在那之前 runtime 早就被重启无数次了。MAX_SAFE_MILLIS_DURATION 约 1.8 × 10^14 天,也远超任何实际运行时长。
八、把整条链路串起来
现在可以把 tokio::time::sleep 的完整执行路径画出来:
tokio::time::sleep(Duration::from_secs(5))
│
├─ 创建 Sleep { entry: TimerEntry { deadline: now + 5s, registered: false } }
│
├─ 首次 poll:
│ ├─ TimerEntry::reset(deadline, true)
│ │ ├─ 初始化 TimerShared(如果还没有)
│ │ ├─ 将 deadline 转为 tick → 插入 Wheel(哈希时间轮)
│ │ └─ 设置 registered = true
│ ├─ StateCell::poll(cx.waker())
│ │ ├─ 注册 waker 到 AtomicWaker
│ │ └─ 加载 state → 还不是 STATE_DEREGISTERED
│ └─ 返回 Poll::Pending
│
├─ Runtime::park_internal → time::Driver::park_internal():
│ ├─ Wheel::next_expiration_time() → Some(5000)
│ ├─ 计算 duration = 5000ms
│ ├─ self.park_thread_timeout(rt_handle, 5000ms)
│ │ └─ self.park.park_timeout(handle, 5000ms)
│ │ └─ IoStack::park_timeout(handle, 5000ms)
│ │ └─ ProcessDriver::park_timeout(handle, 5000ms)
│ │ └─ SignalDriver::park_timeout(handle, 5000ms) [Unix, 可选]
│ │ └─ io::Driver::park_timeout(handle, 5000ms)
│ │ └─ io::Driver::turn(handle, Some(5000ms))
│ │ └─ self.poll.poll(events, Some(5000ms))
│ │ └─ mio::Poll::poll(events, 5000ms)
│ │ └─ epoll_wait(epfd, events, maxevents, 5000)
│ │ └─ 内核 hrtimer → 线程休眠 5 秒
│
├─ 5 秒后 OS 唤醒线程:
│ ├─ Wheel::poll(now) → 找到到期的 entry
│ ├─ entry.fire(Ok(()))
│ │ ├─ result ← Ok(())
│ │ ├─ state ← STATE_DEREGISTERED
│ │ └─ 取出 waker
│ └─ waker_list.wake_all() ← 把任务放回调度队列
│
└─ 任务被再次 poll:
├─ StateCell::read_state()
│ └─ cur_state == STATE_DEREGISTERED → 读取 result = Ok(())
└─ 返回 Poll::Ready(())
sequenceDiagram
participant User as User Task
participant Sleep as Sleep Future
participant Entry as TimerEntry
participant Cell as StateCell
participant Wheel as Hashed Wheel
participant Driver as Time Driver
participant OS as OS
User->>Sleep: .await → poll(cx)
Sleep->>Entry: poll_elapsed(cx)
Entry->>Wheel: reset(deadline) → insert to Wheel
Entry->>Cell: poll(waker) → register waker
Cell-->>Entry: Poll::Pending
Entry-->>Sleep: Poll::Pending
Sleep-->>User: Poll::Pending
Note over User: Worker 线程继续服务其他任务
Driver->>Wheel: next_expiration_time() → 5000ms
Driver->>OS: park_timeout(5000ms)
Note over OS: ... 5 秒 ...
OS-->>Driver: 唤醒
Driver->>Wheel: poll(now) → 到期 entry
Driver->>Cell: fire(Ok(()))
Cell-->>Driver: Some(waker)
Driver->>Driver: waker.wake()
Note over User: 任务被重新调度
User->>Sleep: poll(cx)
Sleep->>Cell: read_state()
Cell-->>Sleep: Poll::Ready(Ok(()))
Sleep-->>User: Poll::Ready(())
九、取消、重置与交互细节
Sleep 的设计还支持两个重要的交互场景:
取消:直接 drop 掉 Sleep 实例即可。TimerEntry 的 PinnedDrop 实现会调用 cancel(),将定时器从时间轮中移除4。没有额外的清理工作。
重置:可以通过 Pin<&mut Sleep>::reset(new_deadline) 来改变 Sleep 的到期时间,而无需创建新的 Future2。内部实现会先尝试原子地延后到期时间(extend_expiration),如果 CAS 成功则无需操作时间轮;如果提前了到期时间,则会重新走 reregister 路径。
这种”乐观延后”的优化利用了定时器一个常见的使用模式——超时通常在操作开始时设置,然后在操作结束前被取消。延后触发时,旧的 slot 位置依然是”安全的”(不会过早触发),所以可以用 CAS 无锁更新。
总结
tokio::time::sleep 的实现体现了异步系统设计的核心分层:
- 接口层(
sleep()/Sleep):对用户暴露一个.await即可等待的 Future - 运行时层(
TimerEntry/TimerShared/StateCell):用一个原子状态机管理定时器的生命周期,协调用户端和驱动端的并发访问 - 数据结构层(
Wheel):用 6 层哈希时间轮实现 O(1) 插入和高效的到期检测 - 驱动层(
Driver::park_internal):通过操作系统的定时唤醒(park_timeout)驱动整个机制运转
最终的效果是:一个单线程可以同时管理成千上万个定时器,线程只会在最近的 deadline 时休眠,期间 CPU 可以服务其他任务或进入省电状态。tokio::time::sleep 的高效并发能力不是来自魔法,而是来自一条从 Future 到 OS 的精心设计的链路。
References
-
之前的文章:《Rust async/await 的底层契约:从 Future::poll 到 Tokio 运行时》,2026-04-30。详细阐述了
Future::poll、Context、Waker组成的核心协议。 ↩ -
Tokio 源码,
Sleep结构定义,tokio/src/time/sleep.rs。包含Sleep的完整 Future 实现,sleep()和sleep_until()的构造逻辑。 ↩ ↩2 -
Tokio 源码,
Timerenum 定义,tokio/src/runtime/mod.rs。Timer是Traditional(TimerEntry)和Alternative(time_alt::Timer)的枚举封装。 ↩ -
Tokio 源码,
TimerEntry结构定义,tokio/src/runtime/time/entry.rs。包含TimerEntry的关键方法:new、reset、poll_elapsed、cancel。 ↩ ↩2 -
Tokio 源码,
TimerShared结构定义,tokio/src/runtime/time/entry.rs。前端与驱动端共享的状态,包含侵入式链表指针、registered_when和StateCell。 ↩ -
Tokio 源码,
TimerEntry::poll_elapsed方法,tokio/src/runtime/time/entry.rs。首次 poll 时注册到时间轮,后续 poll 时检查状态。 ↩ -
Tokio 源码,
TimerEntry::reset方法,tokio/src/runtime/time/entry.rs。reset()先尝试extend_expiration乐观路径,失败后通过inner.into()将&TimerShared转为NonNull<TimerShared>并调用Handle::reregister()。 ↩ -
Tokio 源码,
Handle::reregister方法,tokio/src/runtime/time/mod.rs。接收NonNull<TimerShared>,通过entry.as_ref().handle()创建TimerHandle,调用lock.wheel.insert(entry)重新插入时间轮。 ↩ -
Tokio 源码,
TimerHandle结构定义和TimerShared::handle()方法,tokio/src/runtime/time/entry.rs。TimerHandle是NonNull<TimerShared>的包装,不拥有数据,由 unsafe 契约确保安全使用。 ↩ -
Tokio 源码,
Wheel::insert和Level::add_entry,tokio/src/runtime/time/wheel/mod.rs以及tokio/src/runtime/time/wheel/level.rs。Wheel::insert()计算层级后调用Level::add_entry(),后者将TimerHandlepush 到对应 slot 的侵入式链表中。 ↩ -
Tokio 文档,
sleep()的 panic 说明,tokio/src/time/sleep.rs。解释了为什么rt.block_on(sleep(...))会 panic 而rt.block_on(async {sleep(...).await})不会。 ↩ -
George Varghese 和 Tony Lauck,《Hashed and Hierarchical Timing Wheels: Data Structures for the Efficient Implementation of a Timer Facility》,1996。论文链接:Hashed and Hierarchical Timing Wheels。Tokio 的哈希时间轮数据结构直接引用了此论文的设计。 ↩ ↩2 ↩3
-
Tokio 源码,
Wheel结构定义与文档,tokio/src/runtime/time/wheel/mod.rs。6 层、每层 64 slot 的哈希时间轮实现,以及NUM_LEVELS和MAX_DURATION常量。 ↩ ↩2 ↩3 -
Tokio 源码,
Level和EntryList定义,tokio/src/runtime/time/wheel/level.rs。Level包含occupied位图和slot: [EntryList; 64];EntryList是LinkedList<TimerShared, TimerShared>,定义在tokio/src/runtime/time/entry.rs。TimerShared 通过侵入式链表的pointers字段直接链接在 slot 的链表中。 ↩ -
Tokio 源码,
StateCell结构及其方法,tokio/src/runtime/time/entry.rs。包含poll、mark_pending(CAS 循环)、fire、set_expiration、extend_expiration等原子操作。 ↩ ↩2 ↩3 ↩4 ↩5 -
Tokio 源码,
Driver::park_internal和Handle::process_at_time,tokio/src/runtime/time/mod.rs。运行时驱动定时器的核心逻辑:计算 next_wake、带超时 park、处理到期定时器、批量唤醒。 ↩ ↩2 ↩3 ↩4 ↩5 -
关于 per-timer StateCell 的权衡分析:每个 timer 独立持有
AtomicU64,Driver 遍历 slot 时逐 timer CAS。对比替代方案(per-slot Mutex),CAS 方案在零锁争用和 O(N) 线性代价上占据优势,且 slot 的 1ms 宽度提供了天然的保护层,使遍历时间(~10ns/timer)不会影响精度。此分析基于 x86-64 上lock cmpxchgq指令的理论延迟约 10-20ns(无 cache miss 时),以及 level-0 slot 宽度 1ms 的事实。 ↩ -
Tokio 源码,
runtime::driver::Driver::new和Handle结构定义,tokio/src/runtime/driver.rs以及tokio/src/runtime/driver.rs。Time Driver 和 I/O Driver 分别独立初始化,通过create_io_stack和create_time_driver堆叠。Handle 包含io、time、signal三个子句柄,通过Arc分发给所有 worker 线程。 ↩ -
Tokio 源码,
current_thread调度器的Core结构,tokio/src/runtime/scheduler/current_thread/mod.rs。Driver 以Option<Driver>形式存储,park 时通过take()取出。 ↩ -
Tokio 源码,
multi_thread调度器的Shared结构和Parker::park_timeout方法,tokio/src/runtime/scheduler/multi_thread/park.rs(Shared)及tokio/src/runtime/scheduler/multi_thread/park.rs(park_timeout)。TryLock<Driver>保证同一时刻只有一个 worker 能持有 Driver,但其他 worker 仍可无锁执行 task。 ↩ -
Linux 内核源码,
ep_poll中schedule_hrtimeout_range调用,fs/eventpoll.c。epoll_wait的timeout_ms参数经ep_timeout_to_timespec()转为timespec64绝对时间后,由schedule_hrtimeout_range()注册到 hrtimer 红黑树,实现纳秒精度的超时唤醒。 ↩ -
Linux 内核源码,
hrtimer_clock_base结构定义,include/linux/hrtimer_defs.h。hrtimer 使用红黑树(timerqueue_linked_head)而非哈希时间轮来组织定时器。 ↩ -
Linux 内核源码,
enqueue_hrtimer、hrtimer_wakeup、hrtimer_interrupt,kernel/time/hrtimer.c。enqueue_hrtimer()(行 1096-1104)将定时器插入红黑树;hrtimer_wakeup()(行 2184-2193)在定时器触发时唤醒进程;hrtimer_interrupt()(行 2083-2114)是时钟中断的处理入口。 ↩ ↩2 ↩3 ↩4 -
Linux 内核源码,timer wheel 实现,
kernel/time/timer.c。行 64-150 的注释详细描述了低精度定时器的层级哈希轮设计:8-9 层、每层 64 bucket(LVL_SIZE=64,LVL_DEPTH=8/9),与 Tokio 的实现思想同源。 ↩ -
epoll_wait(timeout)在本场景中等价于nanosleep(timeout)+epoll_wait(-1)——都是”先睡 timeout 时间,醒来后再 epoll 检查 I/O 事件”。Tokio 选前者只是因为一个系统调用比两个高效,功能上没有区别。 ↩ ↩2 -
Tokio 源码,
TimeSource实现,tokio/src/runtime/time/source.rs。负责Instant和u64tick 之间的转换,向上取整到毫秒精度。 ↩ ↩2
系列:Tokio 异步运行时
- Rust async/await 的底层契约:从 Future::poll 到 Tokio 运行时
- 从 tokio::time::sleep 看异步 Timer 的实现:一次从 Future::poll 到哈希时间轮的源码之旅
- 从 O(n) 到 O(1):经典定时器论文《Hashed and Hierarchical Timing Wheels》的七种设计
- 从 tokio::time::sleep 到 tokio::net::TcpStream:时间驱动与事件驱动的两种异步实现
- Linux hrtimer 纳秒精度从何而来:从 TSC 到红黑树的完整链路