Dawn's Blogs

分享技术 记录成长

0%

LMQ实现 (6) LmqLookup

本项目地址:LMQ

Lookup

LMQ Lookup 的功能就是存储 LMQ 分布式拓扑结构,为客户端提供查询服务。

拓扑结构

Lookup 维护了 lmqd,topic 和 channel 之间的拓扑结构,三者的对应关系如下。其中,lmqd 和 topic 为多对多的关系,topic 和 channel 为一对多的关系。

1
lmqd(producer) <--- M:N ---> topic <--- 1:N ---> channel

存储结构

根据 lookup 维护的拓扑结构,look 主要维护了这样一个注册字典:

  • 字典的键值分为两种类型,channel 和 topic。
  • 字典的值是对应的所有的生产者(Lmqd)。
1
2
3
4
type RegistrationDB struct {
sync.RWMutex
registrationMap map[iface.IRegistration]iface.ProducerMap
}

所以,存储结构如下:

1
2
3
4
5
6
7
{topic, topicName1, ""} ---> [{id1: producer1}, {id2: producer2}, ...]
{topic, topicName2, ""} ---> [{id3: producer3}, {id4: producer4}, ...]
...

{channel, topicName1, channelName1} ---> [{id1: producer1}, {id2: producer2}, ...]
{channel, topicName2, channelName2} ---> [{id3: producer3}, {id4: producer4}, ...]
...

LookupManager

在 Lmqd 中,有一个组件为 LookupManager,有两个功能:

  1. 维护 lmqd 与 lookup manger 的长连接;
  2. 同步信息,通知 lookup 更新其拓扑。

数据结构

lookup manager 的数据结构如下,其中:

  • lookupPeers:维护了与每一个 lookup 的长连接。
  • norifyChan:lmqd 将 topic 和 channel 的变更消息都会发送到这个 chan 中,manager 接受变更信息并通知 lookup。
1
2
3
4
5
6
7
8
9
type Manager struct {
lmqd iface.ILmqDaemon
lookupPeers []*lookupPeer
notifyChan chan interface{} // 用于通知lookup本节点的topic或者channel更新了
isExiting atomic.Bool
exitChan chan struct{}

utils.WaitGroupWrapper
}

lookupLoop

lookupLoop 是在 lookup manager 启动的一个协程,用于监听 notify chan,并且将 notify chan 中的变更信息发送给每一个 lookup 用于通知 lookup 更新其拓扑信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
func (m *Manager) lookupLoop() {
for {
if m.isExiting.Load() {
goto exit
}

var unRegister bool
var topicName, channelName string
select {
case <-m.exitChan:
goto exit
case v := <-m.notifyChan:
switch v.(type) { // 检查v的类型
case iface.ITopic: // 如果是topic
t := v.(iface.ITopic)
topicName = t.GetName()
if t.IsExiting() {
// 已经退出了,unregister
unRegister = true
} else {
// 没有退出,register
unRegister = false
}

case iface.IChannel: // 如果是channel
c := v.(iface.IChannel)
topicName = c.GetTopicName()
channelName = c.GetName()
if c.IsExiting() {
// 已经退出了,unregister
unRegister = true
} else {
// 没有退出,register
unRegister = false
}
}

for _, peer := range m.lookupPeers {
m.Wrap(func() {
if peer != nil {
_ = peer.sendRegistration(unRegister, topicName, channelName)
}
})
}
}
}

exit:
}

通知变更

在 Topic 和 channel 发生变更(创建,删除)时,相应的 topic 和 channel 会调用 Notify 方法,向 lmqd 通知结构的变更。主要做两件事:

  1. 向 lookup manager 的 notify chan 中发送一条消息,通知 lookup 更新拓扑信息。
  2. 对于非临时的 topic 或者 channel,进行元数据的持久化。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Notify 通知lmqd进行持久化,通知lookup
func (lmqd *LmqDaemon) Notify(v interface{}, persist bool) {
isLoading := lmqd.isLoading.Load()

lmqd.waitGroup.Wrap(func() {
select {
case <-lmqd.exitChan:
case lmqd.lookupManager.GetNotifyChan() <- v:
if !persist && isLoading {
return
}

err := lmqd.PersistMetaData()
if err != nil {
logger.Errorf("lmqd PersistMetaData failed in Notify, err: %s", err.Error())
}
}

})
}