Redis 通信协议
Redis 自 2.0 版本起使用了统一的协议 RESP(REdis Serialization Protocol,Redis 序列化协议),该协议易于实现,计算机可以高效的进行解析且易于被人类读懂。
RESP 是一个二进制安全的文本协议,工作于 TCP 协议上。RESP 以行作为单位,客户端和服务器发送的命令或数据一律以 \r\n (CRLF)作为换行符。
RESP 的五种格式
RESP 定义了 5 种格式:
- 简单字符串(Simple String):服务器用来返回简单的结果,比如 OK。非二进制安全,不允许换行。
- 错误信息(Error):服务器用来返回简单的错误信息,比如 ERR Invalid Synatx。非二进制安全,且不允许换行。
- 整数(Integer):llen、scard 等命令的返回值,64位有符号整数。
- 字符串(Bulk String):二进制安全字符串。
- 数组(Array,又称 Multi Bulk Strings):Bulk String 数组,客户端发送指令以及 lrange 等命令响应的格式。
RESP 通过第一个字符表示格式:
- 简单字符串:以 + 开始,如 +OK\r\n。
- 错误:以 - 开始,如 -ERR Invalid Syntax\r\n。
- 整数:以 : 开始,如 :1\r\n。
- 字符串:以 $ 开始。Bulk String 有两行,第一行为 $+正文长度,第二行为实际内容。$-1 表示 nil,当使用 get 查询一个不存在的 key 时,响应为 nil。
- 数组:以 * 开始。第一行为 *+数组长度,其后是相应数量的 Bulk String。
协议解析器
1 | // ParseStream reads data from io.Reader and send payloads through channel |
Payload
Payload 用于存储 Reply 和 错误信息 error。
1 | // Payload stores redis.Reply or error |
Reply 实现 RESP 消息的接口
Reply 是实现 RESP 消息的接口。
1 | // Reply is the interface of redis serialization protocol message |
以 Bulk String 为例,进行说明。
BulkReply 是 BulkString 消息的 Reply 接口实现,ToBytes 函数封装 $+正文长度+CRLF+正文+CRLF,形成 RESP 消息体。
1 | // BulkReply stores a binary-safe string |
parse0 协议解析
parse0 用于协议解析,将解析的结果 Payload 发送到通道中。
1 | func parse0(rawReader io.Reader, ch chan<- *Payload) |
流程如下:
- 首先初始化一个带有缓冲区的 reader,接着开启一个循环,不断读取行进行解析。
1 | reader := bufio.NewReader(rawReader) |
- 在循环中,首先读取一行,若这一行的长度小于等于2且倒数第二个字符不是
\r
(忽略空行,且检查结尾是不是 CRLF)则跳过。
1 | line, err := reader.ReadBytes('\n') |
检查行的第一个字符:
1
2
3
4line = bytes.TrimSuffix(line, []byte{'\r', '\n'})
switch line[0] {
...
}- **若是 +**,代表这是一个简单字符串,封装为 Payload 发往通道中。若内容中含有 FULLRESYNC,则代表这是一条 RDB 同步消息,调用 parseRDBBulkString 进行解析(因为 RDB 消息中,字符串正文内容没有 CRLF 结尾,所以需要特殊处理)。
1
2
3
4
5
6
7
8
9
10
11
12
13case '+':
content := string(line[1:])
ch <- &Payload{
Data: protocol.MakeStatusReply(content),
}
if strings.HasPrefix(content, "FULLRESYNC") {
err = parseRDBBulkString(reader, ch)
if err != nil {
ch <- &Payload{Err: err}
close(ch)
return
}
}- **若是 -**,代表这是错误信息,封装为 Payload 发往通道中。
- **若是 :**,代表是个数字信息,将字符串解析为数字并封装为 Payload 发往通道中。
- **若是 $**,代表这是字符串,调用 parseBulkString 解析字符串。
1
2
3
4
5
6
7case '$':
err = parseBulkString(line, reader, ch)
if err != nil {
ch <- &Payload{Err: err}
close(ch)
return
}- **若是 ***,代表这是数组,调用 parseArray 解析数组。
1
2
3
4
5
6
7case '*':
err = parseArray(line, reader, ch)
if err != nil {
ch <- &Payload{Err: err}
close(ch)
return
}- 默认解析为字符串数组。
1
2
3
4
5default:
args := bytes.Split(line, []byte{' '})
ch <- &Payload{
Data: protocol.MakeMultiBulkReply(args),
}
parseBulkString 解析字符串
parseBulkString 用于解析字符串。
1 | func parseBulkString(header []byte, reader *bufio.Reader, ch chan<- *Payload) error |
- 首先读取字符串长度。
1 | strLen, err := strconv.ParseInt(string(header[1:]), 10, 64) |
- 接着利用 io.ReadFull 读取指定长度字符串正文内容,并且封装为 Payload,发送到通道中。
1 | body := make([]byte, strLen+2) |
parseArray 解析数组
parseArray 用于解析数组。
1 | func parseArray(header []byte, reader *bufio.Reader, ch chan<- *Payload) error |
- 首先解析数组长度。
1 | nStrs, err := strconv.ParseInt(string(header[1:]), 10, 64) |
- 接着解析数组中的每一个字符串,封装为 Payload,发送到通道中。
- 对于每一个字符串,首先读取字符串的第一行,获取字符串长度(-1 代表空)。
- 接着利用 io.ReadFull 读取指定长度字符串正文内容。
1 | lines := make([][]byte, 0, nStrs) |