本项目地址:LMQ
LMQ 是一个仿照 NSQ 的基于 TCP 协议的分布式消息队列,本系列说明 LMQ 的实现方式。
在本节说明 TCP 服务器的实现,TCP 服务器的实现地址:Hamble TCP Server
TCP 服务器架构
TCP 服务器的架构如下,总体上采用 1+M+N 的服务器编程模型:
- 一个协程(Server goroutine)用于建立连接(Accept)。
- M 个协程用于读写取连接请求信息。
- N 个协程用于处理请求内容,并生成响应信息。
TCP 服务器的实现
连接封装
当 Server Goroutine 接受了一个 TCP 连接后,会建立这条连接的抽象 Connection,并开启一个协程去处理连接上的所有请求。
1 2 3 4 5 6 7
| conn := newConnection(tcpConn, s) go func() { defer func() { conn.Stop() }() conn.Start() }()
|
同时,会分别开启读取和写入协程,用于读取请求和写入请求(有必要时会开启心跳检测)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| func (c *Connection) Start() { logger.Infof("accept a connection from %s", c.RemoteAddr()) c.cs.CallOnConnStart(c)
c.cs.GetConnManager().Add(c)
go c.startRead() go c.startWrite() go c.startBufWrite()
if c.cs.GetHeartBeatChecker() != nil { heartbeatChecker := c.cs.GetHeartBeatChecker().Clone() heartbeatChecker.BindConn(c) c.heartbeatChecker = heartbeatChecker c.heartbeatChecker.Start() }
select { case <-c.exitChan: c.Stop() return } }
|
读写分离模型
TCP 服务器采用读写分离模型,一个专门负责从客户端读取数据,一个专门负责向客户端写数据。当建立与客户端的套接字后,那么就会开启协程分别处理读数据业务和写数据业务,读写数据之间的消息通过一个 Channel 传递。
心跳检查
服务器还支持与客户端的心跳检查,服务器可以定期的发送心跳信息来表明 TCP 服务器存活。
核心的实现就是定义一个计时器 Ticker,隔一段时间就去发送一条心跳消息给客户端。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| func (checker *Checker) start() { ticker := time.NewTicker(checker.interval) for { select { case <-ticker.C: _ = checker.check() case <-checker.closedChan: ticker.Stop() return } } }
func (checker *Checker) check() error { if !checker.connection.IsAlive() { checker.onRemoteNotAlive(checker.connection) checker.Stop() return nil }
if checker.heartbeatFunc != nil { err := checker.heartbeatFunc(checker.connection) if err != nil { return err } } else { err := checker.SendHeartBeatMsg() if err != nil { return err } }
return nil }
|
连接属性
在 Connection 中可以记录连接属性,由一个字典和保证字典互斥访问的锁实现。
1 2
| properties map[string]interface{} propertiesLock sync.Mutex
|
消息封装
对于 TCP 连接的粘包问题,采用经典的 TLV(Type-Len-Value)封包格式去解决。由于 TCP 服务器是以 TCP 流的形式发送和读取数据,所以需要有能力去区分两个消息的边界,应该提供一个统一的拆包和封包的方法。
消息被分为 Head 和 Body 两个部分,Head 中记录的数据长度和消息 ID,在收到数据的时候分两次进行读取,先读取固定长度的 Head 部分,得到后续数据的长度,再根据 DataLen 读取之后的 body。这样就能够解决粘包的问题了。
多路由消息处理
为了区分用户定义的不同 Handler,需要采用多路由的方式,而不同的路由之间以消息 ID 进行区分。
在收到一个请求消息后,根据消息 ID 得到指定的 Handler(路由信息存储在一个 map 中),随后进行处理。
1 2 3 4 5 6 7
| func (r *Router) DoHandler(request iface.IRequest) { handler := r.GetHandler(request.GetMsgID())
handler.PreHandle(request) handler.Handle(request) handler.PostHandle(request) }
|
多任务机制
可以通过 worker 的数量来限定处理业务的固定 goroutine 数量,而不是无限制的开辟 goroutine。虽然 go 的调度算法已经做的很极致了,但是大数量的 Goroutine 依然会带来一些不必要的环境切换成本,这些本应该是服务器应该节省掉的成本。
可以用消息队列来缓冲worker工作的数据。
当开启多任务机制后,通过向任务消息队列发送请求消息,实现在某个队列中进行排队。
根据消息的 ID 来指定 worker,这样可以保证在同一类型的消息,其处理顺序等于发送顺序。
1 2 3 4 5 6 7 8
| func (r *Router) SendMsgToTaskQueue(request iface.IRequest) {
workerID := int(request.GetMsgID()) % r.workerPoolSize logger.Infof("Add request msgID=%v to workerID=%v", request.GetMsgID(), workerID) r.taskQueues[workerID] <- request }
|
worker 不断的等待队列中的请求消息,当请求消息到达后,进行处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| func (r *Router) startOneWorker(workerID int, taskQueue chan iface.IRequest) { logger.Infof("Worker ID = %v is started", workerID)
for { select { case request := <-taskQueue: r.DoHandler(request) } } }
func (r *Router) StartWorkerPool() { for i := 0; i < r.workerPoolSize; i++ { r.taskQueues[i] = make(chan iface.IRequest, conf.GlobalProfile.MaxWorkerTaskLen)
go r.startOneWorker(i, r.taskQueues[i]) } }
|