Dawn's Blogs

分享技术 记录成长

0%

Simple-Redis实现 (3) 协议解析器

本项目完整地址 simple-redis

RESP 协议

simple-redis 的通信使用 RESP 协议,Redis 自 2.0 版本起使用了统一的协议 RESP(REdis Serialization Protocol,Redis 序列化协议),该协议易于实现,计算机可以高效的进行解析且易于被人类读懂。

RESP 是一个二进制安全的文本协议,工作于 TCP 协议上。RESP 以作为单位,客户端和服务器发送的命令或数据一律以 \r\n (CRLF)作为换行符。

RESP 的五种格式

RESP 定义了 5 种格式:

  • 简单字符串(Simple String):服务器用来返回简单的结果,比如 OK。非二进制安全,不允许换行。
  • 错误信息(Error):服务器用来返回简单的错误信息,比如 ERR Invalid Synatx。非二进制安全,且不允许换行。
  • 整数(Integer):llen、scard 等命令的返回值,64位有符号整数。
  • 字符串(Bulk String):二进制安全字符串。
  • 数组(Array,又称 Multi Bulk Strings):Bulk String 数组,客户端发送指令以及 lrange 等命令响应的格式。

RESP 通过第一个字符表示格式:

  • 简单字符串:以 + 开始,如 +OK\r\n。
  • 错误:以 - 开始,如 -ERR Invalid Syntax\r\n。
  • 整数:以 : 开始,如 :1\r\n。
  • 字符串:以 $ 开始。Bulk String 有两行,第一行为 $+正文长度,第二行为实际内容。$-1 表示 nil,当使用 get 查询一个不存在的 key 时,响应为 nil。
  • 数组:以 * 开始。第一行为 *+数组长度,其后是相应数量的 Bulk String。

协议解析器

simple-redis 的协议解析器在 redis/parser/parser.go 中实现。

在前面的章节中,不管是服务器端还是客户端,都用到了协议解析器解析 RESP 协议。

调用的方法为 ParseStream,返回一个管道,管道内存放着解析完成的 Payload 以供读取。

1
2
3
4
5
func ParseStream(reader io.Reader) <-chan *Payload {
ch := make(chan *Payload)
go parser(reader, ch)
return ch
}

Payload 和 redis.Reply

Payload

Payload 表示协议解析器解析之后的结果:

  • Data 存放解析结果。
  • Err 存放协议解析时的错误。
1
2
3
4
type Payload struct {
Data redis.Reply
Err error
}

服务器端遇到解析协议错误(Payload.Err != nil)时,会关闭客户端的连接。而当客户端遇到协议解析错误时,会尝试和服务器进行重连

redis.Reply

redis.Reply 是一个接口,定义在 interface/redis/reply.go 文件中,它表示一个 RESP 协议数据的正常解析结果。这个接口有两个方法:

  • ToBytes 方法返回的是在传输过程中转换为 RESP 协议数据的字节切片。
  • DataString 方法返回的是这一条数据在命令行中的显示结果。
1
2
3
4
5
// Reply 表示 redis 序列化协议中的一条消息
type Reply interface {
ToBytes() []byte
DataString() string
}

因为 RESP 有五种格式,分别是简单字符串、错误信息、整数、字符串、数组,所以至少有五个结构体代表上述不同的格式实现了 redis.Reply 接口

redis.Reply 接口的实现在 redis/protocol/reply 文件夹内,其中:

  • StatusReply 代表了简单字符串,也就是状态信息。
  • ErrorReply 代表了错误信息。
  • IntReply 代表了整数。
  • BulkStringReply 代表了整数。
  • MultiBulkStringReply 代表了数组。

parser 协议解析

协议解析真正的实现在 parser 函数中:协议解析器不断读取传输的数据,遇到 \r\n 时检查前面的数据:

  • 若数据以 + 开头,说明这是一个简单字符串
  • 若数据以 : 开头,说明这是一个整数,若从字节切片中解析整数错误,则会发生协议解析错误,填写 Payload.Err。
  • 若数据以 - 开头,说明这个一条错误消息
  • 若数据以 $ 开头,说明这是一个字符串,调用 parseBulkString 进行解析字符串。
    • 字符串在 $ 后面会紧跟着字符串的长度,若长度不能解析为一个整数则发生协议解析错误。
    • 再从数据中读取相应长度的字符串。
  • 若数据以 * 开头,说明这是一个数组,数组中的每一项都是一个字符串,调用 parseArray 方法进行解析数组。
    • 数组在 * 后面会紧跟着数组的长度,若长度不能解析为一个整数则发生协议解析错误。
    • 接着开始逐个解析每一个字符串(步骤同上)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
func parser(rawReader io.Reader, ch chan<- *Payload) {
defer func() {
if err := recover(); err != nil {
logger.Error(err)
}
}()

reader := bufio.NewReader(rawReader)
for {
line, err := reader.ReadBytes('\n')
if err != nil {
ch <- &Payload{Err: err}
close(ch)
return
}
length := len(line)
if length <= 2 || line[length-2] != '\r' {
// 检查格式,必须以 \r\n 结尾
continue
}
line = bytes.TrimSuffix(line, []byte{'\r', '\n'}) // 去除结尾的 \r\n
switch line[0] {
case '+':
content := string(line[1:])
ch <- &Payload{
Data: reply.MakeStatusReply(content),
}
case ':':
value, err := strconv.ParseInt(string(line[1:]), 10, 64)
if err != nil {
protocolError(ch, "illegal number "+string(line[1:]))
continue
}
ch <- &Payload{
Data: reply.MakeIntReply(value),
}
case '-':
content := string(line[1:])
ch <- &Payload{
Data: reply.MakeErrReply(content),
}
case '$':
err = parseBulkString(line, reader, ch)
if err != nil {
ch <- &Payload{Err: err}
close(ch)
return
}
case '*':
err = parseArray(line, reader, ch)
if err != nil {
ch <- &Payload{Err: err}
close(ch)
return
}
}
}
}