Dawn's Blogs

分享技术 记录成长

0%

godis源码阅读 (7) 时间轮

对于带有 ttl 的 key,到期清理有两种解决思路:

  • 将所有带有 ttl 的 key 记录下来,比如用一个 list 保存,启动一个协程定期的去轮询。但是这样有一个缺点,就是效率低下:因为每一个的轮询都会遍历所有的 list 项,才能知道是否到期了。
  • 时间轮,时间轮避免了每一次轮询所有的 list 项,每一次只会查询可能到期的 key

时间轮

简单时间轮

时间轮实际上是一个环形队列,底层用数组实现。数组中的每个元素可以存放一个定时任务列表。定时任务列表是一个双向链表,链表中的每一项表示的都是定时任务项,其中封装了真正的定时任务。

环形队列的每一个元素,可以看作是一个时间格,每个时间格代表当前时间轮的基本时间跨度。时间格的个数是固定的,时间轮的总体事件跨度 = 时间格个数 × 时间格的事件跨度

时间轮还有一个表盘指针,用来表示时间轮当前所处的时间(就是当前指向了哪一个时间格)。表盘指针指向的是到期的时间格,表示需要处理的时间格所对应的链表中的所有任务。

如下图所示,时间格个数为 10,基本时间跨度为 1s 的时间轮,每一格里面放的是一个定时任务链表,链表里面存有真正的任务项。

taskList

初始情况下,表盘指针指向 0。若此时有一个 2s 的任务插入进来,就会放到时间格为 2 的任务链表中。当表盘指针指向 2 时,就会执行其中的任务。

timewheel

在这样的简单时间轮中,若有一个 15s 的定时任务,那么至少需要设置一个总体时间跨度为 15s 的时间轮才够用。如果需要一个一万秒的时间轮,那么可能需要一个很大的数组去存放(如果时间基本跨度为 1s,那么数组长度为 1 万)。不仅占用很大的内存空间,而且也会因为需要遍历这么大的数组从而拉低效率。

因此引入了层级时间轮的概念。

层级时间轮

层级时间轮就是引入多层的时间轮。

如下图所示,是一个两层的时间轮。第二层时间轮也是由 10 个时间格组成,每一个时间格的跨度是第一层时间轮的总体时间跨度,所以第二次时间轮的总体时间跨度为 100s。

如果像向该时间轮中添加一个 15s 的任务,那么当第一层时间轮容纳不下时,进入第二层时间轮,并插入到过期时间为 [10,19] 的时间格中。

timewheellevel2

随着时间的流逝,当原本 15s 的任务还剩下 5s 的时候,这里就有一个时间轮降级的操作,此时第一层时间轮的总体时间跨度已足够,此任务被添加到第一层时间轮到期时间为5的时间格中,之后再经历 5s 后,此任务真正到期,最终执行相应的到期操作。

在实际的实现中,一种简单的实现方式是:可以为每个任务记录下走过的圈数(circle),来表示逻辑上的层级关系。

godis 中时间轮的实现

在 godis 中,时间轮的实现采用层级时间轮(为每个任务记录下此时需要走过的圈数,表示逻辑上的层级关系)

数据结构

TimeWheel 时间轮

godis 中,时间轮的主体结构 TimeWheel 如下:

  • interval:每个时间格的基本时间跨度。
  • ticker:定时器,每过 interval 的时间,就会移动到下一个时间格、并且执行任务。
  • slots:时间格。
  • timer:因为每一个 key 都可以移除 ttl 或者 改变 ttl,所以用 timer 来定位每一个任务(key)。
  • currentPos:表盘指针,指向当前的时间格。
  • addTaskChannel:添加任务采用异步的操作,先将任务加入到 channel 中。
  • removeTaskChannel:移除任务也采用异步的操作,将需要取消 ttl 的 key 加入到通道中。
1
2
3
4
5
6
7
8
9
10
11
12
13
// TimeWheel can execute job after waiting given duration
type TimeWheel struct {
interval time.Duration
ticker *time.Ticker
slots []*list.List

timer map[string]*location
currentPos int
slotNum int
addTaskChannel chan task
removeTaskChannel chan string
stopChannel chan bool
}

location 的结构如下,用于定位任务在时间格中,所在的位置:

  • slot:表示时间格的下标。
  • etask:表示时间格维护的双向链表中,任务元素的地址。
1
2
3
4
type location struct {
slot int
etask *list.Element
}

task 任务

task 的结构如下:

  • circle:表示当前表针走过的圈数,表示逻辑上的层级。每一次指向当前 task 所在的时间格时都会令 circle 减一,circle 为 0 时说明已经到达了第一层时间轮。
  • delay:延迟 delay 时间之后,执行任务。
1
2
3
4
5
6
type task struct {
delay time.Duration
circle int
key string
job func()
}

时间轮开始后,会开启一个 start 协程来维护时间轮。采用 select 的方式:

  • 定期轮询时间到之后,调用 tickHandler 处理某一个时间格上的任务。
  • 异步处理需要添加的任务、需要删除的任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (tw *TimeWheel) start() {
for {
select {
case <-tw.ticker.C:
tw.tickHandler()
case task := <-tw.addTaskChannel:
tw.addTask(&task)
case key := <-tw.removeTaskChannel:
tw.removeTask(key)
case <-tw.stopChannel:
tw.ticker.Stop()
return
}
}
}

添加任务

首先在 addTaskChannel 通道中加入一个 task,就表示添加了一个任务。之后时间轮会调用 addTask 方法,异步的将 task 从通道中转移到时间格中。

1
func (tw *TimeWheel) addTask(task *task)

步骤如下:

  • 首先计算这个任务所对应的时间格下标 pos,以及等待的圈数
1
2
pos, circle := tw.getPositionAndCircle(task.delay)
task.circle = circle
  • 将任务加到相应的时间格中。
1
2
3
4
5
e := tw.slots[pos].PushBack(task)
loc := &location{
slot: pos,
etask: e,
}
  • 如果之前存在相同的 key 则移除位置信息,记录这个任务的位置
1
2
3
4
5
6
7
if task.key != "" {
_, ok := tw.timer[task.key]
if ok {
tw.removeTask(task.key)
}
}
tw.timer[task.key] = loc

删除任务

删除任务(为一个 key 加上 ttl)和添加任务是类似的,也是异步的方式。步骤如下:

  • 首先找到这个 key 所对应的 location。
  • 接着在相应的时间格内删除任务,删除 location 的信息。
1
2
3
4
5
6
7
8
9
func (tw *TimeWheel) removeTask(key string) {
pos, ok := tw.timer[key]
if !ok {
return
}
l := tw.slots[pos.slot]
l.Remove(pos.etask)
delete(tw.timer, key)
}

处理任务

每过 interval 时间,都会调用 tickHandler 方法,指向下一个时间格,处理时间格上的任务。

1
2
3
4
5
6
7
8
9
func (tw *TimeWheel) tickHandler() {
l := tw.slots[tw.currentPos]
if tw.currentPos == tw.slotNum-1 {
tw.currentPos = 0
} else {
tw.currentPos++
}
go tw.scanAndRunTask(l)
}

scanAndRunTask 处理任务

scanAndRunTask 是真正的处理任务逻辑,它扫描时间格列表上的每一个任务,并且执行需要执行的任务。

处理逻辑如下,首先依次扫描 list 上的每一个元素:

  • 若 circle > 0,令 circle 减一,并移动到下一个元素上。
  • 否则,开启一个协程执行任务,并且在 list 上删除当前元素、删除位置信息 location。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (tw *TimeWheel) scanAndRunTask(l *list.List) {
for e := l.Front(); e != nil; {
task := e.Value.(*task)
if task.circle > 0 {
task.circle--
e = e.Next()
continue
}

go func() {
defer func() {
if err := recover(); err != nil {
logger.Error(err)
}
}()
job := task.job
job()
}()
next := e.Next()
l.Remove(e)
if task.key != "" {
delete(tw.timer, task.key)
}
e = next
}
}