Dawn's Blogs

分享技术 记录成长

0%

Simple-Redis实现 (2) 客户端

本项目完整地址 simple-redis

客户端连接

在上一篇文章中,Handler 在 Handle 方法中,对于一个新的连接 conn 要初始化一个客户端与服务器连接的抽象 connect.Connection。这个结构体定义在 redis/connection/conn.go 中。

Connection 结构体

Connection 的结构体定义如下,有一些字段需要说明:

  • sendingData:用于优雅的结束连接。当结束连接时会等待,直到数据发送完成或者超时。
  • password:当配置了 password 后,当发送 auth 命令时就会修改这个字段,数据库服务器就会检查这个字段是否是正确的。
  • selectedDB:当客户端发送 select 命令时,就会修改此字段的值,用于控制当前客户端处于哪个数据库中。
  • isMulti,queue,syntaxErrQueue,watching,TxID:与事务相关的字段。
  • subscribeChannels:记录客户端订阅的频道。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Connection represents a connection with a client
type Connection struct {
conn net.Conn

// wait until finish sending data, used for graceful shutdown
sendingData wait.Wait

// lock while server sending response
mu sync.Mutex

// password may be changed by CONFIG command during runtime, so store the password
password string

// selected db
selectedDB int

isMulti bool // 表明是否在 multi 开启事务中
queue [][][]byte // 事务中排队的命令
syntaxErrQueue []redis.Reply // 事务中的语法错误
watching map[string]uint32 // 正在WATCH的key值
TxID string // 事务ID,在分布式事务中用到

subscribeChannels map[string]struct{} // 订阅的频道
}

sync.Pool 对象复用

在创建一个新的客户端连接抽象时,会复用之前已经结束的客户端连接。之前废弃的连接保存在 sync.Pool 对象资源池 connPool 中(这是一个全局变量):

1
2
3
4
5
var connPool = sync.Pool{
New: func() interface{} {
return &Connection{}
},
}

Write 写入数据

Write 方法用于写入数据,当写入时会 调用 c.sendingData.Add(1),结束时会调用 c.sendingData.Done()

1
2
3
4
5
6
7
8
9
10
11
12
13
// Write sends response to client over tcp connection
func (c *Connection) Write(bytes []byte) (int, error) {
if len(bytes) == 0 {
return 0, nil
}

c.sendingData.Add(1)
defer func() {
c.sendingData.Done()
}()

return c.conn.Write(bytes)
}

wait.Wait

sendingData 字段的类型为 wait.Wait,定义在 lib/sync/wait/wait.go 中。

wait.Wait 与 sync.GroupWait 的功能类似,只是在 sync.GroupWait 的基础上加入了超时的功能。不管是内部 wg 所有等待的事件结束(计数器归零)还是超时,都会停止等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Wait a sync.WaitGroup with timeout
type Wait struct {
wg sync.WaitGroup
}


func (w *Wait) WaitWithTimeout(timeout time.Duration) bool {
done := make(chan struct{}, 1)
go func() {
defer close(done)
w.Wait()
done <- struct{}{}
}()

select {
case <-done:
return false // finish normally
case <-time.After(timeout):
return true // timeout
}
}

Close 关闭连接

因为关闭连接时,需要等待将当前需要发送的数据发送完成后才可以关闭(优雅的关闭)。关闭流程如下:

  • 先调用 c.sendingData.WaitWithTimeout(10 * time.Second) 等待发送完成(或者超时)。
  • 重置内部的相关字段,然后调用 connPool.Put(c) 将对象放回对象池中,以便进行对象的复用

事务相关

Connection 结构体事务相关的方法都定义在 redis/connection/transaction.go 中。

isMulti,queue,syntaxErrQueue,watching,TxID 字段与事务相关。下面依次进行解释:

  • isMulti:当客户端执行 multi 命令时,isMulti 会被置为 true,当执行 discard 或者 exec 命令时 isMulti 会被置为 false。
  • queue:当 isMulti 为 true 时,客户端发送的所有命令都会被底层数据库服务器记录在 queue 命令队列中
  • syntaxErrQueue:底层服务器会在 multi 时,对命令进行语法错误检查,如果发现了语法错误,则记录在这个队列中。当这个队列不为空时,说明在入队时发现语法错误,整个队列中的命令都会放弃执行
  • watching:调用 watch 命令时,会将相关的 key 记录在这个 map 中。simple-redis 采用乐观锁的方式,其中键为被监视的 key,值为执行 wath 命令时 key 的版本号
  • TxID:在集群模式下,会开启分布式事务。在分布式事务中,分布式事务协调者会利用 snowflake 算法生成一个事务 ID,用于唯一的标识这个分布式事务。

订阅发布相关

Connection 结构体订阅发布相关的方法都定义在 redis/connection/publish.go 中。

subscribeChannels 代表客户端订阅的频道,它实际上是一个集合,它记录了这个客户端所有的订阅频道名字。

redis.Connection 接口

connect.Connect 结构体实现了 redis.Connection 接口,这个接口定义在 interface/redis/conn.go 文件中。定义 redis.Connection 接口的原因在于:

在 AOF 持久化中,为了重写 AOF 持久化文件,在临时数据库中重放文件中的命令时,因为通用性(在数据库中执行时需要传入一个 redis.Connection 接口类型的值),需要用到一个虚假连接(conection.FakeConn,定义在 redis/conection/fake_conn.go 中)。

pipeline 客户端

集群模式或者命令行模式下需要用到 simple-redis 客户端,与 simple-redis 服务器进行通信。

pipeline 模式

通常 TCP 客户端的通信模式都是阻塞式的:客户端发送请求 -> 等待服务端响应 -> 发送下一个请求。因为需要等待网络传输数据,完成一次请求循环需要等待较多时间。

针对这种效率低的情景,可以不等待服务端响应直接发送下一条请求

TCP 协议会保证数据流的有序性,同一个 TCP 连接上先发送的请求服务端先接收,先回复的响应客户端先收到。因此不必担心混淆响应所对应的请求。

这种在服务端未响应时客户端继续向服务端发送请求的模式称为 Pipeline 模式。因为减少等待网络传输的时间,Pipeline 模式可以极大的提高吞吐量。

Pipeline 模式的 Godis 客户端需要至少有两个后台协程,分别是发送请求协程(写协程)读取响应协程(读协程)。调用方通过 channel 向后台协程发送发送指令,并阻塞等待直到收到响应(或者超时)。

数据结构

Client

首先定义 Client 客户端,Client 客户端实现 pipeline 的核心在于两个通道

  • pendingReqs:记录等待发送的请求,客户端调用 Send 命令向客户端发送请求时,请求在这个通道内排队等待写协程发送请求
  • waitingReqs:记录等待服务器响应的请求,向服务器发送请求成功后将这个请求加入到这个通道中等待响应。当读协程收到一个服务器响应时就从通道中取出一个请求,此时一个完整的请求+响应完成。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type Client struct {
conn net.Conn // 与服务器的tcp连接
pendingReqs chan *request // 等待发送的请求
waitingReqs chan *request // 等待服务器响应的请求
ticker *time.Ticker // 发送心跳的计时器
addr string

isCmdLine bool // 标记是否是命令行客户端
curDBIndex int // 当前数据库

status int32 // 服务器状态(创建/运行/关闭)
working *sync.WaitGroup

keepalive time.Duration // 服务器存活检查时间
}

request

客户端请求 request 的结构如下,它代表一个从请求到响应的完整的客户端请求。

  • args:客户端向服务器发送的命令行参数。
  • reply:收到的返回值。
1
2
3
4
5
6
7
8
type request struct {
id uint64 // 请求id
args [][]byte // 上行参数
reply redis.Reply // 收到的返回值
heartbeat bool // 标记是否是心跳请求
waiting *wait.Wait // 调用协程发送请求后通过 waitgroup 等待请求异步处理完成
err error
}

启动/关闭客户端

Start 启动客户端

Client.Start 用于开启客户端,主要工作:

  • 开启两个协程,分别是写协程(用于发送数据)、读协程(用于读取响应)。
  • 若 client.keepalive > 0,则开启心跳发送协程,每 keepalive/2 秒发送一次心跳。
  • 将服务器状态变更为 running
1
2
3
4
5
6
7
8
9
10
11
12
13
// Start starts asynchronous goroutines
func (client *Client) Start() {
go client.handleWrite()
go client.handleRead()

if client.keepalive > 0 {
// 开启心跳
client.ticker = time.NewTicker(time.Second * client.keepalive / 2) // 每 keepalive/2 秒发送一次心跳
go client.heartbeat()
}

atomic.StoreInt32(&client.status, running)
}

Close 关闭客户端

关闭客户端时:

  • 将客户端的状态变更为 closed,停止用于发送心跳的计时器。
  • 关闭 Client.pendingReqs,阻止新的请求进入队列
  • 等待处理中的请求处理完成
  • 释放资源,包括关闭与服务器的连接(连接关闭后读协程会退出)、关闭等待响应的队列。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Close stops asynchronous goroutines and close connection
func (client *Client) Close() {
atomic.StoreInt32(&client.status, closed)
if client.keepalive > 0 {
client.ticker.Stop()
}
// stop new request
close(client.pendingReqs)

// wait working process stop
client.working.Wait()
// client.heartbeatWorking.Wait()

_ = client.conn.Close()
close(client.waitingReqs)
}

请求与响应

客户端完成一个请求的完整流程为:

  • 将请求发送到 Client.pendingReqs 通道中
  • 然后写协程会从 Client.pendingReqs 通道中取到请求发送给服务器,同时将请求发送到 Client.waitingReqs 后面等待读取响应。
  • 最后读协程从 Client.waitingReqs 中取出请求,读取响应将结果

Send 方法

Client.Send 方法用于发送命令,返回值为响应结果。其流程为:

  • 首先填请求结构体 request。
  • 接着将 request 挂在 client.pendingReqs 后面,等待写协程和读协程的依次处理。
  • 等待 request 响应结果,或者请求超时。
  • 若请求为 select 命令且未发生错误,会同步修改 client.curDBIdenx。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// Send sends a request to redis server
func (client *Client) Send(args [][]byte) redis.Reply {
if atomic.LoadInt32(&client.status) != running {
return reply.MakeErrReply("client closed")
}

request := &request{
args: args,
heartbeat: false,
waiting: &wait.Wait{},
}

request.waiting.Add(1)
client.working.Add(1)
defer client.working.Done()

client.pendingReqs <- request

timeout := request.waiting.WaitWithTimeout(maxWait)
if timeout {
return reply.MakeErrReply("server time out")
}
if request.err != nil {
return reply.MakeErrReply("request failed")
}

if _, ok := request.reply.(*reply.StandardErrReply); !ok && strings.ToLower(string(args[0])) == "select" {
curDBIndex, _ := strconv.Atoi(string(args[1]))
client.curDBIndex = curDBIndex
}

return request.reply
}

写协程

在启动客户端时,会再启动一个写协程,不断读取 client.pendingReqs 管道,用于向服务器发送请求

1
2
3
4
5
func (client *Client) handleWrite() {
for req := range client.pendingReqs {
client.doRequest(req)
}
}

doRequest

真正的写逻辑在 doRequest 方法中。流程如下:

  • doRequest 方法会尝试向服务器发送命令消息,若失败则最多重试三次
1
2
3
4
5
6
7
8
9
10
11
12
re := reply.MakeMultiBulkStringReply(req.args)
bytes := re.ToBytes()

// 最多失败重试3次
var err error
for i := 0; i < 3; i++ {
_, err = client.conn.Write(bytes)
if err == nil || (!strings.Contains(err.Error(), "timeout") && // only retry timeout
!strings.Contains(err.Error(), "deadline exceeded")) {
break
}
}
  • 若发送成功,则将 request 挂在 client.waitingReqs 管道后面,等待读协程去处理响应结果。
  • 若发送失败,则结束流程,读协程不会处理。
1
2
3
4
5
6
if err == nil {
client.waitingReqs <- req
} else {
req.err = err
req.waiting.Done()
}

读协程

读协程不断接收服务器响应经过协议解析器解析之后的结果,若协议解析错误则会重连服务器 client.reconnect(),每收到一个(正常)响应就会调用 finishRequest 结束响应流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (client *Client) handleRead() {
ch := parser.ParseStream(client.conn)
for payload := range ch {
if payload.Err != nil {
status := atomic.LoadInt32(&client.status)
if status == closed {
return
}
client.reconnect()
return
}
client.finishRequest(payload.Data)
}
}

finishRequest

finishRequest 方法用于结束整个请求过程,流程如下:

  • 首先会从 client.waitingReqs 管道中取出一个请求。
  • 接着将服务器的响应结果填入 request.Reply。
  • 最后调用 request.waiting.Done() 结束请求。