Dawn's Blogs

分享技术 记录成长

0%

LMQ实现 (4) 磁盘队列和超时队列

本项目地址:LMQ

磁盘队列

每一个 topic 都会维护一个磁盘消息队列,每一个 channel 也会维护一个磁盘消息队列。当内存消息队列满了之后,新的消息就会存储在磁盘消息队列中。

在 lmqd 退出时消息的持久化也是依靠磁盘消息队列实现(将内存消息队列中的消息全部推入磁盘消息队列中,进行持久化存储)。

元数据

一个磁盘消息队列会维护多个文件,以 index 标识顺序。一个磁盘队列的核心元数据就是记录四个变量(元数据也会被持久化,以便标识下一次启动时的读取和写入位置):

  • 当前读取的文件号,当前读取的文件位置。
  • 当前写入的文件号,当前写入的文件位置。

ioLoop

磁盘消息队列会开启一个协程 ioLoop,用于:

  • 监听一个用于写入的 chan,并且写入的结果会被送入响应 chan
  • 尝试读取一个消息,将消息送入读取 chan,作为 topic 或者 channel 的输入数据源。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
// disk queue的核心函数,用于读写
func (queue *DiskBackendQueue) ioLoop() {
var err error
var count int64
var readChan chan []byte
var dataRead []byte

syncTicker := time.NewTicker(queue.syncTimeout)

for {
if count == queue.syncEvery {
queue.needSync = true
}

// 需要fsync
if queue.needSync {
err = queue.sync()
if err != nil {
logger.Errorf("DiskQueue(%s) failed to sync - %v", queue.name, err)
}
count = 0
}

// 有消息还未读取,那么就读取一个消息
if (queue.readFileIndex < queue.writeFileIndex) || (queue.readFilePos < queue.writeFilePos) {
if queue.nextReadPos == queue.readFilePos {
dataRead, err = queue.readOne()
if err != nil {
logger.Errorf("DiskQueue(%s) reading at %d of %s - %s", queue.name, queue.readFilePos, queue.fileName(queue.readFileIndex), err.Error())
queue.handleReadError() // 发生读取错误就取消当前文件的写入和读取,转到下一个文件写入。
continue
}
}
readChan = queue.readChan
} else {
readChan = nil
}

select {
case data := <-queue.writeChan: // 有写入请求
count++
queue.writeResponseChan <- queue.writeOne(data)
case readChan <- dataRead: // 有读取请求
count++
queue.moveForward()
case <-queue.emptyChan: // 有清空请求
queue.emptyResponseChan <- queue.deleteAllFiles()
count = 0
case <-syncTicker.C:
if count == 0 {
// 期间没有进行读写操作,跳过同步
continue
}
queue.needSync = true
case <-queue.exitChan:
goto exit
}
}

exit:
logger.Infof("DiskQueue(%s) ioLoop closing", queue.name)
syncTicker.Stop()
queue.exitSyncChan <- struct{}{}
}

超时优先队列

超时优先队列实际上是一个小根堆(实现了 heap.Interface 接口),以超时时间作为优先级。那么最早超时的消息就是堆顶元素

每次计时器时间到达,检查是否有消息超时时,都会拿出堆顶消息和当前时间进行比较,如果堆顶消息的超时时间小于当前时间,则说明堆顶的消息已经超时,弹出堆顶消息进行超时处理。循环判断,直到堆顶不满足超时条件,此时已经没有超时消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func (channel *Channel) processInFlightQueue() {
channel.exitLock.RLock()
channel.exitLock.RUnlock()

if channel.isExiting.Load() {
return
}

now := time.Now().UnixNano()
for {
channel.inFlightMessagesLock.Lock()
msg := channel.inFlightMessagesPriQueue.PeekAndShift(now)
channel.inFlightMessagesLock.Unlock()

if msg == nil {
return
}

_, err := channel.popInFlightMessage(msg.GetClientID(), msg.GetID())
if err != nil {
return
}

channel.timeoutCount.Add(1)

channel.RLock()
client, ok := channel.clients[msg.GetClientID()]
if ok {
client.TimeoutMessage()
}
channel.RUnlock()

logger.Infof("message id = %v timeout, now requeue", msg.GetID())
_ = channel.put(msg)
}
}