Dawn's Blogs

分享技术 记录成长

0%

GO专家编程读书笔记 (6) 定时器之Timer和Ticker

Timer

Timer 指定一段时间后,通过本身提供的一个 channel 通知触发一个事件,Timer 只执行一次就结束。

数据结构

Timer

Timer 有两个成员:

  • C:一个 channel,用于通知时间已经到达。
  • r:runtime 定时器,该定时器即系统管理的定时器,对上层应用不可见。
1
2
3
4
type Timer struct { // Timer代表一次定时,时间到来后仅发生一个事件。
C <-chan Time
r runtimeTimer
}

runtimeTimer

创建一个 Timer 实质上是把一个定时任务交给专门的协程进行监控,这个任务的载体便是 runtimeTimer。创建一个 Timer 就是创建一个 runtimeTimer,把它交给系统进行监控,当 runtimeTimer 到期后像 Timer.C 管道中发送一个消息。

runtimeTimer 的结构如下:

1
2
3
4
5
6
7
8
9
10
type runtimeTimer struct {
tb uintptr // 存储当前定时器的数组地址
i int // 存储当前定时器的数组下标

when int64 // 当前定时器触发时间
period int64 // 当前定时器周期触发间隔
f func(interface{}, uintptr) // 定时器触发时执行的函数
arg interface{} // 定时器触发时执行函数传递的参数一
seq uintptr // 定时器触发时执行函数传递的参数二(该参数只在网络收发场景下使用)
}

实现原理

所有 Timer 中的 runtimeTimer 由统一的底层协程进行管理,这个协程是系统协程。

系统协程把 runtimeTimer 存放在数组!!中,并按照 runtimeTimer.when 进行堆排序,定时器触发时执行预定义的函数 runtimeTimer.f,即完成了一次定时任务。

创建 Timer

创建 Timer 的过程:

  • 首先会初始化一个管道 C,用于通知上层应用。
  • 接着会创建一个 runtimeTimer,并且调用 startTimer 启动定时器(由系统协程维护)。
1
2
3
4
5
6
7
8
9
10
11
12
13
func NewTimer(d Duration) *Timer {
c := make(chan Time, 1) // 创建一个管道
t := &Timer{ // 构造Timer数据结构
C: c, // 新创建的管道
r: runtimeTimer{
when: when(d), // 触发时间
f: sendTime, // 触发后执行函数sendTime
arg: c, // 触发后执行函数sendTime时附带的参数
},
}
startTimer(&t.r) // 此处启动定时器,只是把runtimeTimer放到系统协程的堆中,由系统协程维护
return t
}

when 方法用于计算下一次定时器触发的绝对时间,即当前时间 + d。

sendTime 方法用于定时器触发时,向管道 C 中发送当前时间:

1
2
3
4
5
6
func sendTime(c interface{}, seq uintptr) {
select {
case c.(chan Time) <- Now():
default:
}
}

因为 Timer 创建时,初始化了一个缓冲区长度为 1 的管道(make(chan Time, 1)),所以 Timer 触发时向管道写入时间永远不会阻塞,sendTime 写完即退出

之所以 sendTime 使用 select 并搭配一个空的 default 分支,是因为 Ticker 也复用 sendTime。Ticker 触发时也会向管道中写入时间,但无法保证之前的数据已被取走,所以使用 select 并搭配一个空的 default 分支,确保 sendTime 不会阻塞。Ticker 触发时,如果管道中还有值,则本次不再向管道中写入时间,本次触发的事件直接丢弃。

startTimer 函数的主要作用就是将 runtimeTimer 写入到系统协程的数组中,并启动系统协程(如果系统协程还未开始运行的话)。

img

停止 Timer

停止 Timer 只需要把 runtimeTimer 从系统协程的数组中移除即可:

1
2
3
func (t *Timer) Stop() bool {
return stopTimer(&t.r)
}

系统协程只是移除 runtimeTimer 并不会关闭管道,以避免用户协程读取错误。

Timer 已经触发返回 true;Timer 没有触发返回 false。

img

重置 Timer

重置 Timer 时会先把 runtimeTimer 冲系统协程中删除,然后修改触发时间后再添加到系统协程的数组中。

1
2
3
4
5
6
7
func (t *Timer) Reset(d Duration) bool {
w := when(d)
active := stopTimer(&t.r)
t.r.when = w
startTimer(&t.r)
return active
}

img

Ticker

不使用的 Ticker 需要显式地 Stop,否则会产生资源泄露。

因为系统协程会一致定期的触发事件。

数据结构

Ticker 的数据结构和 Timer 完全一致:

1
2
3
4
type Ticker struct {
C <-chan Time // The channel on which the ticks are delivered.
r runtimeTimer
}

实现原理

创建 Ticker

创建 Ticker 与创建 Timer 的过程差不多,其步骤都是:首先会初始化一个管道 C,用于通知上层应用。接着会创建一个 runtimeTimer,并且调用 startTimer 启动定时器(由系统协程维护)。

但是与 Timer 不同的是,Ticker 在创建 runtimeTimer 时提供了 period 参数,据此决定 runtimeTimer 是一次性的,还是周期性的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func NewTicker(d Duration) *Ticker {
if d <= 0 {
panic(errors.New("non-positive interval for NewTicker"))
}
// Give the channel a 1-element time buffer.
// If the client falls behind while reading, we drop ticks
// on the floor until the client catches up.
c := make(chan Time, 1)
t := &Ticker{
C: c,
r: runtimeTimer{
when: when(d),
period: int64(d), // Ticker跟Timer的重要区就是提供了period这个参数,据此决定timer是一次性的,还是周期性的
f: sendTime,
arg: c,
},
}
startTimer(&t.r)
return t
}

Ticker.C 是一个缓冲区长度为 1 的管道,Ticker 触发的事件是周期性的,如果管道中的数据没有被取走,那么 sendTime 也不会阻塞,而是直接退出,带来的后果是本次事件会丢失。

停止 Ticker 与 Timer 一样,这里不做过多赘述。

系统协程

因为 Timer 和 Ticker 创建好一个 runtimeTimer 后,统一交给系统协程进行管理,下文将说明系统协程如何管理这些定时器的。

定时器存储

在 time 包中,系统协程维护的对象名称为 runtimeTimer,而在 runtime 包中对应的是 timer

数据结构

timer 的数据结构如下,其中,timer.tb(timersBucket)是系统协程存储 timer 的容器,里面有个切片来存储 timer,timer.i 是当前所在的下标

1
2
3
4
5
6
7
8
9
10
type timer struct {
tb *timersBucket // the bucket the timer lives in // 当前定时器寄存于系统timer堆的地址
i int // heap index // 当前定时器寄存于系统timer堆的下标

when int64 // 当前定时器下次触发时间
period int64 // 当前定时器周期触发间隔(如果是Timer,间隔为0,表示不重复触发)
f func(interface{}, uintptr) // 定时器触发时执行的函数
arg interface{} // 定时器触发时执行函数传递的参数一
seq uintptr // 定时器触发时执行函数传递的参数二(该参数只在网络收发场景下使用)
}

而 timersBucket 的数据结构如下:

  • lock:互斥锁,在 timer 增加和删除时需要使用;
  • gp:事件处理协程,就是系统协程,这个协程在首次创建 Timer 或 Ticker 时生成;
  • created:状态值,表示系统协程是否创建;
  • sleeping:系统协程是否在睡眠;
  • rescheduling:系统协程是否已暂停;
  • sleepUntil:系统协程睡眠到指定的时间(如果有新的定时任务可能会提前唤醒);
  • waitnote:提前唤醒时使用的通知;
  • t:保存 timer 的切片,当调用 NewTimer 或 NewTicker 时便会有新的 timer 存到此切片中。
1
2
3
4
5
6
7
8
9
10
type timersBucket struct {
lock mutex
gp *g // 处理堆中事件的协程
created bool // 事件处理协程是否已创建,默认为false,添加首个定时器时置为true
sleeping bool // 事件处理协程(gp)是否在睡眠(如果t中有定时器,还未到触发的时间,那么gp会投入睡眠)
rescheduling bool // 事件处理协程(gp)是否已暂停(如果t中定时器均已删除,那么gp会暂停)
sleepUntil int64 // 事件处理协程睡眠时间
waitnote note // 事件处理协程睡眠事件(据此唤醒协程)
t []*timer // 定时器切片
}

假如创建了三个 Ticker,则 Ticker、timer 和 timersBucket 的存储示意图如下:

img

timersBucket 数组

通过 timersBucket 数据结构可以看到,系统协程负责计时并维护其中的多个 timer,一个 timersBucket 包含一个系统协程

当系统中定时器非常多时,一个系统协程可能处理能力跟不上,所以 Go 在实现时实际上提供了多个 timersBucket,也就有多个系统协程来处理定时器。

Go 预留了 64 个 timersBucket,每当协程创建定时器时,使用协程所属的 ProcessID%64 来计算定时器存入的 timersBucket

定时器运行机制

创建定时器

在创建一个 Timer 或者 Ticker 时,其步骤都是相同的:首先创建一个管道;接着创建一个 runtimeTimer(timer),并启动。

在第二步创建一个 timer 的过程中,首先要进行的是选择一个 timersBucket,就是当前协程所属的 Processor ID 与 timersBucket 取模得到下标:

1
2
3
4
5
6
7
8
9
const timersLen = 64
var timers [timersLen]struct { // timersBucket数组,长度为64
timersBucket
}
func (t *timer) assignBucket() *timersBucket {
id := uint8(getg().m.p.ptr().id) % timersLen // Processor ID 与数组长度求模,得到下标
t.tb = &timers[id].timersBucket
return t.tb
}

接着就是将 timer 加入到 timersBucket 中,而 timer 在 timersBucket 中是以小根堆的形式维护的:

  • 如果 timer 的时间是负值,那么会被修改为很大的值,来保证后续定时算法的正确性。
  • 先把定时器插入到小根堆的堆尾,并且重新调整堆
  • 新加入 timer 后,如果新的 timer 跑到了栈顶,需要唤醒系统协程来处理。
  • 如果是当前 timersBucket 的首个 timer,则启动利用 go timerproc(tb) 启动一个系统协程来处理堆中的定时器。

Go 使用的是四叉堆而不是二叉堆,好处是堆的维度降低,调整堆更快。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (tb *timersBucket) addtimerLocked(t *timer) bool {
if t.when < 0 {
t.when = 1<<63 - 1
}
t.i = len(tb.t) // 先把定时器插入到堆尾
tb.t = append(tb.t, t) // 保存定时器
if !siftupTimer(tb.t, t.i) { // 堆中插入数据,触发堆重新排序
return false
}
if t.i == 0 { // 堆排序后,发现新插入的定时器跑到了栈顶,需要唤醒协程来处理
// siftup moved to top: new earliest deadline.
if tb.sleeping { // 协程在睡眠,唤醒协程来处理新加入的定时器
tb.sleeping = false
notewakeup(&tb.waitnote)
}
if tb.rescheduling { // 协程已暂停,唤醒协程来处理新加入的定时器
tb.rescheduling = false
goready(tb.gp, 0)
}
}
if !tb.created { // 如果是系统首个定时器,则启动协程处理堆中的定时器
tb.created = true
go timerproc(tb)
}
return true
}

删除定时器

删除 timer 就是从 timersBuckets 的小根堆中删除 timer,然后调整小根堆。

timerproc

timerproc 为系统协程的具体实现。它是在首次创建定时器创建并启动的,一旦启动永不销毁

  • 如果 timersBucket 中有定时器,取出堆顶定时器,计算睡眠时间,然后进入睡眠,醒来后触发事件

  • 某个 timer 的事件触发后,根据其是否是周期性定时器来决定将其删除还是修改时间后重新加入堆

  • 如果堆中已没有事件需要触发,则系统协程将进入暂停态,也可认为是无限时睡眠,直到有新的 timer 加入才会被唤醒