本项目完整地址 simple-redis
客户端连接
在上一篇文章中,Handler 在 Handle 方法中,对于一个新的连接 conn 要初始化一个客户端与服务器连接的抽象 connect.Connection。这个结构体定义在 redis/connection/conn.go 中。
Connection 结构体
Connection 的结构体定义如下,有一些字段需要说明:
- sendingData:用于优雅的结束连接。当结束连接时会等待,直到数据发送完成或者超时。
- password:当配置了 password 后,当发送 auth 命令时就会修改这个字段,数据库服务器就会检查这个字段是否是正确的。
- selectedDB:当客户端发送 select 命令时,就会修改此字段的值,用于控制当前客户端处于哪个数据库中。
- isMulti,queue,syntaxErrQueue,watching,TxID:与事务相关的字段。
- subscribeChannels:记录客户端订阅的频道。
1 | // Connection represents a connection with a client |
sync.Pool 对象复用
在创建一个新的客户端连接抽象时,会复用之前已经结束的客户端连接。之前废弃的连接保存在 sync.Pool 对象资源池 connPool 中(这是一个全局变量):
1 | var connPool = sync.Pool{ |
Write 写入数据
Write 方法用于写入数据,当写入时会 调用 c.sendingData.Add(1)
,结束时会调用 c.sendingData.Done()
。
1 | // Write sends response to client over tcp connection |
wait.Wait
sendingData 字段的类型为 wait.Wait,定义在 lib/sync/wait/wait.go 中。
wait.Wait 与 sync.GroupWait 的功能类似,只是在 sync.GroupWait 的基础上加入了超时的功能。不管是内部 wg 所有等待的事件结束(计数器归零)还是超时,都会停止等待。
1 | // Wait a sync.WaitGroup with timeout |
Close 关闭连接
因为关闭连接时,需要等待将当前需要发送的数据发送完成后才可以关闭(优雅的关闭)。关闭流程如下:
- 先调用
c.sendingData.WaitWithTimeout(10 * time.Second)
等待发送完成(或者超时)。 - 重置内部的相关字段,然后调用
connPool.Put(c)
将对象放回对象池中,以便进行对象的复用。
事务相关
Connection 结构体事务相关的方法都定义在 redis/connection/transaction.go 中。
isMulti,queue,syntaxErrQueue,watching,TxID 字段与事务相关。下面依次进行解释:
- isMulti:当客户端执行 multi 命令时,isMulti 会被置为 true,当执行 discard 或者 exec 命令时 isMulti 会被置为 false。
- queue:当 isMulti 为 true 时,客户端发送的所有命令都会被底层数据库服务器记录在 queue 命令队列中。
- syntaxErrQueue:底层服务器会在 multi 时,对命令进行语法错误检查,如果发现了语法错误,则记录在这个队列中。当这个队列不为空时,说明在入队时发现语法错误,整个队列中的命令都会放弃执行。
- watching:调用 watch 命令时,会将相关的 key 记录在这个 map 中。simple-redis 采用乐观锁的方式,其中键为被监视的 key,值为执行 wath 命令时 key 的版本号。
- TxID:在集群模式下,会开启分布式事务。在分布式事务中,分布式事务协调者会利用 snowflake 算法生成一个事务 ID,用于唯一的标识这个分布式事务。
订阅发布相关
Connection 结构体订阅发布相关的方法都定义在 redis/connection/publish.go 中。
subscribeChannels 代表客户端订阅的频道,它实际上是一个集合,它记录了这个客户端所有的订阅频道名字。
redis.Connection 接口
connect.Connect 结构体实现了 redis.Connection 接口,这个接口定义在 interface/redis/conn.go 文件中。定义 redis.Connection 接口的原因在于:
在 AOF 持久化中,为了重写 AOF 持久化文件,在临时数据库中重放文件中的命令时,因为通用性(在数据库中执行时需要传入一个 redis.Connection 接口类型的值),需要用到一个虚假连接(conection.FakeConn,定义在 redis/conection/fake_conn.go 中)。
pipeline 客户端
在集群模式或者命令行模式下需要用到 simple-redis 客户端,与 simple-redis 服务器进行通信。
pipeline 模式
通常 TCP 客户端的通信模式都是阻塞式的:客户端发送请求 -> 等待服务端响应 -> 发送下一个请求。因为需要等待网络传输数据,完成一次请求循环需要等待较多时间。
针对这种效率低的情景,可以不等待服务端响应直接发送下一条请求。
TCP 协议会保证数据流的有序性,同一个 TCP 连接上先发送的请求服务端先接收,先回复的响应客户端先收到。因此不必担心混淆响应所对应的请求。
这种在服务端未响应时客户端继续向服务端发送请求的模式称为 Pipeline 模式。因为减少等待网络传输的时间,Pipeline 模式可以极大的提高吞吐量。
Pipeline 模式的 Godis 客户端需要至少有两个后台协程,分别是发送请求协程(写协程)和读取响应协程(读协程)。调用方通过 channel 向后台协程发送发送指令,并阻塞等待直到收到响应(或者超时)。
数据结构
Client
首先定义 Client 客户端,Client 客户端实现 pipeline 的核心在于两个通道:
- pendingReqs:记录等待发送的请求,客户端调用 Send 命令向客户端发送请求时,请求在这个通道内排队等待写协程发送请求。
- waitingReqs:记录等待服务器响应的请求,向服务器发送请求成功后将这个请求加入到这个通道中等待响应。当读协程收到一个服务器响应时就从通道中取出一个请求,此时一个完整的请求+响应完成。
1 | type Client struct { |
request
客户端请求 request 的结构如下,它代表一个从请求到响应的完整的客户端请求。
- args:客户端向服务器发送的命令行参数。
- reply:收到的返回值。
1 | type request struct { |
启动/关闭客户端
Start 启动客户端
Client.Start 用于开启客户端,主要工作:
- 开启两个协程,分别是写协程(用于发送数据)、读协程(用于读取响应)。
- 若 client.keepalive > 0,则开启心跳发送协程,每 keepalive/2 秒发送一次心跳。
- 将服务器状态变更为 running。
1 | // Start starts asynchronous goroutines |
Close 关闭客户端
关闭客户端时:
- 将客户端的状态变更为 closed,停止用于发送心跳的计时器。
- 关闭 Client.pendingReqs,阻止新的请求进入队列。
- 等待处理中的请求处理完成。
- 释放资源,包括关闭与服务器的连接(连接关闭后读协程会退出)、关闭等待响应的队列。
1 | // Close stops asynchronous goroutines and close connection |
请求与响应
客户端完成一个请求的完整流程为:
- 将请求发送到 Client.pendingReqs 通道中。
- 然后写协程会从 Client.pendingReqs 通道中取到请求发送给服务器,同时将请求发送到 Client.waitingReqs 后面等待读取响应。
- 最后读协程从 Client.waitingReqs 中取出请求,读取响应将结果。
Send 方法
Client.Send 方法用于发送命令,返回值为响应结果。其流程为:
- 首先填请求结构体 request。
- 接着将 request 挂在 client.pendingReqs 后面,等待写协程和读协程的依次处理。
- 等待 request 响应结果,或者请求超时。
- 若请求为 select 命令且未发生错误,会同步修改 client.curDBIdenx。
1 | // Send sends a request to redis server |
写协程
在启动客户端时,会再启动一个写协程,不断读取 client.pendingReqs 管道,用于向服务器发送请求。
1 | func (client *Client) handleWrite() { |
doRequest
真正的写逻辑在 doRequest 方法中。流程如下:
- doRequest 方法会尝试向服务器发送命令消息,若失败则最多重试三次。
1 | re := reply.MakeMultiBulkStringReply(req.args) |
- 若发送成功,则将 request 挂在 client.waitingReqs 管道后面,等待读协程去处理响应结果。
- 若发送失败,则结束流程,读协程不会处理。
1 | if err == nil { |
读协程
读协程不断接收服务器响应经过协议解析器解析之后的结果,若协议解析错误则会重连服务器 client.reconnect(),每收到一个(正常)响应就会调用 finishRequest 结束响应流程。
1 | func (client *Client) handleRead() { |
finishRequest
finishRequest 方法用于结束整个请求过程,流程如下:
- 首先会从 client.waitingReqs 管道中取出一个请求。
- 接着将服务器的响应结果填入 request.Reply。
- 最后调用 request.waiting.Done() 结束请求。