Dawn's Blogs

分享技术 记录成长

0%

LMQ实现 (3) Topic和Channel

本项目地址:LMQ

lmqd 中的元数据是指 lmqd 中的 Topic 和 Channel 信息

Topic

Topic 负责接受生产者的消息,并且将消息复制分发给每一个 channel 中。

数据结构

Topic 的结构如下,主要的属性有:

  • name:topic 的名称。
  • isTemporary:是否是临时的 Topic,如果名称带有 #tmp 则表明这是一个临时的 Topic。
    • 对于临时 Topic,backendQueue(磁盘队列)为 nil,所以超出内存队列长度的数据会被直接丢弃。
    • 若 channel 为空,则会立即删除这个 topic。
    • 一个临时的 topic 不会持久化到磁盘中。
  • channels:topic 下有多个 channel,保存所有在此 topic 下的 channel。
  • memoryMsgChan:内存消息队列。
  • backendMsgChan:磁盘消息队列。
  • deleteCallback:这个 topic 删除后的回调函数,主要用于删除 lmqd 中维护的 topic 信息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
type Topic struct {
lmqd iface.ILmqDaemon

name string
isTemporary bool // 标记是否是临时的topic
isPausing atomic.Bool // 标记是否已经暂停
isExiting atomic.Bool // 标记是否已经退出
channels map[string]iface.IChannel // 保存所有的channel字典
channelsLock sync.RWMutex // 控制对channel字典的互斥访问

guidFactory *GUIDFactory // message id 生成器

memoryMsgChan chan iface.IMessage // 内存chan
backendQueue backendqueue.BackendQueue // 当内存chan满了之后,将消息存入到后端队列中(持久化保存)

deleteCallback func(topic iface.ITopic)
deleter sync.Once

startChan chan struct{}
updateChan chan struct{}
pauseChan chan struct{}
closingChan chan struct{}
closedChan chan struct{}

messageCount atomic.Uint64
messageBytes atomic.Uint64
}

生产消息

通过 select,在内存队列中放入消息。如果内存队列已满,则会进入 default 分支,将消息送入磁盘消息队列中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func (topic *Topic) PutMessage(msg iface.IMessage) error {
topic.channelsLock.RLock()
defer topic.channelsLock.RUnlock()
if topic.isExiting.Load() {
return e.ErrTopicIsExiting
}

if msg.GetDataLength() < config.GlobalLmqdConfig.MinMessageSize || msg.GetDataLength() > config.GlobalLmqdConfig.MaxMessageSize {
// 消息长度不合法
return e.ErrMessageLengthInvalid
}

select {
case topic.memoryMsgChan <- msg:
default:
// 存入backend queue

// 转为[]byte
data, err := message.ConvertMessageToBytes(msg)
if err != nil {
logger.Errorf("topic(%s) convert message to bytes err when PutMessage: %s", topic.name, err.Error())
return err
}

// 放到disk queue中
err = topic.backendQueue.Put(data)
if err != nil {
logger.Errorf("topic(%s) convert message to bytes err when put msg into backend queue: %s", topic.name, err.Error())
return err
}
}

topic.messageCount.Add(1)
topic.messageBytes.Add(uint64(len(msg.GetData())))

return nil
}

消息分发

Topic 在开启时会启动一个线程专门用于监听消息分发和状态更新。Topic 的消息的分发由 messagePump 方法实现,负责复制消息到每一个 Channel 上。

messagePump 方法监听着以下几个 chan:

  • memoryMsgChan:内存消息队列。
  • backendMsgChan:磁盘消息队列,从磁盘中获取消息。
  • topic.updateChan:Channel 更新队列,用于更新分发数据的 Channel。
  • topic.pauseChan:提醒暂停或者恢复 Topic 的队列。
  • topic.closingChan:监听推出信号。

当从内存消息队列或者磁盘消息队列中读取到消息,则复制分发给所有的 Channel 上。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
func (topic *Topic) messagePump() {
var memoryMsgChan chan iface.IMessage
var backendMsgChan <-chan []byte
var msg iface.IMessage
var channels []iface.IChannel

for {
select {
case <-topic.pauseChan:
continue
case <-topic.updateChan:
continue
case <-topic.closingChan:
goto Exit
case <-topic.startChan:
}
break
}

// 改为运行中状态
logger.Infof("topic [%] is running", topic.name)

topic.channelsLock.RLock()
for _, channel := range topic.channels {
channels = append(channels, channel)
}
topic.channelsLock.RUnlock()
if len(channels) > 0 && !topic.isPausing.Load() {
memoryMsgChan = topic.memoryMsgChan
backendMsgChan = topic.backendQueue.ReadChan()
}

for {
select {
case msg = <-memoryMsgChan: // 获取msg
case data := <-backendMsgChan: // 从disk queue中获取
var err error
msg, err = message.ConvertBytesToMessage(data)
if err != nil {
logger.Errorf("topic(%s) convert bytes to message failed when message pump, err:%s", topic.name, err.Error())
continue
}
case <-topic.updateChan: // 更新channels
channels = channels[:0]
topic.channelsLock.RLock()
for _, channel := range topic.channels {
channels = append(channels, channel)
}
topic.channelsLock.RUnlock()
if len(channels) == 0 || topic.isPausing.Load() {
memoryMsgChan = nil
} else {
memoryMsgChan = topic.memoryMsgChan
}

continue
case <-topic.closingChan: // 退出
goto Exit
case <-topic.pauseChan: // 暂停/恢复
if !topic.IsPausing() {
memoryMsgChan = topic.memoryMsgChan
backendMsgChan = topic.backendQueue.ReadChan()
} else {
memoryMsgChan = nil
backendMsgChan = nil
}

continue
}

// 向所有channel发送消息
logger.Infof("topic(%s) is publishing a message", topic.name)
for i, channel := range channels {
var chanMsg iface.IMessage

if i > 0 {
chanMsg = message.NewMessage(msg.GetID(), msg.GetData())
} else {
chanMsg = msg
}
_ = channel.PutMessage(chanMsg)
}
}

Exit:
topic.closedChan <- struct{}{}
logger.Infof("topic [%s] is exited", topic.name)
}

Channel

Channel 在收到 Topic 分发的消息后,会随机选择一个客户端进行发送,以此实现负载均衡。同样的,Channel 也维护着一个内存消息队列和磁盘消息队列。

此外,Channel 还维护着一个优先队列,用于处理客户端超时的消息。

数据结构

Channel 的结构如下,以下属性需要额外说明:

  • clients:维护着所有订阅此 channel 的客户端。
  • inFlightMessagesPriQueue:维护着一个优先队列,按照客户端超时时间进行排序,定期检查优先队列,检查这条消息是否客户端超时。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
type Channel struct {
sync.RWMutex

lmqd iface.ILmqDaemon
topicName string // topic名称
name string // channel名称
isTemporary bool // 标记是否是临时的topic
isExiting atomic.Bool // 是否退出
exitLock sync.RWMutex // 发送消息与退出的互斥
isPausing atomic.Bool // 是否已经暂停

memoryMsgChan chan iface.IMessage // 内存chan
backendQueue backendqueue.BackendQueue // backend队列

deleteCallback func(topic iface.IChannel)
deleter sync.Once

clients map[uint64]iface.IConsumer

inFlightMessages map[iface.MessageID]iface.IMessage // 在给客户端发送过程中的message
inFlightMessagesPriQueue *inFlightPriQueue // 在给客户端发送过程中的message,优先队列
inFlightMessagesLock sync.Mutex

messageCount atomic.Uint64 // 消息数量
requeueCount atomic.Uint64 // 重新入队的消息数量
timeoutCount atomic.Uint64 // 超时消息的数量
}

处理超时消息

因为 Channel 具备处理客户端超时消息的功能,所以在创建 Channel 时,会启动一个协程去检查客户端超时消息。

Channel 维护着一个优先队列,按照客户端超时时间进行排序,定期检查优先队列,检查这条消息是否客户端超时。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 扫描队列,处理超时消息
func (channel *Channel) queueScanWorker() {
ticker := time.NewTicker(config.GlobalLmqdConfig.ScanQueueInterval)
for {
select {
case <-ticker.C:
// 处理 in-flight 的超时消息
go channel.processInFlightQueue()
}

if channel.isExiting.Load() {
ticker.Stop()
return
}
}
}

processInFlightQueue

在 processInFlightQueue 方法中,用于检查客户端超时消息。

因为优先队列是按照超时时间维护的一个小根堆,所以堆顶元素存储的是可能最先超时的消息

每一次取小于当前时间的堆顶元素,如果有则说明该条消息超时,进行客户端消息超时处理(客户端维护的 in-flight 消息数量-1)。同时,对于客户端超时的消息看作是发送失败的消息,重新进行排队

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func (channel *Channel) processInFlightQueue() {
channel.exitLock.RLock()
channel.exitLock.RUnlock()

if channel.isExiting.Load() {
return
}

now := time.Now().UnixNano()
for {
channel.inFlightMessagesLock.Lock()
msg := channel.inFlightMessagesPriQueue.PeekAndShift(now)
channel.inFlightMessagesLock.Unlock()

if msg == nil {
return
}

_, err := channel.popInFlightMessage(msg.GetClientID(), msg.GetID())
if err != nil {
return
}

channel.timeoutCount.Add(1)

channel.RLock()
client, ok := channel.clients[msg.GetClientID()]
if ok {
client.TimeoutMessage()
}
channel.RUnlock()

logger.Infof("message id = %v timeout, now requeue", msg.GetID())
_ = channel.put(msg)
}
}