Dawn's Blogs

分享技术 记录成长

0%

Simple-Redis实现 (7) 底层数据库

本项目完整地址 simple-redis

底层数据库

在上一节中,我们简述了 simple-redis 的工作方式,需要注意的是如 GET、SET 这样需要在某个具体的数据库中执行的命令,单机模式下 Server 会调用 Server.db.Exec 去执行这类命令。

本节我们就聊一聊 simple-redis 的底层数据库,simple-redis 的底层数据库定义在 database/engine 文件夹中。

数据结构

DB

DB 定义在 database/engine/db.go 中。

DB 表示一个 simple-redis 底层数据库,就如同 Redis 中的一个数据库一样,不用的数据库之间用编号区分。simple-redis 的底层数据库定义如下:

  • index:数据库编号,是数据库的唯一标识。
  • data:是一个 dict.Dict 接口类型的属性,记录数据库中所有的数据。
  • ttlMap:用来记录所有 key 的过期时间。
  • versionMap:用来记录所有 key 的版本号,在事务中会用到。
  • locker:就是之前的 LockMap,用于一次性加锁,实现对数据的互斥访问。
  • addAof:用于 AOF 持久化。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
const (
dataDictSize = 1 << 16
ttlDictSize = 1 << 10
lockSize = 1024
)

type DB struct {
index int // 数据库号
data dict.Dict
ttlMap dict.Dict
versionMap dict.Dict
locker *lock.Locks
addAof func(line CmdLine)
}

command

command 定义在 database/engine/router.go 中。

command 表示需要执行的命令,数据库中的所有操作都会被注册(RegisterCommand 方法)在 cmdTable 这个字典中,键为命令名称,值为 command 结构体。command 的结构如下:

  • executor:表示执行函数,在命令真正执行时会调用这个函数
  • prepare:在执行前被调用,用于解析出命令中需要加读锁和写锁的 keys
  • arity:记录合法的参数数量,如果大于 0,表示必须等于这个数;如果小于 0,则表示必须大于等于这个数的绝对值。
  • flags:记录这个命令是只读命令还是涉及到了写操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// ExecFunc is interface for command executor
// args don't include cmd line
type ExecFunc func(db *DB, args [][]byte) (redis.Reply, *AofExpireCtx)

// PreFunc returns related write keys and read keys
type PreFunc func(args [][]byte) ([]string, []string)

var cmdTable = make(map[string]*command)

type command struct {
executor ExecFunc
prepare PreFunc // return related keys command
arity int // allow number of args, arity < 0 means len(args) >= -arity
flags int // flagWrite or flagReadOnly
}

const (
FlagWrite = 0
FlagReadOnly = 1
)

AofExpireCtx

AofExpireCtx 记录在执行命令时,是否需要 AOF 持久化,是否有过期时间作为 command.executor 的返回值,它随后被用于 AOF 持久化中

1
2
3
4
5
// AofExpireCtx 记录在执行命令时,是否需要AOF持久化,是否有过期时间
type AofExpireCtx struct {
NeedAof bool
ExpireAt *time.Time
}

Exec 执行命令

DB 最重要的就是执行 Set、Get 等命令,DB.Exec 就是在本地执行命令的方法。

  • 在执行命令时,首先会检查客户端是否已经进入了 Multi,如果已经处于 Multi 模式:
    • 则首先会调用 DB.checkSyntaxErr 方法检查是否有语法错误(检查是否定义了命令、以及命令的参数数量是否正确),如果有语法错误则记录在客户端连接的 Multi 语法错误队列中,并结束执行。
    • 其次检查命令是否支持 Multi,如果不支持,则依然会记录在 Multi 语法错误队列中并结束执行。
    • 以上检查都没有错误,就会进入 Multi 命令队列,当用户发出 Exec 命令后依次执行队列中的任务。
  • 否则调用 DB.execNormalCommand 在本地正常的执行命令
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Exec executes command within one database
func (db *DB) Exec(c redis.Connection, cmdLine [][]byte) redis.Reply {
if c.GetMultiStatus() { // 如果客户端已经进入了multi
// 检查是否有语法错误
if errReply := db.CheckSyntaxErr(cmdLine); errReply != nil {
c.EnqueueSyntaxErrQueue(errReply) // 语法有错误
return errReply
}
// 获取命令,检查是否支持multi
if errReply := db.CheckSupportMulti(cmdLine); errReply != nil {
c.EnqueueSyntaxErrQueue(errReply) // 语法有错误
return errReply
}

// 语法没有错误,则进入队列等待执行
c.EnqueueCmdLine(cmdLine)

return reply.MakeStatusReply("QUEUED")
}

// 正常执行的命令
return db.execNormalCommand(cmdLine)
}

execNormalCommand 方法

对于非 Multi 模式下正常执行的命令,其流程如下:

  • 首先会检查是否有语法错误(检查是否定义了命令、以及命令的参数数量是否正确)。
  • 进行加锁,在数据库中执行,执行之后调用 DB.afterExec 进行持久化相关操作,需要注意的是,AofExpireCtx 表示的是 AOF 持久化上下文,它记录了该命令是否需要持久化、是否有过期时间。。
  • 如果命令是写操作,并且执行成功了,则会增加版本(版本用于实现 watch)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (db *DB) execNormalCommand(cmdLine [][]byte) redis.Reply {
if errReply := db.CheckSyntaxErr(cmdLine); errReply != nil {
// 检查是否有语法错误
return errReply
}
cmdName := strings.ToLower(string(cmdLine[0]))
// 获取命令
cmd, _ := cmdTable[cmdName]

// 执行前的加锁
prepare := cmd.prepare
write, read := prepare(cmdLine[1:])
db.RWLocks(write, read)
defer db.RWUnLocks(write, read)
// 执行
fun := cmd.executor
r, aofExpireCtx := fun(db, cmdLine[1:])
db.afterExec(r, aofExpireCtx, cmdLine)
// 写命令、执行成功增加版本
if !IsReadOnlyCommand(cmdName) && !reply.IsErrorReply(r) {
db.AddVersion(write...)
}

return r
}

CheckSyntaxErr 检查语法错误

DB.CheckSyntaxErr 方法用于检查语法错误,首先会在命令表 cmdTable 中查询命令是否存在,接着会调用 validateArity 检查命令的参数是否正确

1
2
3
4
5
6
7
8
9
10
11
12
13
func (db *DB) CheckSyntaxErr(cmdLine [][]byte) redis.Reply {
cmdName := strings.ToLower(string(cmdLine[0]))
// 获取命令
cmd, ok := cmdTable[cmdName]
if !ok {
return reply.MakeErrReply("ERR unknown command '" + cmdName + "'")
}
if !validateArity(cmd.arity, cmdLine) {
return reply.MakeArgNumErrReply(cmdName)
}

return nil
}

afterExec 命令执行后的相关处理

DB.afterExec 用于执行命令执行之后的相关处理,如持久化。需要注意的是,AofExpireCtx 表示的是 AOF 持久化上下文,它记录了该命令是否需要持久化、是否有过期时间

1
2
3
4
5
6
7
8
9
10
11
12
13
// afterExec 命令执行之后的相关处理,如持久化相关等
func (db *DB) afterExec(r redis.Reply, aofExpireCtx *AofExpireCtx, cmdLine [][]byte) {
key := string(cmdLine[1])
// 持久化相关
if aofExpireCtx != nil && aofExpireCtx.NeedAof {
// 需要进行AOF持久化
db.addAof(cmdLine)
if aofExpireCtx.ExpireAt != nil {
// 有过期时间
db.addAof(utils.ExpireToCmdLine(key, *aofExpireCtx.ExpireAt))
}
}
}

RegisterCommand 注册命令

在 router.go 中定义了一个全局变量 cmdTable 命令表,这个表是一个字典,其中键为命令名称,值为 command 结构体。RegisterCommand 函数就是用于将一条命令注册在 cmdTable 命令表中,只有被注册在这个命令表中才表示数据库中可以执行这一条命令。

1
2
3
4
5
6
7
8
9
func RegisterCommand(name string, executor ExecFunc, prepare PreFunc, arity int, flags int) {
name = strings.ToLower(name)
cmdTable[name] = &command{
executor: executor,
prepare: prepare,
arity: arity,
flags: flags,
}
}

所有的命令都定义在 database/commands 中,这个文件夹中的所有文件都定义了 init 初始化函数,函数的内容就是调用 RegisterCommand 去注册当前文件夹中定义的命令。在 Server中会 import 这个 commands 包,自动执行 init 初始化函数,自动注册其中定义的命令。