本项目地址:LMQ
磁盘队列 每一个 topic 都会维护一个磁盘消息队列,每一个 channel 也会维护一个磁盘消息队列。当内存消息队列满了之后,新的消息就会存储在磁盘消息队列中。
在 lmqd 退出时消息的持久化 也是依靠磁盘消息队列实现(将内存消息队列中的消息全部推入磁盘消息队列中,进行持久化存储)。
元数据 一个磁盘消息队列会维护多个文件,以 index 标识顺序。一个磁盘队列的核心元数据就是记录四个变量(元数据也会被持久化,以便标识下一次启动时的读取和写入位置):
当前读取的文件号,当前读取的文件位置。
当前写入的文件号,当前写入的文件位置。
ioLoop 磁盘消息队列会开启一个协程 ioLoop,用于:
监听一个用于写入的 chan ,并且写入的结果会被送入响应 chan 。
尝试读取 一个消息,将消息送入读取 chan ,作为 topic 或者 channel 的输入数据源。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 func (queue *DiskBackendQueue) ioLoop () { var err error var count int64 var readChan chan []byte var dataRead []byte syncTicker := time.NewTicker(queue.syncTimeout) for { if count == queue.syncEvery { queue.needSync = true } if queue.needSync { err = queue.sync() if err != nil { logger.Errorf("DiskQueue(%s) failed to sync - %v" , queue.name, err) } count = 0 } if (queue.readFileIndex < queue.writeFileIndex) || (queue.readFilePos < queue.writeFilePos) { if queue.nextReadPos == queue.readFilePos { dataRead, err = queue.readOne() if err != nil { logger.Errorf("DiskQueue(%s) reading at %d of %s - %s" , queue.name, queue.readFilePos, queue.fileName(queue.readFileIndex), err.Error()) queue.handleReadError() continue } } readChan = queue.readChan } else { readChan = nil } select { case data := <-queue.writeChan: count++ queue.writeResponseChan <- queue.writeOne(data) case readChan <- dataRead: count++ queue.moveForward() case <-queue.emptyChan: queue.emptyResponseChan <- queue.deleteAllFiles() count = 0 case <-syncTicker.C: if count == 0 { continue } queue.needSync = true case <-queue.exitChan: goto exit } } exit: logger.Infof("DiskQueue(%s) ioLoop closing" , queue.name) syncTicker.Stop() queue.exitSyncChan <- struct {}{} }
超时优先队列 超时优先队列实际上是一个小根堆 (实现了 heap.Interface 接口),以超时时间作为优先 级。那么最早超时的消息就是堆顶元素 。
每次计时器时间到达,检查是否有消息超时时,都会拿出堆顶消息和当前时间进行比较 ,如果堆顶消息的超时时间小于当前时间,则说明堆顶的消息已经超时,弹出堆顶消息进行超时处理。循环判断,直到堆顶不满足超时条件,此时已经没有超时消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 func (channel *Channel) processInFlightQueue () { channel.exitLock.RLock() channel.exitLock.RUnlock() if channel.isExiting.Load() { return } now := time.Now().UnixNano() for { channel.inFlightMessagesLock.Lock() msg := channel.inFlightMessagesPriQueue.PeekAndShift(now) channel.inFlightMessagesLock.Unlock() if msg == nil { return } _, err := channel.popInFlightMessage(msg.GetClientID(), msg.GetID()) if err != nil { return } channel.timeoutCount.Add(1 ) channel.RLock() client, ok := channel.clients[msg.GetClientID()] if ok { client.TimeoutMessage() } channel.RUnlock() logger.Infof("message id = %v timeout, now requeue" , msg.GetID()) _ = channel.put(msg) } }