本项目地址:LMQ
lmqd 中的元数据是指 lmqd 中的 Topic 和 Channel 信息
Topic Topic 负责接受生产者的消息,并且将消息复制分发给每一个 channel 中。
数据结构 Topic 的结构如下,主要的属性有:
name:topic 的名称。
isTemporary:是否是临时的 Topic,如果名称带有 #tmp
则表明这是一个临时的 Topic。
对于临时 Topic,backendQueue(磁盘队列)为 nil,所以超出内存队列长度的数据会被直接丢弃。
若 channel 为空,则会立即删除这个 topic。
一个临时的 topic 不会持久化到磁盘中。
channels:topic 下有多个 channel,保存所有在此 topic 下的 channel。
memoryMsgChan:内存消息队列。
backendMsgChan:磁盘消息队列。
deleteCallback:这个 topic 删除后的回调函数,主要用于删除 lmqd 中维护的 topic 信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 type Topic struct { lmqd iface.ILmqDaemon name string isTemporary bool isPausing atomic.Bool isExiting atomic.Bool channels map [string ]iface.IChannel channelsLock sync.RWMutex guidFactory *GUIDFactory memoryMsgChan chan iface.IMessage backendQueue backendqueue.BackendQueue deleteCallback func (topic iface.ITopic) deleter sync.Once startChan chan struct {} updateChan chan struct {} pauseChan chan struct {} closingChan chan struct {} closedChan chan struct {} messageCount atomic.Uint64 messageBytes atomic.Uint64 }
生产消息 通过 select,在内存队列中放入消息。如果内存队列已满,则会进入 default 分支,将消息送入磁盘消息队列中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 func (topic *Topic) PutMessage (msg iface.IMessage) error { topic.channelsLock.RLock() defer topic.channelsLock.RUnlock() if topic.isExiting.Load() { return e.ErrTopicIsExiting } if msg.GetDataLength() < config.GlobalLmqdConfig.MinMessageSize || msg.GetDataLength() > config.GlobalLmqdConfig.MaxMessageSize { return e.ErrMessageLengthInvalid } select { case topic.memoryMsgChan <- msg: default : data, err := message.ConvertMessageToBytes(msg) if err != nil { logger.Errorf("topic(%s) convert message to bytes err when PutMessage: %s" , topic.name, err.Error()) return err } err = topic.backendQueue.Put(data) if err != nil { logger.Errorf("topic(%s) convert message to bytes err when put msg into backend queue: %s" , topic.name, err.Error()) return err } } topic.messageCount.Add(1 ) topic.messageBytes.Add(uint64 (len (msg.GetData()))) return nil }
消息分发 Topic 在开启时会启动一个线程专门用于监听消息分发和状态更新。Topic 的消息的分发由 messagePump 方法实现,负责复制消息到每一个 Channel 上。
messagePump 方法监听着以下几个 chan:
memoryMsgChan: 内存消息队列。
backendMsgChan: 磁盘消息队列,从磁盘中获取消息。
topic.updateChan:Channel 更新队列,用于更新分发数据的 Channel。
topic.pauseChan:提醒暂停或者恢复 Topic 的队列。
topic.closingChan:监听推出信号。
当从内存消息队列或者磁盘消息队列中读取到消息,则复制分发给所有的 Channel 上。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 func (topic *Topic) messagePump () { var memoryMsgChan chan iface.IMessage var backendMsgChan <-chan []byte var msg iface.IMessage var channels []iface.IChannel for { select { case <-topic.pauseChan: continue case <-topic.updateChan: continue case <-topic.closingChan: goto Exit case <-topic.startChan: } break } logger.Infof("topic [%] is running" , topic.name) topic.channelsLock.RLock() for _, channel := range topic.channels { channels = append (channels, channel) } topic.channelsLock.RUnlock() if len (channels) > 0 && !topic.isPausing.Load() { memoryMsgChan = topic.memoryMsgChan backendMsgChan = topic.backendQueue.ReadChan() } for { select { case msg = <-memoryMsgChan: case data := <-backendMsgChan: var err error msg, err = message.ConvertBytesToMessage(data) if err != nil { logger.Errorf("topic(%s) convert bytes to message failed when message pump, err:%s" , topic.name, err.Error()) continue } case <-topic.updateChan: channels = channels[:0 ] topic.channelsLock.RLock() for _, channel := range topic.channels { channels = append (channels, channel) } topic.channelsLock.RUnlock() if len (channels) == 0 || topic.isPausing.Load() { memoryMsgChan = nil } else { memoryMsgChan = topic.memoryMsgChan } continue case <-topic.closingChan: goto Exit case <-topic.pauseChan: if !topic.IsPausing() { memoryMsgChan = topic.memoryMsgChan backendMsgChan = topic.backendQueue.ReadChan() } else { memoryMsgChan = nil backendMsgChan = nil } continue } logger.Infof("topic(%s) is publishing a message" , topic.name) for i, channel := range channels { var chanMsg iface.IMessage if i > 0 { chanMsg = message.NewMessage(msg.GetID(), msg.GetData()) } else { chanMsg = msg } _ = channel.PutMessage(chanMsg) } } Exit: topic.closedChan <- struct {}{} logger.Infof("topic [%s] is exited" , topic.name) }
Channel Channel 在收到 Topic 分发的消息后,会随机选择一个客户端进行发送,以此实现负载均衡。同样的,Channel 也维护着一个内存消息队列和磁盘消息队列。
此外,Channel 还维护着一个优先队列,用于处理客户端超时的消息。
数据结构 Channel 的结构如下,以下属性需要额外说明:
clients:维护着所有订阅此 channel 的客户端。
inFlightMessagesPriQueue:维护着一个优先队列 ,按照客户端超时时间进行排序 ,定期检查优先队列,检查这条消息是否客户端超时。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 type Channel struct { sync.RWMutex lmqd iface.ILmqDaemon topicName string name string isTemporary bool isExiting atomic.Bool exitLock sync.RWMutex isPausing atomic.Bool memoryMsgChan chan iface.IMessage backendQueue backendqueue.BackendQueue deleteCallback func (topic iface.IChannel) deleter sync.Once clients map [uint64 ]iface.IConsumer inFlightMessages map [iface.MessageID]iface.IMessage inFlightMessagesPriQueue *inFlightPriQueue inFlightMessagesLock sync.Mutex messageCount atomic.Uint64 requeueCount atomic.Uint64 timeoutCount atomic.Uint64 }
处理超时消息 因为 Channel 具备处理客户端超时消息的功能,所以在创建 Channel 时,会启动一个协程 去检查客户端超时消息。
Channel 维护着一个优先队列 ,按照客户端超时时间进行排序 ,定期检查优先队列,检查这条消息是否客户端超时。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func (channel *Channel) queueScanWorker () { ticker := time.NewTicker(config.GlobalLmqdConfig.ScanQueueInterval) for { select { case <-ticker.C: go channel.processInFlightQueue() } if channel.isExiting.Load() { ticker.Stop() return } } }
processInFlightQueue 在 processInFlightQueue 方法中,用于检查客户端超时消息。
因为优先队列是按照超时时间维护的一个小根堆 ,所以堆顶元素存储的是可能最先超时的消息 。
每一次取小于当前时间的堆顶元素 ,如果有则说明该条消息超时,进行客户端消息超时处理 (客户端维护的 in-flight 消息数量-1)。同时,对于客户端超时的消息看作是发送失败的消息,重新进行排队 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 func (channel *Channel) processInFlightQueue () { channel.exitLock.RLock() channel.exitLock.RUnlock() if channel.isExiting.Load() { return } now := time.Now().UnixNano() for { channel.inFlightMessagesLock.Lock() msg := channel.inFlightMessagesPriQueue.PeekAndShift(now) channel.inFlightMessagesLock.Unlock() if msg == nil { return } _, err := channel.popInFlightMessage(msg.GetClientID(), msg.GetID()) if err != nil { return } channel.timeoutCount.Add(1 ) channel.RLock() client, ok := channel.clients[msg.GetClientID()] if ok { client.TimeoutMessage() } channel.RUnlock() logger.Infof("message id = %v timeout, now requeue" , msg.GetID()) _ = channel.put(msg) } }