Dawn's Blogs

分享技术 记录成长

0%

godis源码阅读 (5) pipeline模式的客户端

Pipeline 模式

通常 TCP 客户端的通信模式都是阻塞式的:客户端发送请求 -> 等待服务端响应 -> 发送下一个请求。因为需要等待网络传输数据,完成一次请求循环需要等待较多时间。

针对这种效率低的情景,可以不等待服务端响应直接发送下一条请求

TCP 协议会保证数据流的有序性,同一个 TCP 连接上先发送的请求服务端先接收,先回复的响应客户端先收到。因此不必担心混淆响应所对应的请求。

这种在服务端未响应时客户端继续向服务端发送请求的模式称为 Pipeline 模式。因为减少等待网络传输的时间,Pipeline 模式可以极大的提高吞吐量。

Godis 客户端

Pipeline 模式的 Godis 客户端需要至少有两个后台协程,分别是发送请求协程(写协程)读取响应协程(读协程)。调用方通过 channel 向后台协程发送发送指令,并阻塞等待直到收到响应(或者超时)。

Client 结构

首先定义 Client 客户端,Client 客户端实现 pipeline 的核心在于两个通道

  • pendingReqs:记录等待发送的请求,客户端调用 Send 命令向客户端发送请求时,请求在这个通道内排队等待写协程发送请求
  • waitingReqs:记录等待服务器响应的请求,向服务器发送请求成功后将这个请求加入到这个通道中等待响应。当读协程收到一个服务器响应时就从通道中取出一个请求,此时一个完整的请求+响应完成。
1
2
3
4
5
6
7
8
9
10
type Client struct {
conn net.Conn // 与服务端的 tcp 连接
pendingReqs chan *Request // 等待发送的请求
waitingReqs chan *Request // 等待服务器响应的请求
ticker *time.Ticker // 用于触发心跳包的计时器
addr string

status int32 // 服务器状态(创建/运行/关闭)
working *sync.WaitGroup // 有请求正在处理不能立即停止,用于实现 graceful shutdown
}

接着定义客户端的请求 Request:

1
2
3
4
5
6
7
8
type Request struct {
id uint64 // 请求id
args [][]byte // 上行参数
reply redis.Reply // 收到的返回值
heartbeat bool // 标记是否是心跳请求
waiting *wait.Wait // 调用协程发送请求后通过 waitgroup 等待请求异步处理完成
err error
}

wait.Wait

客户端请求中 waiting 属性的类型为 wait.Wait,实际上就是 sync.WaitGroup 加上了超时功能。

1
2
3
4
// Wait is similar with sync.WaitGroup which can wait with timeout
type Wait struct {
wg sync.WaitGroup
}

wait.Wait 和 sync.WaitGroup 一样也有 Add、Done、Wait 方法。不同的是,它额外有一个 WaitWithTimeout 方法,这个方法阻塞 WaitGroup 直到计数器为零或者超时。其实现的方法是:

  • 定义一个管道,用于接收完成信号
  • 开启一个协程,调用 Wait 阻塞协程, WaitGroup 计数器为零。当计数器为零时向管道中发送信号,表明完成任务。
  • 使用 Select 阻塞,当超时或者完成任务(在管道中检测到信号)时,方法结束阻塞返回
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// WaitWithTimeout blocks until the WaitGroup counter is zero or timeout
// returns true if timeout
func (w *Wait) WaitWithTimeout(timeout time.Duration) bool {
c := make(chan struct{}, 1)
go func() {
defer close(c)
w.Wait()
c <- struct{}{}
}()
select {
case <-c:
return false // completed normally
case <-time.After(timeout):
return true // timed out
}
}

Send 向服务器发送请求

Client.Send 方法用于调用者将请求发送给后台协程,并通过 wait group 等待异步处理完成

1
2
// Send sends a request to redis server
func (client *Client) Send(args [][]byte) redis.Reply

流程如下:

  • 首先定义一个请求,接着调用 request.waiting.Add 用于等待请求处理完成(或者超时)调用 client.working.Add 用于表明客户端又增加了一个正在处理的请求
1
2
3
4
5
6
7
8
9
10
11
if atomic.LoadInt32(&client.status) != running {
return protocol.MakeErrReply("client closed")
}
request := &request{
args: args,
heartbeat: false,
waiting: &wait.Wait{},
}
request.waiting.Add(1)
client.working.Add(1)
defer client.working.Done()
  • 请求加入到 client.pendingReqs 通道中进行排队,等待发送请求的协程从通道中依次取出请求、发送请求
1
client.pendingReqs <- request
  • 通过 request.waiting.WaitWithTimeout 方法阻塞等待请求完成(或者超时)
1
2
3
4
5
6
7
8
timeout := request.waiting.WaitWithTimeout(maxWait)
if timeout {
return protocol.MakeErrReply("server time out")
}
if request.err != nil {
return protocol.MakeErrReply("request failed")
}
return request.reply

handleWrite 写协程

写协程开启一个循环,不断读取 Client.pendingReqs 需要发送的请求,调用 doRequest 进行发送。

1
2
3
4
5
func (client *Client) handleWrite() {
for req := range client.pendingReqs {
client.doRequest(req)
}
}

doRequest

Client.doRequest 用于向服务器端发送一次请求。

1
func (client *Client) doRequest(req *request)

流程如下:

  • 首先序列化请求。
1
2
re := protocol.MakeMultiBulkReply(req.args)
bytes := re.ToBytes()
  • 接着失败重试,最多重试 3 次。
1
2
3
4
5
6
7
8
9
var err error
for i := 0; i < 3; i++ { // only retry, waiting for handleRead
_, err = client.conn.Write(bytes)
if err == nil ||
(!strings.Contains(err.Error(), "timeout") && // only retry timeout
!strings.Contains(err.Error(), "deadline exceeded")) {
break
}
}
  • 若发送成功,将请求推入 Client.waitingReqs 通道,等待服务器响应。若超过 3次失败,则结束发送。
1
2
3
4
5
6
if err == nil {
client.waitingReqs <- req
} else {
req.err = err
req.waiting.Done()
}

handleRead 读协程

读协程调用 parser.ParseStream 开启协议解析,持续从 Payload 通道中读取响应(协议解析器每收到一个请求并解析后就向 Payload 通道中发送一个数据)。每次从通道中获取 Payload 响应后,调用 Client.finishRequest 方法结束相应的请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (client *Client) handleRead() {
ch := parser.ParseStream(client.conn)
for payload := range ch {
if payload.Err != nil {
status := atomic.LoadInt32(&client.status)
if status == closed {
return
}
client.reconnect()
return
}
client.finishRequest(payload.Data)
}
}

finishRequest

Client.finifhRequest 方法用于结束相应的请求,传入的 Reply 代表已经收到了一个服务器响应。

  • 首先需要从 Client.waitingReqs 通道中取出一个请求(因为 TCP 的有序性,这个请求刚好对应于这个响应)。
  • 接着调用 request.waiting.Done 表示请求完成
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (client *Client) finishRequest(reply redis.Reply) {
defer func() {
if err := recover(); err != nil {
debug.PrintStack()
logger.Error(err)
}
}()
request := <-client.waitingReqs
if request == nil {
return
}
request.reply = reply
if request.waiting != nil {
request.waiting.Done()
}
}

heartbeat 心跳协程

心跳协程每隔一段时间就调用 Client.doHeartbeat 向服务器发送一个心跳,维持 TCP 连接

1
2
3
4
5
func (client *Client) heartbeat() {
for range client.ticker.C {
client.doHeartbeat()
}
}

doHeartbeat

Client.doHeartbeat 用于给服务器发送心跳,就是把心跳请求送入 Client.pendingReqs 通道中等待写协程发送请求

1
2
3
4
5
6
7
8
9
10
11
12
func (client *Client) doHeartbeat() {
request := &request{
args: [][]byte{[]byte("PING")},
heartbeat: true,
waiting: &wait.Wait{},
}
request.waiting.Add(1)
client.working.Add(1)
defer client.working.Done()
client.pendingReqs <- request
request.waiting.WaitWithTimeout(maxWait)
}

启动/关闭客户端

Start 启动客户端

Client.Start 用于开启客户端,主要工作:

  • 开启一个 10 秒钟的计时器,用于发送心跳。
  • 开启三个异步协程,分别是写协程、读协程、心跳协程。
  • 将服务器状态变更为 running。
1
2
3
4
5
6
7
8
// Start starts asynchronous goroutines
func (client *Client) Start() {
client.ticker = time.NewTicker(10 * time.Second)
go client.handleWrite()
go client.handleRead()
go client.heartbeat()
atomic.StoreInt32(&client.status, running)
}

Close 关闭客户端

关闭客户端时:

  • 将客户端的状态变更为 closed,停止用于发送心跳的计时器。
  • 关闭 Client.pendingReqs,阻止新的请求进入队列
  • 等待处理中的请求处理完成
  • 释放资源,包括关闭与服务器的连接(连接关闭后读协程会退出)、关闭等待响应的队列。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Close stops asynchronous goroutines and close connection
func (client *Client) Close() {
atomic.StoreInt32(&client.status, closed)
client.ticker.Stop()
// stop new request
close(client.pendingReqs)

// wait stop process
client.working.Wait()

// clean
_ = client.conn.Close()
close(client.waitingReqs)
}