Pipeline 模式
通常 TCP 客户端的通信模式都是阻塞式的:客户端发送请求 -> 等待服务端响应 -> 发送下一个请求。因为需要等待网络传输数据,完成一次请求循环需要等待较多时间。
针对这种效率低的情景,可以不等待服务端响应直接发送下一条请求。
TCP 协议会保证数据流的有序性,同一个 TCP 连接上先发送的请求服务端先接收,先回复的响应客户端先收到。因此不必担心混淆响应所对应的请求。
这种在服务端未响应时客户端继续向服务端发送请求的模式称为 Pipeline 模式。因为减少等待网络传输的时间,Pipeline 模式可以极大的提高吞吐量。
Godis 客户端
Pipeline 模式的 Godis 客户端需要至少有两个后台协程,分别是发送请求协程(写协程)和读取响应协程(读协程)。调用方通过 channel 向后台协程发送发送指令,并阻塞等待直到收到响应(或者超时)。
Client 结构
首先定义 Client 客户端,Client 客户端实现 pipeline 的核心在于两个通道:
- pendingReqs:记录等待发送的请求,客户端调用 Send 命令向客户端发送请求时,请求在这个通道内排队等待写协程发送请求。
- waitingReqs:记录等待服务器响应的请求,向服务器发送请求成功后将这个请求加入到这个通道中等待响应。当读协程收到一个服务器响应时就从通道中取出一个请求,此时一个完整的请求+响应完成。
1 | type Client struct { |
接着定义客户端的请求 Request:
1 | type Request struct { |
wait.Wait
客户端请求中 waiting 属性的类型为 wait.Wait,实际上就是 sync.WaitGroup 加上了超时功能。
1 | // Wait is similar with sync.WaitGroup which can wait with timeout |
wait.Wait 和 sync.WaitGroup 一样也有 Add、Done、Wait 方法。不同的是,它额外有一个 WaitWithTimeout 方法,这个方法阻塞 WaitGroup 直到计数器为零或者超时。其实现的方法是:
- 定义一个管道,用于接收完成信号。
- 开启一个协程,调用 Wait 阻塞协程, WaitGroup 计数器为零。当计数器为零时向管道中发送信号,表明完成任务。
- 使用 Select 阻塞,当超时或者完成任务(在管道中检测到信号)时,方法结束阻塞返回。
1 | // WaitWithTimeout blocks until the WaitGroup counter is zero or timeout |
Send 向服务器发送请求
Client.Send 方法用于调用者将请求发送给后台协程,并通过 wait group 等待异步处理完成。
1 | // Send sends a request to redis server |
流程如下:
- 首先定义一个请求,接着调用 request.waiting.Add 用于等待请求处理完成(或者超时),调用 client.working.Add 用于表明客户端又增加了一个正在处理的请求。
1 | if atomic.LoadInt32(&client.status) != running { |
- 将请求加入到 client.pendingReqs 通道中进行排队,等待发送请求的协程从通道中依次取出请求、发送请求。
1 | client.pendingReqs <- request |
- 通过 request.waiting.WaitWithTimeout 方法阻塞等待请求完成(或者超时)。
1 | timeout := request.waiting.WaitWithTimeout(maxWait) |
handleWrite 写协程
写协程开启一个循环,不断读取 Client.pendingReqs 需要发送的请求,调用 doRequest 进行发送。
1 | func (client *Client) handleWrite() { |
doRequest
Client.doRequest 用于向服务器端发送一次请求。
1 | func (client *Client) doRequest(req *request) |
流程如下:
- 首先序列化请求。
1 | re := protocol.MakeMultiBulkReply(req.args) |
- 接着失败重试,最多重试 3 次。
1 | var err error |
- 若发送成功,将请求推入 Client.waitingReqs 通道,等待服务器响应。若超过 3次失败,则结束发送。
1 | if err == nil { |
handleRead 读协程
读协程调用 parser.ParseStream 开启协议解析,持续从 Payload 通道中读取响应(协议解析器每收到一个请求并解析后就向 Payload 通道中发送一个数据)。每次从通道中获取 Payload 响应后,调用 Client.finishRequest 方法结束相应的请求。
1 | func (client *Client) handleRead() { |
finishRequest
Client.finifhRequest 方法用于结束相应的请求,传入的 Reply 代表已经收到了一个服务器响应。
- 首先需要从 Client.waitingReqs 通道中取出一个请求(因为 TCP 的有序性,这个请求刚好对应于这个响应)。
- 接着调用 request.waiting.Done 表示请求完成。
1 | func (client *Client) finishRequest(reply redis.Reply) { |
heartbeat 心跳协程
心跳协程每隔一段时间就调用 Client.doHeartbeat 向服务器发送一个心跳,维持 TCP 连接。
1 | func (client *Client) heartbeat() { |
doHeartbeat
Client.doHeartbeat 用于给服务器发送心跳,就是把心跳请求送入 Client.pendingReqs 通道中等待写协程发送请求。
1 | func (client *Client) doHeartbeat() { |
启动/关闭客户端
Start 启动客户端
Client.Start 用于开启客户端,主要工作:
- 开启一个 10 秒钟的计时器,用于发送心跳。
- 开启三个异步协程,分别是写协程、读协程、心跳协程。
- 将服务器状态变更为 running。
1 | // Start starts asynchronous goroutines |
Close 关闭客户端
关闭客户端时:
- 将客户端的状态变更为 closed,停止用于发送心跳的计时器。
- 关闭 Client.pendingReqs,阻止新的请求进入队列。
- 等待处理中的请求处理完成。
- 释放资源,包括关闭与服务器的连接(连接关闭后读协程会退出)、关闭等待响应的队列。
1 | // Close stops asynchronous goroutines and close connection |