Dawn's Blogs

分享技术 记录成长

0%

LMQ实现 (1) TCP服务器

本项目地址:LMQ

LMQ 是一个仿照 NSQ 的基于 TCP 协议的分布式消息队列,本系列说明 LMQ 的实现方式。

在本节说明 TCP 服务器的实现,TCP 服务器的实现地址:Hamble TCP Server

TCP 服务器架构

TCP 服务器的架构如下,总体上采用 1+M+N 的服务器编程模型

  • 一个协程(Server goroutine)用于建立连接(Accept)。
  • M 个协程用于读写取连接请求信息。
  • N 个协程用于处理请求内容,并生成响应信息。

image-20231218132654361

TCP 服务器的实现

连接封装

当 Server Goroutine 接受了一个 TCP 连接后,会建立这条连接的抽象 Connection,并开启一个协程去处理连接上的所有请求。

1
2
3
4
5
6
7
conn := newConnection(tcpConn, s)
go func() {
defer func() {
conn.Stop()
}()
conn.Start() // 连接开始工作
}()

同时,会分别开启读取和写入协程,用于读取请求和写入请求(有必要时会开启心跳检测)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (c *Connection) Start() {
logger.Infof("accept a connection from %s", c.RemoteAddr())
// 执行Hook函数
c.cs.CallOnConnStart(c)

c.cs.GetConnManager().Add(c)

go c.startRead()
go c.startWrite()
go c.startBufWrite()

if c.cs.GetHeartBeatChecker() != nil {
// 开启心跳检测
heartbeatChecker := c.cs.GetHeartBeatChecker().Clone()
heartbeatChecker.BindConn(c)
c.heartbeatChecker = heartbeatChecker
c.heartbeatChecker.Start()
}

// 阻塞,直到退出
select {
case <-c.exitChan:
c.Stop()
return
}
}

读写分离模型

TCP 服务器采用读写分离模型,一个专门负责从客户端读取数据,一个专门负责向客户端数据。当建立与客户端的套接字后,那么就会开启协程分别处理读数据业务和写数据业务,读写数据之间的消息通过一个 Channel 传递

心跳检查

服务器还支持与客户端的心跳检查,服务器可以定期的发送心跳信息来表明 TCP 服务器存活。

核心的实现就是定义一个计时器 Ticker,隔一段时间就去发送一条心跳消息给客户端。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func (checker *Checker) start() {
ticker := time.NewTicker(checker.interval)
for {
select {
case <-ticker.C:
_ = checker.check()
case <-checker.closedChan:
ticker.Stop()
return
}
}
}

func (checker *Checker) check() error {
// 首先检查连接是否存活
if !checker.connection.IsAlive() {
// 如果不存活
checker.onRemoteNotAlive(checker.connection)
checker.Stop()
return nil
}

// 存活则发送心跳消息
if checker.heartbeatFunc != nil {
err := checker.heartbeatFunc(checker.connection)
if err != nil {
return err
}
} else {
err := checker.SendHeartBeatMsg()
if err != nil {
return err
}
}

return nil
}

连接属性

在 Connection 中可以记录连接属性,由一个字典和保证字典互斥访问的锁实现。

1
2
properties     map[string]interface{} //    记录连接属性
propertiesLock sync.Mutex // 保证连接属性的互斥访问

消息封装

对于 TCP 连接的粘包问题,采用经典的 TLV(Type-Len-Value)封包格式去解决。由于 TCP 服务器是以 TCP 流的形式发送和读取数据,所以需要有能力去区分两个消息的边界,应该提供一个统一的拆包和封包的方法。

消息被分为 Head 和 Body 两个部分,Head 中记录的数据长度和消息 ID,在收到数据的时候分两次进行读取,先读取固定长度的 Head 部分,得到后续数据的长度,再根据 DataLen 读取之后的 body。这样就能够解决粘包的问题了。

image-20231218134311032

多路由消息处理

为了区分用户定义的不同 Handler,需要采用多路由的方式,而不同的路由之间以消息 ID 进行区分

在收到一个请求消息后,根据消息 ID 得到指定的 Handler(路由信息存储在一个 map 中),随后进行处理。

1
2
3
4
5
6
7
func (r *Router) DoHandler(request iface.IRequest) {
handler := r.GetHandler(request.GetMsgID()) // 根据MsgID获取handler

handler.PreHandle(request)
handler.Handle(request)
handler.PostHandle(request)
}

多任务机制

可以通过 worker 的数量来限定处理业务的固定 goroutine 数量,而不是无限制的开辟 goroutine。虽然 go 的调度算法已经做的很极致了,但是大数量的 Goroutine 依然会带来一些不必要的环境切换成本,这些本应该是服务器应该节省掉的成本。

可以用消息队列来缓冲worker工作的数据。

img

当开启多任务机制后,通过向任务消息队列发送请求消息,实现在某个队列中进行排队。

根据消息的 ID 来指定 worker,这样可以保证在同一类型的消息,其处理顺序等于发送顺序。

1
2
3
4
5
6
7
8
func (r *Router) SendMsgToTaskQueue(request iface.IRequest) {
//根据ConnID来分配当前的连接应该由哪个worker负责处理

workerID := int(request.GetMsgID()) % r.workerPoolSize
logger.Infof("Add request msgID=%v to workerID=%v", request.GetMsgID(), workerID)
//将请求消息发送给任务队列
r.taskQueues[workerID] <- request
}

worker 不断的等待队列中的请求消息,当请求消息到达后,进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (r *Router) startOneWorker(workerID int, taskQueue chan iface.IRequest) {
logger.Infof("Worker ID = %v is started", workerID)

//不断等待队列中的消息
for {
select {
//有消息则取出队列的Request,并执行绑定的业务方法
case request := <-taskQueue:
r.DoHandler(request)
}
}
}

func (r *Router) StartWorkerPool() {
for i := 0; i < r.workerPoolSize; i++ {
//给当前worker对应的任务队列开辟空间
r.taskQueues[i] = make(chan iface.IRequest, conf.GlobalProfile.MaxWorkerTaskLen)

// 开启worker
go r.startOneWorker(i, r.taskQueues[i])
}
}