Dawn's Blogs

分享技术 记录成长

0%

godis源码阅读 (2) Redis协议解析器

Redis 通信协议

Redis 自 2.0 版本起使用了统一的协议 RESP(REdis Serialization Protocol,Redis 序列化协议),该协议易于实现,计算机可以高效的进行解析且易于被人类读懂。

RESP 是一个二进制安全的文本协议,工作于 TCP 协议上。RESP 以作为单位,客户端和服务器发送的命令或数据一律以 \r\n (CRLF)作为换行符。

RESP 的五种格式

RESP 定义了 5 种格式:

  • 简单字符串(Simple String):服务器用来返回简单的结果,比如 OK。非二进制安全,不允许换行。
  • 错误信息(Error):服务器用来返回简单的错误信息,比如 ERR Invalid Synatx。非二进制安全,且不允许换行。
  • 整数(Integer):llen、scard 等命令的返回值,64位有符号整数。
  • 字符串(Bulk String):二进制安全字符串。
  • 数组(Array,又称 Multi Bulk Strings):Bulk String 数组,客户端发送指令以及 lrange 等命令响应的格式。

RESP 通过第一个字符表示格式:

  • 简单字符串:以 + 开始,如 +OK\r\n。
  • 错误:以 - 开始,如 -ERR Invalid Syntax\r\n。
  • 整数:以 : 开始,如 :1\r\n。
  • 字符串:以 $ 开始。Bulk String 有两行,第一行为 $+正文长度,第二行为实际内容。$-1 表示 nil,当使用 get 查询一个不存在的 key 时,响应为 nil。
  • 数组:以 * 开始。第一行为 *+数组长度,其后是相应数量的 Bulk String。

协议解析器

1
2
3
4
5
6
// ParseStream reads data from io.Reader and send payloads through channel
func ParseStream(reader io.Reader) <-chan *Payload {
ch := make(chan *Payload)
go parse0(reader, ch)
return ch
}

Payload

Payload 用于存储 Reply 和 错误信息 error。

1
2
3
4
5
// Payload stores redis.Reply or error
type Payload struct {
Data redis.Reply
Err error
}

Reply 实现 RESP 消息的接口

Reply 是实现 RESP 消息的接口。

1
2
3
4
// Reply is the interface of redis serialization protocol message
type Reply interface {
ToBytes() []byte
}

以 Bulk String 为例,进行说明。

BulkReply 是 BulkString 消息的 Reply 接口实现,ToBytes 函数封装 $+正文长度+CRLF+正文+CRLF,形成 RESP 消息体。

1
2
3
4
5
6
7
8
9
10
11
12
// BulkReply stores a binary-safe string
type BulkReply struct {
Arg []byte
}

// ToBytes marshal redis.Reply
func (r *BulkReply) ToBytes() []byte {
if r.Arg == nil {
return nullBulkBytes
}
return []byte("$" + strconv.Itoa(len(r.Arg)) + CRLF + string(r.Arg) + CRLF)
}

parse0 协议解析

parse0 用于协议解析,将解析的结果 Payload 发送到通道中。

1
func parse0(rawReader io.Reader, ch chan<- *Payload)

流程如下:

  • 首先初始化一个带有缓冲区的 reader,接着开启一个循环,不断读取行进行解析。
1
2
3
4
reader := bufio.NewReader(rawReader)
for {
...
}
  • 在循环中,首先读取一行,若这一行的长度小于等于2且倒数第二个字符不是 \r忽略空行,且检查结尾是不是 CRLF)则跳过。
1
2
3
4
5
6
7
8
9
10
11
12
line, err := reader.ReadBytes('\n')
if err != nil {
ch <- &Payload{Err: err}
close(ch)
return
}
length := len(line)
if length <= 2 || line[length-2] != '\r' {
// there are some empty lines within replication traffic, ignore this error
//protocolError(ch, "empty line")
continue
}
  • 检查行的第一个字符

    1
    2
    3
    4
    line = bytes.TrimSuffix(line, []byte{'\r', '\n'})
    switch line[0] {
    ...
    }
    • **若是 +**,代表这是一个简单字符串,封装为 Payload 发往通道中。若内容中含有 FULLRESYNC,则代表这是一条 RDB 同步消息,调用 parseRDBBulkString 进行解析(因为 RDB 消息中,字符串正文内容没有 CRLF 结尾,所以需要特殊处理)。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    case '+':
    content := string(line[1:])
    ch <- &Payload{
    Data: protocol.MakeStatusReply(content),
    }
    if strings.HasPrefix(content, "FULLRESYNC") {
    err = parseRDBBulkString(reader, ch)
    if err != nil {
    ch <- &Payload{Err: err}
    close(ch)
    return
    }
    }
    • **若是 -**,代表这是错误信息,封装为 Payload 发往通道中。
    • **若是 :**,代表是个数字信息,将字符串解析为数字并封装为 Payload 发往通道中。
    • **若是 $**,代表这是字符串,调用 parseBulkString 解析字符串。
    1
    2
    3
    4
    5
    6
    7
    case '$':
    err = parseBulkString(line, reader, ch)
    if err != nil {
    ch <- &Payload{Err: err}
    close(ch)
    return
    }
    • **若是 ***,代表这是数组,调用 parseArray 解析数组。
    1
    2
    3
    4
    5
    6
    7
    case '*':
    err = parseArray(line, reader, ch)
    if err != nil {
    ch <- &Payload{Err: err}
    close(ch)
    return
    }
    • 默认解析为字符串数组。
    1
    2
    3
    4
    5
    default:
    args := bytes.Split(line, []byte{' '})
    ch <- &Payload{
    Data: protocol.MakeMultiBulkReply(args),
    }

parseBulkString 解析字符串

parseBulkString 用于解析字符串

1
func parseBulkString(header []byte, reader *bufio.Reader, ch chan<- *Payload) error
  • 首先读取字符串长度。
1
2
3
4
5
6
7
8
9
10
strLen, err := strconv.ParseInt(string(header[1:]), 10, 64)
if err != nil || strLen < -1 {
protocolError(ch, "illegal bulk string header: "+string(header))
return nil
} else if strLen == -1 {
ch <- &Payload{
Data: protocol.MakeNullBulkReply(),
}
return nil
}
  • 接着利用 io.ReadFull 读取指定长度字符串正文内容,并且封装为 Payload,发送到通道中。
1
2
3
4
5
6
7
8
9
body := make([]byte, strLen+2)
_, err = io.ReadFull(reader, body)
if err != nil {
return err
}
ch <- &Payload{
Data: protocol.MakeBulkReply(body[:len(body)-2]),
}
return nil

parseArray 解析数组

parseArray 用于解析数组。

1
func parseArray(header []byte, reader *bufio.Reader, ch chan<- *Payload) error
  • 首先解析数组长度。
1
2
3
4
5
6
7
8
9
10
nStrs, err := strconv.ParseInt(string(header[1:]), 10, 64)
if err != nil || nStrs < 0 {
protocolError(ch, "illegal array header "+string(header[1:]))
return nil
} else if nStrs == 0 {
ch <- &Payload{
Data: protocol.MakeEmptyMultiBulkReply(),
}
return nil
}
  • 接着解析数组中的每一个字符串,封装为 Payload,发送到通道中。
    • 对于每一个字符串,首先读取字符串的第一行,获取字符串长度(-1 代表空)。
    • 接着利用 io.ReadFull 读取指定长度字符串正文内容
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
lines := make([][]byte, 0, nStrs)
for i := int64(0); i < nStrs; i++ {
var line []byte
line, err = reader.ReadBytes('\n')
if err != nil {
return err
}
length := len(line)
if length < 4 || line[length-2] != '\r' || line[0] != '$' {
protocolError(ch, "illegal bulk string header "+string(line))
break
}
strLen, err := strconv.ParseInt(string(line[1:length-2]), 10, 64)
if err != nil || strLen < -1 {
protocolError(ch, "illegal bulk string length "+string(line))
break
} else if strLen == -1 {
lines = append(lines, []byte{})
} else {
body := make([]byte, strLen+2)
_, err := io.ReadFull(reader, body)
if err != nil {
return err
}
lines = append(lines, body[:len(body)-2])
}
}
ch <- &Payload{
Data: protocol.MakeMultiBulkReply(lines),
}
return nil