本项目地址:LMQ
Lookup
LMQ Lookup 的功能就是存储 LMQ 分布式拓扑结构,为客户端提供查询服务。
拓扑结构
Lookup 维护了 lmqd,topic 和 channel 之间的拓扑结构,三者的对应关系如下。其中,lmqd 和 topic 为多对多的关系,topic 和 channel 为一对多的关系。
1
| lmqd(producer) <--- M:N ---> topic <--- 1:N ---> channel
|
存储结构
根据 lookup 维护的拓扑结构,look 主要维护了这样一个注册字典:
- 字典的键值分为两种类型,channel 和 topic。
- 字典的值是对应的所有的生产者(Lmqd)。
1 2 3 4
| type RegistrationDB struct { sync.RWMutex registrationMap map[iface.IRegistration]iface.ProducerMap }
|
所以,存储结构如下:
1 2 3 4 5 6 7
| {topic, topicName1, ""} ---> [{id1: producer1}, {id2: producer2}, ...] {topic, topicName2, ""} ---> [{id3: producer3}, {id4: producer4}, ...] ...
{channel, topicName1, channelName1} ---> [{id1: producer1}, {id2: producer2}, ...] {channel, topicName2, channelName2} ---> [{id3: producer3}, {id4: producer4}, ...] ...
|
LookupManager
在 Lmqd 中,有一个组件为 LookupManager,有两个功能:
- 维护 lmqd 与 lookup manger 的长连接;
- 同步信息,通知 lookup 更新其拓扑。
数据结构
lookup manager 的数据结构如下,其中:
- lookupPeers:维护了与每一个 lookup 的长连接。
- norifyChan:lmqd 将 topic 和 channel 的变更消息都会发送到这个 chan 中,manager 接受变更信息并通知 lookup。
1 2 3 4 5 6 7 8 9
| type Manager struct { lmqd iface.ILmqDaemon lookupPeers []*lookupPeer notifyChan chan interface{} isExiting atomic.Bool exitChan chan struct{}
utils.WaitGroupWrapper }
|
lookupLoop
lookupLoop 是在 lookup manager 启动的一个协程,用于监听 notify chan,并且将 notify chan 中的变更信息发送给每一个 lookup 用于通知 lookup 更新其拓扑信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| func (m *Manager) lookupLoop() { for { if m.isExiting.Load() { goto exit }
var unRegister bool var topicName, channelName string select { case <-m.exitChan: goto exit case v := <-m.notifyChan: switch v.(type) { case iface.ITopic: t := v.(iface.ITopic) topicName = t.GetName() if t.IsExiting() { unRegister = true } else { unRegister = false }
case iface.IChannel: c := v.(iface.IChannel) topicName = c.GetTopicName() channelName = c.GetName() if c.IsExiting() { unRegister = true } else { unRegister = false } }
for _, peer := range m.lookupPeers { m.Wrap(func() { if peer != nil { _ = peer.sendRegistration(unRegister, topicName, channelName) } }) } } }
exit: }
|
通知变更
在 Topic 和 channel 发生变更(创建,删除)时,相应的 topic 和 channel 会调用 Notify 方法,向 lmqd 通知结构的变更。主要做两件事:
- 向 lookup manager 的 notify chan 中发送一条消息,通知 lookup 更新拓扑信息。
- 对于非临时的 topic 或者 channel,进行元数据的持久化。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| func (lmqd *LmqDaemon) Notify(v interface{}, persist bool) { isLoading := lmqd.isLoading.Load()
lmqd.waitGroup.Wrap(func() { select { case <-lmqd.exitChan: case lmqd.lookupManager.GetNotifyChan() <- v: if !persist && isLoading { return }
err := lmqd.PersistMetaData() if err != nil { logger.Errorf("lmqd PersistMetaData failed in Notify, err: %s", err.Error()) } }
}) }
|