Dawn's Blogs

分享技术 记录成长

0%

Simple-Redis实现 (12) 分布式事务

本项目完整地址 simple-redis

Simple-Redis 是支持分布式事务的,采用 TCC(Try-Commit-Cancel)实现分布式事务,以当前节点为事务协调者,控制分布在其他节点的本地事务。所以 Simple-Redis 的分布式事务实现包括两个部分:

  • 事务协调者:一个分布式事务的控制器。
  • 本地事务:用于执行命令,并进行 commit 或者 cancel。

事务协调者

定义

TCC 事务协调者定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
// Coordinator TCC事务协调者
type Coordinator struct {
id string // 事务id
dbIndex int // 数据库id
self string // 自己的地址
peers cluster.PeerPicker // 一致性哈希,用于选择节点
getters map[string]cluster.PeerGetter // 用于和远程节点通信

watching map[string]map[string]uint32 // key为peer地址,记录被watch的key的版本号

cmdLinesNum int
indexMap map[string][]int // key为peer,value为cmdline在原来multi队列中的下标,用于重组reply
}

流程

一次分布式事务的流程如下:

  • 对命令和 watch 的 key,按照节点的不同进行分组
  • 对每一组节点发送 try 命令,try 命令比较复杂又分为几个子命令:
    • try start:通知节点有新建一个本地分布式事务。
    • try watched:发送 wath 时的 version,比较 version 是否发生变化。
    • try xxx:依次发送在该节点上执行的命令。
    • try end:通知 try 阶段已经结束,本地事务检查版本是否变化、锁定相关的 key。
  • 对每一组节点发送 commit 或者 cancel 命令
  • 重组事务的返回结果,按照正常的顺序进行返回。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// ExecTx 执行TCC分布式事务
func (coordinator *Coordinator) ExecTx(cmdLines [][][]byte, watching map[string]uint32) redis.Reply {
// 首先对命令进行分组,以同一个节点上执行的所有命令为一组
groupByMap := coordinator.groupByCmdLines(cmdLines)

// 对watch的key进行分组
coordinator.groupByWatch(watching)

// 对每一组中发送try命令
needCancel := false // 记录是否需要回滚
for peer, cmd := range groupByMap {
r := coordinator.sendTry(peer, cmd)
if reply.IsErrorReply(r) {
needCancel = true
break
}
}

defer func() {
// 返回之前发送end消息,通知各个分布式节点结束事务
for peer := range groupByMap {
coordinator.sendEnd(peer)
}
}()

// 向各个节点提交/回滚分布式事务
replies := make(map[string]redis.Reply)
for peer := range groupByMap {
if needCancel {
// 取消事务
coordinator.sendCancel(peer)
} else {
// 提交事务
r := coordinator.sendCommit(peer)
if config.Properties.OpenAtomicTx && reply.IsErrorReply(r) {
// 如果开启了原子事务并且提交发生了错误,则进行所有的节点进行回滚
needCancel = true
break
}
replies[peer] = r
}
}

// 如果开启了原子事务,并且发生了错误,所有节点进行回滚
if needCancel && config.Properties.OpenAtomicTx {
for peer := range groupByMap {
// 取消事务,进行回滚
coordinator.sendCancel(peer)
}

return reply.MakeErrReply("EXECABORT Transaction rollback because of errors during executing. (atomic tx is open)")
}

if needCancel {
// 没有开启原子事务,但是发送了错误(如watch的key变化了),则返回(nil)
return reply.MakeNullBulkStringReply()
}

// 正常提交,重组reply
return coordinator.recombineReplies(replies)
}

try

try 命令比较复杂,分为几个子命令:

  • try start:通知节点有新建一个本地分布式事务。
  • try watched:发送 wath 时的 version,比较 version 是否发生变化。
  • try xxx:依次发送在该节点上执行的命令。
  • try end:通知 try 阶段已经结束,本地事务检查版本是否变化、锁定相关的 key。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
func (coordinator *Coordinator) sendTry(peer string, cmdLines [][][]byte) redis.Reply {
// 获取getter
getter := coordinator.getters[peer]

var r redis.Reply
// 发送try开始命令
r = getter.RemoteExec(coordinator.dbIndex, utils.StringsToCmdLine("try", coordinator.id, "start"))
if reply.IsErrorReply(r) {
// 如果发生错误,则中断try,直接返回
return r
}

// 发送watch时的version,比较version是否发生变化
watching := coordinator.watching[peer]
for key, version := range watching {
versionStr := strconv.FormatInt(int64(version), 10)
r = getter.RemoteExec(coordinator.dbIndex, utils.StringsToCmdLine("try", coordinator.id, "watched", key, versionStr))
if reply.IsErrorReply(r) {
// 如果发生变化,则中断try,直接返回
return r
}
}

// 依次发送需要执行的命令,每一条命令=try tx_id cmdline,如 try 123456 set k1 v1
for _, cmdLine := range cmdLines {
tryCmd := make([][]byte, 0, 2+len(cmdLine))
tryCmd = append(tryCmd, utils.StringsToCmdLine("try", coordinator.id)...)
tryCmd = append(tryCmd, cmdLine...)
r = getter.RemoteExec(coordinator.dbIndex, tryCmd)
if reply.IsErrorReply(r) {
// 如果发生错误,则中断try,直接返回
return r
}
}

// 发送try结束命令
r = getter.RemoteExec(coordinator.dbIndex, utils.StringsToCmdLine("try", coordinator.id, "end"))
if reply.IsErrorReply(r) {
// 如果发生错误,则中断try,直接返回
return r
}

return reply.MakeOkReply()
}

本地事务

本地事务就是分布式事务在节点上的实际执行者,下面是本地事务的定义:

  • watchedKeys 对应 try watched 命令,记录 watch 时的版本,用于在执行时进行版本比较。
  • cmdLines 对应 try xxx 命令,记录当前节点的本地事务中,包含的所有需要执行的命令。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Transaction TCC本地事务
type Transaction struct {
id string // 事务id
phase int // 阶段,分别是try、commit、cancel三个阶段

writeKeys []string
readKeys []string
watchedKeys map[string]uint32 // 记录watch时的版本

cmdLines [][][]byte // 本次事务中包含的命令
undoLogs [][][][]byte // 本次事务的undo log
addVersionKeys []string // 需要增加版本的key

db *engine.DB

mu sync.Mutex
}

Try

在收到 try end 命令后,表明 try 阶段已经结束,此时本地事务:

  • 锁定需要读写的 key。
  • 在在时间轮中添加任务, 自动回滚超时未提交的事务。
  • 读取版本是否变化,如果有变化则返回错误。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// Try TCC事务的结束try阶段,锁定key
func (tx *Transaction) Try() redis.Reply {
if tx.phase != phaseTry {
return reply.MakeErrReply("ERR TRY ERROR")
}

// 锁定需要读写的key
tx.db.RWLocks(tx.writeKeys, tx.readKeys)

// 在时间轮中添加任务, 自动回滚超时未提交的事务
taskKey := tx.id
timewheel.Delay(maxLockTime, taskKey, func() {
if tx.phase == phaseTry {
logger.Info("abort transaction: " + tx.id)
_ = tx.Cancel()
}
})

// 读取版本是否变化
for key, oldVersion := range tx.watchedKeys {
newVersion := tx.db.GetVersion(key)
if oldVersion != newVersion {
// 获取key的版本,如果版本变化了,则返回错误
return reply.MakeErrReply("ERR VERSION CHANGED")
}
}

return reply.MakeOkReply()
}

Commit

在本地事务收到 commit 命令后,会执行命令,(如果开启原子事务)并记录 undolog。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// Commit TCC事务的commit阶段
func (tx *Transaction) Commit() redis.Reply {
tx.mu.Lock() // Commit互斥进行操作,防止commit时调用cancel,发生冲突
defer tx.mu.Unlock()
defer func() {
// 设置阶段
tx.phase = phaseCommit
}()

// 依次执行命令
var results [][]byte // 存储命令执行的结果
for _, cmdLine := range tx.cmdLines {
cmdName := string(cmdLine[0])
key := string(cmdLine[1])
if !engine.IsReadOnlyCommand(cmdName) {
// 写命令,则记录需要add版本的key
tx.addVersionKeys = append(tx.addVersionKeys, key)
if config.Properties.OpenAtomicTx {
// 开启了原子性事务,则记录undo log
tx.undoLogs = append(tx.undoLogs, tx.db.GetUndoLog(key))
}
}

// 执行命令
r := tx.db.ExecWithLock(cmdLine)

if config.Properties.OpenAtomicTx && reply.IsErrorReply(r) {
// 开启了原子性事务,并且如果发生错误,直接返回错误
if !engine.IsReadOnlyCommand(cmdName) {
// 删除错误命令的undo log,不需要增加版本号
tx.addVersionKeys = tx.addVersionKeys[:len(tx.addVersionKeys)-1]
tx.undoLogs = tx.undoLogs[:len(tx.undoLogs)-1]
}

return r
}

results = append(results, []byte(r.DataString()))
}

return reply.MakeMultiBulkStringReply(results)
}

Cancel

在收到 cancel 命令后,(如果开启原子性事务)会执行 undo log 进行回滚。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Cancel TCC事务的cancel阶段
func (tx *Transaction) Cancel() redis.Reply {
tx.mu.Lock() // Cancel互斥进行操作,防止commit时调用cancel,发生冲突
defer tx.mu.Unlock()
defer func() {
// 设置阶段
tx.phase = phaseCancel
}()

// 从后向前依次执行undo日志,只有开启原子性事务时,len(tx.undoLogs)才可能大于0
for i := len(tx.undoLogs) - 1; i >= 0; i-- {
undoLog := tx.undoLogs[i]
if len(undoLog) == 0 {
continue
}

for _, cmdLine := range undoLog {
tx.db.ExecWithLock(cmdLine)
}

}

// 清空
tx.undoLogs = nil

return reply.MakeOkReply()
}