Dawn's Blogs

分享技术 记录成长

0%

Simple-Redis实现 (11) 原子事务

本项目完整地址 simple-redis

Redis 不支持原子事务,在 multi 排队执行时,如果在入队时发生错误则放弃执行,如果在 exec 执行时错误则跳过这条命令。

Simple-Redis 支持原子事务,在 multi 排队执行时,如果在入队时发生错误则放弃执行,在 exec 执行时错误则整条命令回滚。

原子事务

watch

在数据库存储引擎层,使用了并发的 dict 记录 key 和版本号的映射关系,当写命令执行成功时版本号加一。

在 Redis 中 watch 命令用于记录一个 key 当前的版本号,将版本号记录在客户端连接的上下文中。

事务

原子事务的实现是在数据库引擎层实现的,在 multi 后客户端发出的每一条命令,数据库只会检查语法错误并将命令记录在客户端连接的上下文中。

在 exec 时,才会从客户端连接的上下文中读取所有需要执行的命令队列。exec 命令的实现方式如下:

  • 首先加写锁、加读锁(watch 的 key 也会加上读锁)。
  • 检查 watch 的 key 版本是否变化,如果变化则直接返回,什么都不做。
  • 依次执行每一条命令,如果开启了原子事务,则会记录 undo log。
  • 如果执行成功,对相应的 key 增加版本号;如果执行失败,则根据 undo log 进行回滚。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
// ExecMulti multi命令执行阶段
func (db *DB) ExecMulti(c redis.Connection) redis.Reply {
return db.ExecMultiCommand(c.GetEnqueuedCmdLine(), c.GetWatching())
}

func (db *DB) ExecMultiCommand(cmdLines [][][]byte, watching map[string]uint32) redis.Reply {
// 此时不需要检查是否有语法错误,因为在排队过程中已经检查过了

// // 获取所有需要加锁的key
writeKeys := make([]string, len(cmdLines))
readKeys := make([]string, len(cmdLines)+len(watching))
for _, cmdLine := range cmdLines {
cmdName := strings.ToLower(string(cmdLine[0]))
// 获取命令
cmd, _ := cmdTable[cmdName]

// 获取需要加锁的key
prepare := cmd.prepare
write, read := prepare(cmdLine[1:])
writeKeys = append(writeKeys, write...)
readKeys = append(readKeys, read...)
}

// 获取需要watch的key
watchingKeys := make([]string, 0, len(watching))
for key := range watching {
watchingKeys = append(watchingKeys, key)
}
readKeys = append(readKeys, watchingKeys...)

// 执行前的加锁
db.RWLocks(writeKeys, readKeys)
defer db.RWUnLocks(writeKeys, readKeys)

// 执行前检查version是否变化
versionChanged := db.checkVersionChanged(watching)
if versionChanged {
// version变化了,什么都不执行
return reply.MakeNullBulkStringReply()
}

// 执行
var results [][]byte // 存储命令执行的结果
var undoLogs [][]CmdLine // undo日志
aborted := false
for _, cmdLine := range cmdLines {
cmdName := strings.ToLower(string(cmdLine[0]))
// 获取命令
cmd, _ := cmdTable[cmdName]

if config.Properties.OpenAtomicTx {
// 开启原子性事务,记录undo日志
key := string(cmdLine[1])
undoLogs = append(undoLogs, db.GetUndoLog(key))
}

// 执行命令
fun := cmd.executor
r, aofExpireCtx := fun(db, cmdLine[1:])

if config.Properties.OpenAtomicTx && reply.IsErrorReply(r) {
// 如果开启原子性事务,并且其中一条命令执行失败了,全部全部回滚
undoLogs = undoLogs[:len(undoLogs)-1] // 不执行最后一条失败的undo log
aborted = true
break
}

results = append(results, []byte(r.DataString()))
db.afterExec(r, aofExpireCtx, cmdLine)
}

if len(results) == 0 {
return reply.MakeEmptyMultiBulkStringReply()
}

if config.Properties.OpenAtomicTx && aborted { // 开启原子性事务并且执行失败了,则进行回滚
size := len(undoLogs)
for i := size - 1; i >= 0; i-- {
undoLog := undoLogs[i]
if len(undoLog) == 0 {
continue
}
for _, cmdLine := range undoLog {
db.ExecWithLock(cmdLine)
}
}
return reply.MakeErrReply("EXECABORT Transaction rollback because of errors during executing. (atomic tx is open)")
}

// 未开启原子性事务,或者执行成功
// 写命令增加版本
db.AddVersion(writeKeys...)

return reply.MakeMultiBulkStringReply(results)
}

生成 undo log

获取 undo log 的逻辑非常简单:

  • 判断原来是否存在:
    • 不存在,undo log 为删除这个 key。
    • 存在,则 undo log 为:
      • 删除新的值,再恢复原来的值。
      • 如果原来有 TTL,设置过期时间。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (db *DB) GetUndoLog(key string) []CmdLine {
undoLog := make([]CmdLine, 0, 3)
entity, exist := db.GetEntity(key)
if !exist {
// 不存在,直接删除key
undoLog = append(undoLog, utils.StringsToCmdLine("DEL", key))
} else {
// 存在,首先删除新的值
undoLog = append(undoLog, utils.StringsToCmdLine("DEL", key))
// 接着恢复为原来的值
undoLog = append(undoLog, utils.EntityToCmdLine(key, entity))
// 设置 TTL
if raw, ok := db.ttlMap.Get(key); ok { // 获取过期时间
// 如果有过期时间
expireTime, _ := raw.(time.Time)
// 设置过期时间
undoLog = append(undoLog, utils.ExpireToCmdLine(key, expireTime))
}
}

return undoLog
}