Dawn's Blogs

分享技术 记录成长

0%

本项目地址:LMQ

LMQ 是一个仿照 NSQ 的基于 TCP 协议的分布式消息队列,本系列说明 LMQ 的实现方式。

在本节说明 TCP 服务器的实现,TCP 服务器的实现地址:Hamble TCP Server

TCP 服务器架构

TCP 服务器的架构如下,总体上采用 1+M+N 的服务器编程模型

  • 一个协程(Server goroutine)用于建立连接(Accept)。
  • M 个协程用于读写取连接请求信息。
  • N 个协程用于处理请求内容,并生成响应信息。

image-20231218132654361

阅读全文 »

本项目完整地址 simple-redis

总结

功能实现

Simple-Redis 是一个 golang 编写的实现了 RESP(REdis Serialization Protocol)协议的简易 Redis,实现的功能包括:

  • 五大基本数据类型(string、set、hash、list、zset)及其操作。
  • 通信协议为 RESP 协议,与 Redis-cli 兼容。
  • 增加并发性,可以并发执行命令。
  • AOF 持久化功能,包括 AOF 持久化和 AOF 重写。
  • 分片集群功能,以一致性哈希实现。
  • 发布订阅功能,更支持集群下的发布订阅模式。
  • 支持原子事务,在执行错误时进行回滚。
  • 支持分布式事务,支持在多集群下的 TCC 分布式事务。

优势

相比于 Redis,Simple-Redis 的优势在于:

  • 与 Redis 在执行命令时单线程不同,Simple-Redis 为每一个客户端并发的执行命令
  • pipeline 模式客户端,无需等待响应即可发送下一条命令。
  • TTL 过期使用时间轮算法实现,时间轮算法保证了不会扫描全部的 key(节省 CPU),又能够及时的删除过期数据(节省 内存)
  • 集群采用一致性哈希实现,使 key 更加均匀的分布。
  • 集群间的通信采用连接池的方式,优势在于:
    • 复用连接(复用空闲连接)。
    • 增加并发性能(节点之间有多个连接)。
    • 控制节点之间的连接数,不会因为节点之间的通信而影响客户端与节点之间的通信(有最大空闲连接数和最大活跃连接数的限制)。
  • 支持集群下的发布订阅模式,为了支持这一功能,定义了 publish 和 publishsingle 命令。
    • publish 命令:客户端向集群中的一个节点发送 publish 命令,节点会向所有的 peers 转发这条发布消息
    • publishsingle 命令:节点向所有 peers 转发发布的消息时,使用的是 publishsingle 命令,表明节点在收到消息后不用继续转发了(避免了节点转发再转发,造成的无限消息洪泛)。
  • 支持原子事务,在执行错误时进行回滚。
  • 支持分布式事务,支持在多集群下的 TCC 分布式事务。以当前节点为事务协调者,控制分布在其他节点的本地事务。一次分布式事务的流程如下:
    • 对命令和 watch 的 key,按照节点的不同进行分组
    • 对每一组节点发送 try 命令,try 命令比较复杂又分为几个子命令:
      • try start:通知节点有新建一个本地分布式事务。
      • try watched:发送 wath 时的 version,比较 version 是否发生变化。
      • try xxx:依次发送在该节点上执行的命令。
      • try end:通知 try 阶段已经结束,本地事务检查版本是否变化、锁定相关的 key。
    • 对每一组节点发送 commit 或者 cancel 命令
    • 重组事务的返回结果,按照正常的顺序进行返回。

性能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
zhaohan08@MacBook-Pro redis % redis-benchmark -t set,get,incr,lpush,rpush,lpop,rpop,sadd,hset,spop,zadd,lrange -n 10000 -q
WARNING: Could not fetch server CONFIG
SET: 68965.52 requests per second, p50=0.367 msec
GET: 62111.80 requests per second, p50=0.399 msec
INCR: 49751.24 requests per second, p50=0.487 msec
LPUSH: 56497.18 requests per second, p50=0.447 msec
RPUSH: 49504.95 requests per second, p50=0.503 msec
LPOP: 68965.52 requests per second, p50=0.375 msec
RPOP: 68493.15 requests per second, p50=0.375 msec
SADD: 52631.58 requests per second, p50=0.495 msec
HSET: 68027.21 requests per second, p50=0.375 msec
SPOP: 68027.21 requests per second, p50=0.383 msec
ZADD: 49751.24 requests per second, p50=0.503 msec
LPUSH (needed to benchmark LRANGE): 56818.18 requests per second, p50=0.463 msec
LRANGE_100 (first 100 elements): 25773.20 requests per second, p50=0.895 msec
LRANGE_300 (first 300 elements): 13586.96 requests per second, p50=1.775 msec
LRANGE_500 (first 500 elements): 9041.59 requests per second, p50=2.607 msec
LRANGE_600 (first 600 elements): 7704.16 requests per second, p50=3.015 msec

在单机时,可以得出结论,Simple-Redis 和 Redis 是有性能差距的(达到 Redis 的 60-70%),具体数据如下:

  • string 类型:Simple-Redis 的性能大约是 Redis 的 70%。
  • list 类型:Simple-Redis 的性能大约是 Redis 的 50%。
  • hash 类型:Simple-Redis 大约是 Redis 的 75%。
  • set 类型:Simple-Redis 的性能大约是 Redis 的 55%。
  • zset 类型:Simple-Redis 的性能大约是 Redis 的 50%。

本项目完整地址 simple-redis

Simple-Redis 是支持分布式事务的,采用 TCC(Try-Commit-Cancel)实现分布式事务,以当前节点为事务协调者,控制分布在其他节点的本地事务。所以 Simple-Redis 的分布式事务实现包括两个部分:

  • 事务协调者:一个分布式事务的控制器。
  • 本地事务:用于执行命令,并进行 commit 或者 cancel。

事务协调者

定义

TCC 事务协调者定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
// Coordinator TCC事务协调者
type Coordinator struct {
id string // 事务id
dbIndex int // 数据库id
self string // 自己的地址
peers cluster.PeerPicker // 一致性哈希,用于选择节点
getters map[string]cluster.PeerGetter // 用于和远程节点通信

watching map[string]map[string]uint32 // key为peer地址,记录被watch的key的版本号

cmdLinesNum int
indexMap map[string][]int // key为peer,value为cmdline在原来multi队列中的下标,用于重组reply
}

流程

一次分布式事务的流程如下:

  • 对命令和 watch 的 key,按照节点的不同进行分组
  • 对每一组节点发送 try 命令,try 命令比较复杂又分为几个子命令:
    • try start:通知节点有新建一个本地分布式事务。
    • try watched:发送 wath 时的 version,比较 version 是否发生变化。
    • try xxx:依次发送在该节点上执行的命令。
    • try end:通知 try 阶段已经结束,本地事务检查版本是否变化、锁定相关的 key。
  • 对每一组节点发送 commit 或者 cancel 命令
  • 重组事务的返回结果,按照正常的顺序进行返回。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// ExecTx 执行TCC分布式事务
func (coordinator *Coordinator) ExecTx(cmdLines [][][]byte, watching map[string]uint32) redis.Reply {
// 首先对命令进行分组,以同一个节点上执行的所有命令为一组
groupByMap := coordinator.groupByCmdLines(cmdLines)

// 对watch的key进行分组
coordinator.groupByWatch(watching)

// 对每一组中发送try命令
needCancel := false // 记录是否需要回滚
for peer, cmd := range groupByMap {
r := coordinator.sendTry(peer, cmd)
if reply.IsErrorReply(r) {
needCancel = true
break
}
}

defer func() {
// 返回之前发送end消息,通知各个分布式节点结束事务
for peer := range groupByMap {
coordinator.sendEnd(peer)
}
}()

// 向各个节点提交/回滚分布式事务
replies := make(map[string]redis.Reply)
for peer := range groupByMap {
if needCancel {
// 取消事务
coordinator.sendCancel(peer)
} else {
// 提交事务
r := coordinator.sendCommit(peer)
if config.Properties.OpenAtomicTx && reply.IsErrorReply(r) {
// 如果开启了原子事务并且提交发生了错误,则进行所有的节点进行回滚
needCancel = true
break
}
replies[peer] = r
}
}

// 如果开启了原子事务,并且发生了错误,所有节点进行回滚
if needCancel && config.Properties.OpenAtomicTx {
for peer := range groupByMap {
// 取消事务,进行回滚
coordinator.sendCancel(peer)
}

return reply.MakeErrReply("EXECABORT Transaction rollback because of errors during executing. (atomic tx is open)")
}

if needCancel {
// 没有开启原子事务,但是发送了错误(如watch的key变化了),则返回(nil)
return reply.MakeNullBulkStringReply()
}

// 正常提交,重组reply
return coordinator.recombineReplies(replies)
}

try

try 命令比较复杂,分为几个子命令:

  • try start:通知节点有新建一个本地分布式事务。
  • try watched:发送 wath 时的 version,比较 version 是否发生变化。
  • try xxx:依次发送在该节点上执行的命令。
  • try end:通知 try 阶段已经结束,本地事务检查版本是否变化、锁定相关的 key。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
func (coordinator *Coordinator) sendTry(peer string, cmdLines [][][]byte) redis.Reply {
// 获取getter
getter := coordinator.getters[peer]

var r redis.Reply
// 发送try开始命令
r = getter.RemoteExec(coordinator.dbIndex, utils.StringsToCmdLine("try", coordinator.id, "start"))
if reply.IsErrorReply(r) {
// 如果发生错误,则中断try,直接返回
return r
}

// 发送watch时的version,比较version是否发生变化
watching := coordinator.watching[peer]
for key, version := range watching {
versionStr := strconv.FormatInt(int64(version), 10)
r = getter.RemoteExec(coordinator.dbIndex, utils.StringsToCmdLine("try", coordinator.id, "watched", key, versionStr))
if reply.IsErrorReply(r) {
// 如果发生变化,则中断try,直接返回
return r
}
}

// 依次发送需要执行的命令,每一条命令=try tx_id cmdline,如 try 123456 set k1 v1
for _, cmdLine := range cmdLines {
tryCmd := make([][]byte, 0, 2+len(cmdLine))
tryCmd = append(tryCmd, utils.StringsToCmdLine("try", coordinator.id)...)
tryCmd = append(tryCmd, cmdLine...)
r = getter.RemoteExec(coordinator.dbIndex, tryCmd)
if reply.IsErrorReply(r) {
// 如果发生错误,则中断try,直接返回
return r
}
}

// 发送try结束命令
r = getter.RemoteExec(coordinator.dbIndex, utils.StringsToCmdLine("try", coordinator.id, "end"))
if reply.IsErrorReply(r) {
// 如果发生错误,则中断try,直接返回
return r
}

return reply.MakeOkReply()
}

本地事务

本地事务就是分布式事务在节点上的实际执行者,下面是本地事务的定义:

  • watchedKeys 对应 try watched 命令,记录 watch 时的版本,用于在执行时进行版本比较。
  • cmdLines 对应 try xxx 命令,记录当前节点的本地事务中,包含的所有需要执行的命令。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Transaction TCC本地事务
type Transaction struct {
id string // 事务id
phase int // 阶段,分别是try、commit、cancel三个阶段

writeKeys []string
readKeys []string
watchedKeys map[string]uint32 // 记录watch时的版本

cmdLines [][][]byte // 本次事务中包含的命令
undoLogs [][][][]byte // 本次事务的undo log
addVersionKeys []string // 需要增加版本的key

db *engine.DB

mu sync.Mutex
}

Try

在收到 try end 命令后,表明 try 阶段已经结束,此时本地事务:

  • 锁定需要读写的 key。
  • 在在时间轮中添加任务, 自动回滚超时未提交的事务。
  • 读取版本是否变化,如果有变化则返回错误。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// Try TCC事务的结束try阶段,锁定key
func (tx *Transaction) Try() redis.Reply {
if tx.phase != phaseTry {
return reply.MakeErrReply("ERR TRY ERROR")
}

// 锁定需要读写的key
tx.db.RWLocks(tx.writeKeys, tx.readKeys)

// 在时间轮中添加任务, 自动回滚超时未提交的事务
taskKey := tx.id
timewheel.Delay(maxLockTime, taskKey, func() {
if tx.phase == phaseTry {
logger.Info("abort transaction: " + tx.id)
_ = tx.Cancel()
}
})

// 读取版本是否变化
for key, oldVersion := range tx.watchedKeys {
newVersion := tx.db.GetVersion(key)
if oldVersion != newVersion {
// 获取key的版本,如果版本变化了,则返回错误
return reply.MakeErrReply("ERR VERSION CHANGED")
}
}

return reply.MakeOkReply()
}

Commit

在本地事务收到 commit 命令后,会执行命令,(如果开启原子事务)并记录 undolog。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// Commit TCC事务的commit阶段
func (tx *Transaction) Commit() redis.Reply {
tx.mu.Lock() // Commit互斥进行操作,防止commit时调用cancel,发生冲突
defer tx.mu.Unlock()
defer func() {
// 设置阶段
tx.phase = phaseCommit
}()

// 依次执行命令
var results [][]byte // 存储命令执行的结果
for _, cmdLine := range tx.cmdLines {
cmdName := string(cmdLine[0])
key := string(cmdLine[1])
if !engine.IsReadOnlyCommand(cmdName) {
// 写命令,则记录需要add版本的key
tx.addVersionKeys = append(tx.addVersionKeys, key)
if config.Properties.OpenAtomicTx {
// 开启了原子性事务,则记录undo log
tx.undoLogs = append(tx.undoLogs, tx.db.GetUndoLog(key))
}
}

// 执行命令
r := tx.db.ExecWithLock(cmdLine)

if config.Properties.OpenAtomicTx && reply.IsErrorReply(r) {
// 开启了原子性事务,并且如果发生错误,直接返回错误
if !engine.IsReadOnlyCommand(cmdName) {
// 删除错误命令的undo log,不需要增加版本号
tx.addVersionKeys = tx.addVersionKeys[:len(tx.addVersionKeys)-1]
tx.undoLogs = tx.undoLogs[:len(tx.undoLogs)-1]
}

return r
}

results = append(results, []byte(r.DataString()))
}

return reply.MakeMultiBulkStringReply(results)
}

Cancel

在收到 cancel 命令后,(如果开启原子性事务)会执行 undo log 进行回滚。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Cancel TCC事务的cancel阶段
func (tx *Transaction) Cancel() redis.Reply {
tx.mu.Lock() // Cancel互斥进行操作,防止commit时调用cancel,发生冲突
defer tx.mu.Unlock()
defer func() {
// 设置阶段
tx.phase = phaseCancel
}()

// 从后向前依次执行undo日志,只有开启原子性事务时,len(tx.undoLogs)才可能大于0
for i := len(tx.undoLogs) - 1; i >= 0; i-- {
undoLog := tx.undoLogs[i]
if len(undoLog) == 0 {
continue
}

for _, cmdLine := range undoLog {
tx.db.ExecWithLock(cmdLine)
}

}

// 清空
tx.undoLogs = nil

return reply.MakeOkReply()
}

本项目完整地址 simple-redis

Redis 不支持原子事务,在 multi 排队执行时,如果在入队时发生错误则放弃执行,如果在 exec 执行时错误则跳过这条命令。

Simple-Redis 支持原子事务,在 multi 排队执行时,如果在入队时发生错误则放弃执行,在 exec 执行时错误则整条命令回滚。

原子事务

watch

在数据库存储引擎层,使用了并发的 dict 记录 key 和版本号的映射关系,当写命令执行成功时版本号加一。

在 Redis 中 watch 命令用于记录一个 key 当前的版本号,将版本号记录在客户端连接的上下文中。

事务

原子事务的实现是在数据库引擎层实现的,在 multi 后客户端发出的每一条命令,数据库只会检查语法错误并将命令记录在客户端连接的上下文中。

在 exec 时,才会从客户端连接的上下文中读取所有需要执行的命令队列。exec 命令的实现方式如下:

  • 首先加写锁、加读锁(watch 的 key 也会加上读锁)。
  • 检查 watch 的 key 版本是否变化,如果变化则直接返回,什么都不做。
  • 依次执行每一条命令,如果开启了原子事务,则会记录 undo log。
  • 如果执行成功,对相应的 key 增加版本号;如果执行失败,则根据 undo log 进行回滚。
阅读全文 »

本项目完整地址 simple-redis

集群

Simple-Redis 的集群本质上就是一个分片集群,使用一致性哈希环实现。

结构体

Cluster 结构体如下,其中:

  • peers 为一致性哈希,用于根据 key 选择节点。
  • getters 用于和远程节点通信。

以下三个属性与分布式事务相关:

  • idGenerator:雪花 ID(分布式 ID)生成器,用于生成分布式事务 ID。
  • transactionMap:记录所有的分布式事务。
  • coordinatorMap:记录所有的事物协调者。
1
2
3
4
5
6
7
8
9
10
// Cluster 用于和集群中的主机进行交互
type Cluster struct {
self string // 本机地址,如 127.0.0.1:6107
peers cluster.PeerPicker // 一致性哈希,用于选择节点
getters map[string]cluster.PeerGetter // 用于和远程节点通信

idGenerator *snowflake.Node // snowflake id生成器,用于生成分布式事务的id
transactionMap *dict.SimpleDict // 记录所有的分布式事务(本地)
coordinatorMap *dict.SimpleDict // 记录事务协调者
}

getter

在和远程节点通信时,本节点就作为一个 Client。而一个 getter 维护一个远程节点的连接池(实际上一个 getter 维护多个连接池,一个数据库对应一个连接池。

集群中节点之间的通信采用连接池,是为了:

  • 复用连接(复用空闲连接)。
  • 增加并发性能(节点之间有多个连接)。
  • 控制节点之间的连接数,不会因为节点之间的通信而影响客户端与节点之间的通信(有最大空闲连接数和最大活跃连接数的限制)。
阅读全文 »

本项目完整地址 simple-redis

性能测试

测试环境

使用的机器为 MacBook Pro 13,处理器 Intel i5 四核,内存 8GB。

测试结果

在同样的测试环境上,使用 redis-benchmark分别对 Redis 和 Simple-Redis 进行性能测试。

发送一万条请求,对 set,get,incr,lpush,rpush,lpop,rpop,sadd,hset,spop,zadd,lrange 命令进行测试。

Redis

Redis 测试结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
zhaohan08@MacBook-Pro redis % redis-benchmark -t set,get,incr,lpush,rpush,lpop,rpop,sadd,hset,spop,zadd,lrange -n 10000 -q
SET: 93457.95 requests per second, p50=0.263 msec
GET: 96153.84 requests per second, p50=0.263 msec
INCR: 96153.84 requests per second, p50=0.263 msec
LPUSH: 100000.00 requests per second, p50=0.263 msec
RPUSH: 97087.38 requests per second, p50=0.263 msec
LPOP: 98039.22 requests per second, p50=0.263 msec
RPOP: 93457.95 requests per second, p50=0.263 msec
SADD: 93457.95 requests per second, p50=0.263 msec
HSET: 87719.30 requests per second, p50=0.271 msec
SPOP: 95238.10 requests per second, p50=0.263 msec
ZADD: 100000.00 requests per second, p50=0.263 msec
LPUSH (needed to benchmark LRANGE): 99009.90 requests per second, p50=0.263 msec
LRANGE_100 (first 100 elements): 38167.94 requests per second, p50=0.647 msec
LRANGE_300 (first 300 elements): 16366.61 requests per second, p50=1.503 msec
LRANGE_500 (first 500 elements): 10989.01 requests per second, p50=2.223 msec
LRANGE_600 (first 600 elements): 9363.30 requests per second, p50=2.639 msec

Simple-Redis

Simple-Redis 测试结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
zhaohan08@MacBook-Pro redis % redis-benchmark -t set,get,incr,lpush,rpush,lpop,rpop,sadd,hset,spop,zadd,lrange -n 10000 -q
WARNING: Could not fetch server CONFIG
SET: 68965.52 requests per second, p50=0.367 msec
GET: 62111.80 requests per second, p50=0.399 msec
INCR: 49751.24 requests per second, p50=0.487 msec
LPUSH: 56497.18 requests per second, p50=0.447 msec
RPUSH: 49504.95 requests per second, p50=0.503 msec
LPOP: 68965.52 requests per second, p50=0.375 msec
RPOP: 68493.15 requests per second, p50=0.375 msec
SADD: 52631.58 requests per second, p50=0.495 msec
HSET: 68027.21 requests per second, p50=0.375 msec
SPOP: 68027.21 requests per second, p50=0.383 msec
ZADD: 49751.24 requests per second, p50=0.503 msec
LPUSH (needed to benchmark LRANGE): 56818.18 requests per second, p50=0.463 msec
LRANGE_100 (first 100 elements): 25773.20 requests per second, p50=0.895 msec
LRANGE_300 (first 300 elements): 13586.96 requests per second, p50=1.775 msec
LRANGE_500 (first 500 elements): 9041.59 requests per second, p50=2.607 msec
LRANGE_600 (first 600 elements): 7704.16 requests per second, p50=3.015 msec

结论

在单机时,可以得出结论,Simple-Redis 和 Redis 是有性能差距的,具体数据如下:

  • string 类型:Simple-Redis 的性能大约是 Redis 的 70%。
  • list 类型:Simple-Redis 的性能大约是 Redis 的 50%。
  • hash 类型:Simple-Redis 大约是 Redis 的 75%。
  • set 类型:Simple-Redis 的性能大约是 Redis 的 55%。
  • zset 类型:Simple-Redis 的性能大约是 Redis 的 50%。

本项目完整地址 simple-redis

本节说明 simple-redis 中的 AOF 持久化功能。AOF(append only file)是一种 Redis 持久化方式。,其优缺点总结如下:

  • 优势:
    • 持久化文件是用户可以理解的。
    • 备份机制更稳健,丢失数据概率更低。
    • AOF日志文件的命令通过可读的方式进行记录,这个特性非常适合做灾难性的误删除的紧急恢复。比如某人不小心使用了flushall清空了所有数据库,只有重写操作还没有发生,那么就可以立即拷贝AOF文件,将最后一条flushall命令给删了,然后再将该AOF文件放回去,就可以通过恢复机制,自动恢复所有数据。
  • 劣势:
    • 比起RDB占用更多的磁盘空间。
    • 恢复备份速度慢
    • 每次读写都同步的话,有一定的性能压力。

AOF 持久化

数据结构

Persister 是 AOF 持久化中的核心数据结构,它从 channel 中接收消息并且将消息写入到 AOF 文件中。其中重要的字段如下:

  • db:指向 simple-redis 服务器。
  • tmpDBMaker:临时数据库创建函数,在进行 AOF 重写时,需要建立一个临时数据库加载 AOF 持久化文件,通过遍历临时数据库中的 key 实现 AOF 持久化文件的重写压缩。
  • aofChan:需要持久化的命令(payload 包含命令、数据库编号两个字段)发送到这个管道上进行持久化。
  • aofFilename:AOF 持久化文件名。
  • aofFsync:AOF 刷盘策略,共有三种策略分别是FsyncAlways、FsyncEverySec、FsyncNo。
  • currentDB:当前数据库编号。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
const (
FsyncAlways = iota // 每一个命令都会进行刷盘操作
FsyncEverySec // 每秒进行一次刷盘操作
FsyncNo // 不主动进行刷盘操作,交给操作系统去决定
)

type CmdLine [][]byte

const (
aofQueueSize = 1 << 16
)

type Persister struct {
ctx context.Context
cancel context.CancelFunc
db database.DBEngine
tmpDBMaker func() database.DBEngine
aofChan chan *payload
aofFile *os.File
aofFilename string
aofFsync int // AOF 刷盘策略
// aof goroutine will send msg to main goroutine through this channel when aof tasks finished and ready to shut down
aofFinished chan struct{}
// pause aof for start/finish aof rewrite progress
pausingAof sync.Mutex
// 表示正在aof重写,同时只有一个aof重写
aofRewriting sync.WaitGroup
currentDB int
}

payload 包含命令、数据库编号两个字段,它表示发送给 aofChan 的数据。

1
2
3
4
type payload struct {
cmdLine CmdLine
dbIndex int
}
阅读全文 »

本项目完整地址 simple-redis

底层数据库

在上一节中,我们简述了 simple-redis 的工作方式,需要注意的是如 GET、SET 这样需要在某个具体的数据库中执行的命令,单机模式下 Server 会调用 Server.db.Exec 去执行这类命令。

本节我们就聊一聊 simple-redis 的底层数据库,simple-redis 的底层数据库定义在 database/engine 文件夹中。

阅读全文 »

本项目完整地址 simple-redis

Simple-Redis 服务器

在之前已经介绍了 TCP 服务器,本节介绍 Simple-Redis 服务器,这是一个应用层服务器。在 Handler 的 Handle 方法中,有这样一条命令 result := h.db.Exec(client, r.Args),它将收到的命令交给 simple-redis 服务器去执行。

数据结构

simple-redis 服务器的被定义在 database/server.go 文件中,simple-redis 服务器的相关代码在 database 文件夹下。

simple-redis 服务器的数据结构如下,需要说明的是:

  • dbSet:代表底层的数据库。
  • AofPersister:AOF 持久化。
  • AofFileSize:用于记录上一次 AOF 重写后的文件大小。
  • rewriteWait、rewriting:用于同步 AOF 重写过程。
  • closed:接收关闭信号,用于优雅的关闭(用于关闭自动 AOF 重写协程)。
  • cluster:集群相关。
  • publish:订阅发布相关操作。
1
2
3
4
5
6
7
8
9
10
type Server struct {
dbSet []*atomic.Value // *DB
AofPersister *aof.Persister // AOF 持久化
AofFileSize int64
rewriteWait sync.WaitGroup
rewriting atomic.Bool
closed chan struct{}
cluster *cluster.Cluster
publish publish.Publish
}

AOF 持久化、集群、发布订阅会在后面的章节中说明。

阅读全文 »

本项目完整地址 simple-redis

List

Godis 中定义了 List 接口,以定义 List 的各种操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Expected check whether given item is equals to expected value
type Expected func(a interface{}) bool

// Consumer traverses list.
// It receives index and value as params, returns true to continue traversal, while returns false to break
type Consumer func(i int, v interface{}) bool

type List interface {
Add(val interface{})
Get(index int) (val interface{})
Set(index int, val interface{})
Insert(index int, val interface{})
Remove(index int) (val interface{})
RemoveLast() (val interface{})
RemoveAllByVal(expected Expected) int
RemoveByVal(expected Expected, count int) int
ReverseRemoveByVal(expected Expected, count int) int
Len() int
ForEach(consumer Consumer)
Contains(expected Expected) bool
Range(start int, stop int) []interface{}
}

数据结构

快速链表

在 godis 中,采用快速链表作为 List 的数据结构。

快速链表实际上就是一个双向链表,但是双向链表中的每一个节点不是存储一个数据,而是将数据连续存放形成一段连续的存储空间作为链表的节点。

这一段连续的存储空间在 godis 中被称为 page(每一个 page 的大小为 1024),page 的类型为空接口切片

1
2
3
4
5
6
7
8
9
// pageSize must be even
const pageSize = 1024

// QuickList is a linked list of page (which type is []interface{})
// QuickList has better performance than LinkedList of Add, Range and memory usage
type QuickList struct {
data *list.List // list of []interface{}
size int
}
阅读全文 »