Dawn's Blogs

分享技术 记录成长

0%

LMQ实现 (5) 通信协议

本项目地址:LMQ

消息格式

LMQ 中的消息格式包括一个全局唯一的 ID(用分布式雪花 ID 生成的),数据,生产消息的时间戳,重试次数。

如时间戳、重试次数、消息 ID 等消息元数据,随着消息传输给客户端,这样做简化了客户端,客户端不需要维护消息的状态。

消息会被序列化为字节流(依次以大端存储的方式保存消息 ID、时间戳、重试次数、数据),保存在了响应的 message 字段中。

客户端与服务器之间的请求和响应统一采用 Json 进行编码。

1
2
3
4
5
6
7
8
9
10
11
type Message struct {
ID iface.MessageID `json:"ID"`
Data []byte `json:"Data"`
Timestamp int64 `json:"Timestamp"`
Attempts uint16 `json:"Attempts"`

// 优先队列中使用到的数据结构
clientID uint64
pri int64
index int
}

通信协议

LMQ 中客户端与 lmqd 的通信协议如下:

  • 首先客户端进行连接。
  • 发送 SUB 命令订阅某个 Channel。
  • 发送 RDY 命令更新可以接收消息的数量。
  • LMQ 服务器会持续推送规定数量之内的消息。
  • 最后客户端返回 FIN 和 REQ 表示消息的接收状态。

nsq protocol

客户端流量控制

通过推送数据到客户端最大限度地提高性能和吞吐量,而不是等待客户端拉数据。LMQ 的客户端通过 RDY 状态表示自己准备好接受消息的数量,这是一种客户端流量控制的方法。

当客户端连接到 lmqd 和并订阅到一个 channel 时,客户端的 RDY 初始化为 0。当客户端的 RDY 状态被更新为大于零时,channel 才开始给这个客户端发送消息。

消息确认

客户端在收到消息后,需要发送 FIN 消息,表示消息已经成功接受。客户端也可以返回 REQ 消息,表示消息接受失败,此条消息重新进行排队。

当 lmqd 在超时时间内没有接受到 FIN 或者 REQ 消息,会自动判定客户端超时,这条消息会重新排队发送。

通过消息确认机制,可以保证消息确认被客户端收到,(在 lmqd 不崩溃的情况下)消息不会丢失。