Dawn's Blogs

分享技术 记录成长

0%

以下是 CA 安全性的概述,分为两个部分:

  • 控制面:实现了一个 CA 负责为网格中的各个服务签发证书,并将证书颁发给各个服务,具体的做法有两个:

    • 通过(webhook)动态准入控制器的方式,将 CA 的证书挂载到 Pod 里面去,同时添加了环境变量,只需要读取环境变量就可以获取路径了。
    • Dubbo 应用通过与 CA 建立双向 TLS 连接(CA 证书已经被挂载到了 Pod 中,所以可以验证 CA 证书),发送 CSR 请求申请这个 dubbo 应用的证书,dubbo-cp 会验证客户端的 CSR 请求并签发证书返回给客户端。
  • 数据面:在网格中服务相互之前发起通信的时候,dubbo sdk 会拦截服务请求,采用证书进行双向 TLS 认证并建立一个 TLS 连接,使用该 TLS 连接来在网格中传输数据。

1703656322

在 dubbo-cp 中,要实现一个 CA 要做两方面的事情:

  1. 维护 CA:维护 CA 的证书状态,存储 CA 证书。
  2. 签发证书:接受 CSR 请求,为 Dubbo 应用签发证书。
阅读全文 »

类似于 istio 中的 xDS,在 dubbo-kubernetes 中 dds 组件做了一些对于 dubbo 规则的监听与分发。它的作用是接受各种 dubbo 规则(留了管控、认证、授权),将规则下发至各个 dubbo 应用,完成具有 dubbo 特色的服务治理。即作为控制面,将规则下发到数据面进行应用。

image-20231225215623017

对于 dds 而言,当一个客户端连接到 dubbo-cp dds,dubbo 规则的推送主要来自两个方面:

  1. Informer 机制监听到 CRD 资源发生变化,主动推送消息给客户端。
  2. 客户端发生 Request 请求,dds 服务器进行响应

dds 接口定义如下,因为推送的机制,所以定义为双向 streaming:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
message ObserveRequest {
string nonce = 1;
string type = 2;
}

message ObserveResponse {
string nonce = 1;
string type = 2;
int64 revision = 3;
repeated google.protobuf.Any data = 4;
}

service RuleService {
rpc Observe(stream ObserveRequest)
returns (stream ObserveResponse) {
}
}
阅读全文 »

本项目地址:LMQ

Lookup

LMQ Lookup 的功能就是存储 LMQ 分布式拓扑结构,为客户端提供查询服务。

拓扑结构

Lookup 维护了 lmqd,topic 和 channel 之间的拓扑结构,三者的对应关系如下。其中,lmqd 和 topic 为多对多的关系,topic 和 channel 为一对多的关系。

1
lmqd(producer) <--- M:N ---> topic <--- 1:N ---> channel

存储结构

根据 lookup 维护的拓扑结构,look 主要维护了这样一个注册字典:

  • 字典的键值分为两种类型,channel 和 topic。
  • 字典的值是对应的所有的生产者(Lmqd)。
1
2
3
4
type RegistrationDB struct {
sync.RWMutex
registrationMap map[iface.IRegistration]iface.ProducerMap
}

所以,存储结构如下:

1
2
3
4
5
6
7
{topic, topicName1, ""} ---> [{id1: producer1}, {id2: producer2}, ...]
{topic, topicName2, ""} ---> [{id3: producer3}, {id4: producer4}, ...]
...

{channel, topicName1, channelName1} ---> [{id1: producer1}, {id2: producer2}, ...]
{channel, topicName2, channelName2} ---> [{id3: producer3}, {id4: producer4}, ...]
...
阅读全文 »

本项目地址:LMQ

消息格式

LMQ 中的消息格式包括一个全局唯一的 ID(用分布式雪花 ID 生成的),数据,生产消息的时间戳,重试次数。

如时间戳、重试次数、消息 ID 等消息元数据,随着消息传输给客户端,这样做简化了客户端,客户端不需要维护消息的状态。

消息会被序列化为字节流(依次以大端存储的方式保存消息 ID、时间戳、重试次数、数据),保存在了响应的 message 字段中。

客户端与服务器之间的请求和响应统一采用 Json 进行编码。

1
2
3
4
5
6
7
8
9
10
11
type Message struct {
ID iface.MessageID `json:"ID"`
Data []byte `json:"Data"`
Timestamp int64 `json:"Timestamp"`
Attempts uint16 `json:"Attempts"`

// 优先队列中使用到的数据结构
clientID uint64
pri int64
index int
}

通信协议

LMQ 中客户端与 lmqd 的通信协议如下:

  • 首先客户端进行连接。
  • 发送 SUB 命令订阅某个 Channel。
  • 发送 RDY 命令更新可以接收消息的数量。
  • LMQ 服务器会持续推送规定数量之内的消息。
  • 最后客户端返回 FIN 和 REQ 表示消息的接收状态。

nsq protocol

客户端流量控制

通过推送数据到客户端最大限度地提高性能和吞吐量,而不是等待客户端拉数据。LMQ 的客户端通过 RDY 状态表示自己准备好接受消息的数量,这是一种客户端流量控制的方法。

当客户端连接到 lmqd 和并订阅到一个 channel 时,客户端的 RDY 初始化为 0。当客户端的 RDY 状态被更新为大于零时,channel 才开始给这个客户端发送消息。

消息确认

客户端在收到消息后,需要发送 FIN 消息,表示消息已经成功接受。客户端也可以返回 REQ 消息,表示消息接受失败,此条消息重新进行排队。

当 lmqd 在超时时间内没有接受到 FIN 或者 REQ 消息,会自动判定客户端超时,这条消息会重新排队发送。

通过消息确认机制,可以保证消息确认被客户端收到,(在 lmqd 不崩溃的情况下)消息不会丢失。

本项目地址:LMQ

磁盘队列

每一个 topic 都会维护一个磁盘消息队列,每一个 channel 也会维护一个磁盘消息队列。当内存消息队列满了之后,新的消息就会存储在磁盘消息队列中。

在 lmqd 退出时消息的持久化也是依靠磁盘消息队列实现(将内存消息队列中的消息全部推入磁盘消息队列中,进行持久化存储)。

元数据

一个磁盘消息队列会维护多个文件,以 index 标识顺序。一个磁盘队列的核心元数据就是记录四个变量(元数据也会被持久化,以便标识下一次启动时的读取和写入位置):

  • 当前读取的文件号,当前读取的文件位置。
  • 当前写入的文件号,当前写入的文件位置。
阅读全文 »

本项目地址:LMQ

lmqd 中的元数据是指 lmqd 中的 Topic 和 Channel 信息

Topic

Topic 负责接受生产者的消息,并且将消息复制分发给每一个 channel 中。

数据结构

Topic 的结构如下,主要的属性有:

  • name:topic 的名称。
  • isTemporary:是否是临时的 Topic,如果名称带有 #tmp 则表明这是一个临时的 Topic。
    • 对于临时 Topic,backendQueue(磁盘队列)为 nil,所以超出内存队列长度的数据会被直接丢弃。
    • 若 channel 为空,则会立即删除这个 topic。
    • 一个临时的 topic 不会持久化到磁盘中。
  • channels:topic 下有多个 channel,保存所有在此 topic 下的 channel。
  • memoryMsgChan:内存消息队列。
  • backendMsgChan:磁盘消息队列。
  • deleteCallback:这个 topic 删除后的回调函数,主要用于删除 lmqd 中维护的 topic 信息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
type Topic struct {
lmqd iface.ILmqDaemon

name string
isTemporary bool // 标记是否是临时的topic
isPausing atomic.Bool // 标记是否已经暂停
isExiting atomic.Bool // 标记是否已经退出
channels map[string]iface.IChannel // 保存所有的channel字典
channelsLock sync.RWMutex // 控制对channel字典的互斥访问

guidFactory *GUIDFactory // message id 生成器

memoryMsgChan chan iface.IMessage // 内存chan
backendQueue backendqueue.BackendQueue // 当内存chan满了之后,将消息存入到后端队列中(持久化保存)

deleteCallback func(topic iface.ITopic)
deleter sync.Once

startChan chan struct{}
updateChan chan struct{}
pauseChan chan struct{}
closingChan chan struct{}
closedChan chan struct{}

messageCount atomic.Uint64
messageBytes atomic.Uint64
}

生产消息

通过 select,在内存队列中放入消息。如果内存队列已满,则会进入 default 分支,将消息送入磁盘消息队列中。

阅读全文 »

本项目地址:LMQ

LMQ 与 NSQ 的整体设计类似。

介绍

特性

LMQ 是分布式的消息队列,特性为:

  • 支持无单点故障的分布式拓扑。
  • 无缝的支持水平扩展,无缝的添加更多节点到集群。
  • 主要是在内存中传递消息,直接使用 TCP 协议而不是 HTTP 协议,所以性能可以保障。
  • 客户端发现的方式,通过查询 LmqLookup 找到生产者。

消息保证

消息不持久化机制

LMQ 支持内存队列磁盘队列,超过内存队列长度的消息会被存放在磁盘队列中。在内存队列中的消息可能因为宕机而丢失,存储在磁盘中的消息不会丢失。

可以通过设置内存队列长度为 0,所有的消息将会存储到磁盘,这样消息不会丢失

在正常推出的情况下,内存中的消息会被推送至磁盘队列中,在磁盘中保存持久化,元数据信息以 json 的格式进行持久化。

消息最少被投递一次

由于客户端超时导致的消息重新排队等原因,消息会被重复投递,消息通过 ID 区分,由客户端负责操作。

消息是无序的

LMQ 不保证消息的投递顺序和生产顺序是一致的。因为 LMQ 由内存队列、磁盘队列、重排队列混合组成,所以消息是无序的。

阅读全文 »

本项目地址:LMQ

LMQ 是一个仿照 NSQ 的基于 TCP 协议的分布式消息队列,本系列说明 LMQ 的实现方式。

在本节说明 TCP 服务器的实现,TCP 服务器的实现地址:Hamble TCP Server

TCP 服务器架构

TCP 服务器的架构如下,总体上采用 1+M+N 的服务器编程模型

  • 一个协程(Server goroutine)用于建立连接(Accept)。
  • M 个协程用于读写取连接请求信息。
  • N 个协程用于处理请求内容,并生成响应信息。

image-20231218132654361

阅读全文 »

本项目完整地址 simple-redis

总结

功能实现

Simple-Redis 是一个 golang 编写的实现了 RESP(REdis Serialization Protocol)协议的简易 Redis,实现的功能包括:

  • 五大基本数据类型(string、set、hash、list、zset)及其操作。
  • 通信协议为 RESP 协议,与 Redis-cli 兼容。
  • 增加并发性,可以并发执行命令。
  • AOF 持久化功能,包括 AOF 持久化和 AOF 重写。
  • 分片集群功能,以一致性哈希实现。
  • 发布订阅功能,更支持集群下的发布订阅模式。
  • 支持原子事务,在执行错误时进行回滚。
  • 支持分布式事务,支持在多集群下的 TCC 分布式事务。

优势

相比于 Redis,Simple-Redis 的优势在于:

  • 与 Redis 在执行命令时单线程不同,Simple-Redis 为每一个客户端并发的执行命令
  • pipeline 模式客户端,无需等待响应即可发送下一条命令。
  • TTL 过期使用时间轮算法实现,时间轮算法保证了不会扫描全部的 key(节省 CPU),又能够及时的删除过期数据(节省 内存)
  • 集群采用一致性哈希实现,使 key 更加均匀的分布。
  • 集群间的通信采用连接池的方式,优势在于:
    • 复用连接(复用空闲连接)。
    • 增加并发性能(节点之间有多个连接)。
    • 控制节点之间的连接数,不会因为节点之间的通信而影响客户端与节点之间的通信(有最大空闲连接数和最大活跃连接数的限制)。
  • 支持集群下的发布订阅模式,为了支持这一功能,定义了 publish 和 publishsingle 命令。
    • publish 命令:客户端向集群中的一个节点发送 publish 命令,节点会向所有的 peers 转发这条发布消息
    • publishsingle 命令:节点向所有 peers 转发发布的消息时,使用的是 publishsingle 命令,表明节点在收到消息后不用继续转发了(避免了节点转发再转发,造成的无限消息洪泛)。
  • 支持原子事务,在执行错误时进行回滚。
  • 支持分布式事务,支持在多集群下的 TCC 分布式事务。以当前节点为事务协调者,控制分布在其他节点的本地事务。一次分布式事务的流程如下:
    • 对命令和 watch 的 key,按照节点的不同进行分组
    • 对每一组节点发送 try 命令,try 命令比较复杂又分为几个子命令:
      • try start:通知节点有新建一个本地分布式事务。
      • try watched:发送 wath 时的 version,比较 version 是否发生变化。
      • try xxx:依次发送在该节点上执行的命令。
      • try end:通知 try 阶段已经结束,本地事务检查版本是否变化、锁定相关的 key。
    • 对每一组节点发送 commit 或者 cancel 命令
    • 重组事务的返回结果,按照正常的顺序进行返回。

性能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
zhaohan08@MacBook-Pro redis % redis-benchmark -t set,get,incr,lpush,rpush,lpop,rpop,sadd,hset,spop,zadd,lrange -n 10000 -q
WARNING: Could not fetch server CONFIG
SET: 68965.52 requests per second, p50=0.367 msec
GET: 62111.80 requests per second, p50=0.399 msec
INCR: 49751.24 requests per second, p50=0.487 msec
LPUSH: 56497.18 requests per second, p50=0.447 msec
RPUSH: 49504.95 requests per second, p50=0.503 msec
LPOP: 68965.52 requests per second, p50=0.375 msec
RPOP: 68493.15 requests per second, p50=0.375 msec
SADD: 52631.58 requests per second, p50=0.495 msec
HSET: 68027.21 requests per second, p50=0.375 msec
SPOP: 68027.21 requests per second, p50=0.383 msec
ZADD: 49751.24 requests per second, p50=0.503 msec
LPUSH (needed to benchmark LRANGE): 56818.18 requests per second, p50=0.463 msec
LRANGE_100 (first 100 elements): 25773.20 requests per second, p50=0.895 msec
LRANGE_300 (first 300 elements): 13586.96 requests per second, p50=1.775 msec
LRANGE_500 (first 500 elements): 9041.59 requests per second, p50=2.607 msec
LRANGE_600 (first 600 elements): 7704.16 requests per second, p50=3.015 msec

在单机时,可以得出结论,Simple-Redis 和 Redis 是有性能差距的(达到 Redis 的 60-70%),具体数据如下:

  • string 类型:Simple-Redis 的性能大约是 Redis 的 70%。
  • list 类型:Simple-Redis 的性能大约是 Redis 的 50%。
  • hash 类型:Simple-Redis 大约是 Redis 的 75%。
  • set 类型:Simple-Redis 的性能大约是 Redis 的 55%。
  • zset 类型:Simple-Redis 的性能大约是 Redis 的 50%。

本项目完整地址 simple-redis

Simple-Redis 是支持分布式事务的,采用 TCC(Try-Commit-Cancel)实现分布式事务,以当前节点为事务协调者,控制分布在其他节点的本地事务。所以 Simple-Redis 的分布式事务实现包括两个部分:

  • 事务协调者:一个分布式事务的控制器。
  • 本地事务:用于执行命令,并进行 commit 或者 cancel。

事务协调者

定义

TCC 事务协调者定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
// Coordinator TCC事务协调者
type Coordinator struct {
id string // 事务id
dbIndex int // 数据库id
self string // 自己的地址
peers cluster.PeerPicker // 一致性哈希,用于选择节点
getters map[string]cluster.PeerGetter // 用于和远程节点通信

watching map[string]map[string]uint32 // key为peer地址,记录被watch的key的版本号

cmdLinesNum int
indexMap map[string][]int // key为peer,value为cmdline在原来multi队列中的下标,用于重组reply
}

流程

一次分布式事务的流程如下:

  • 对命令和 watch 的 key,按照节点的不同进行分组
  • 对每一组节点发送 try 命令,try 命令比较复杂又分为几个子命令:
    • try start:通知节点有新建一个本地分布式事务。
    • try watched:发送 wath 时的 version,比较 version 是否发生变化。
    • try xxx:依次发送在该节点上执行的命令。
    • try end:通知 try 阶段已经结束,本地事务检查版本是否变化、锁定相关的 key。
  • 对每一组节点发送 commit 或者 cancel 命令
  • 重组事务的返回结果,按照正常的顺序进行返回。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// ExecTx 执行TCC分布式事务
func (coordinator *Coordinator) ExecTx(cmdLines [][][]byte, watching map[string]uint32) redis.Reply {
// 首先对命令进行分组,以同一个节点上执行的所有命令为一组
groupByMap := coordinator.groupByCmdLines(cmdLines)

// 对watch的key进行分组
coordinator.groupByWatch(watching)

// 对每一组中发送try命令
needCancel := false // 记录是否需要回滚
for peer, cmd := range groupByMap {
r := coordinator.sendTry(peer, cmd)
if reply.IsErrorReply(r) {
needCancel = true
break
}
}

defer func() {
// 返回之前发送end消息,通知各个分布式节点结束事务
for peer := range groupByMap {
coordinator.sendEnd(peer)
}
}()

// 向各个节点提交/回滚分布式事务
replies := make(map[string]redis.Reply)
for peer := range groupByMap {
if needCancel {
// 取消事务
coordinator.sendCancel(peer)
} else {
// 提交事务
r := coordinator.sendCommit(peer)
if config.Properties.OpenAtomicTx && reply.IsErrorReply(r) {
// 如果开启了原子事务并且提交发生了错误,则进行所有的节点进行回滚
needCancel = true
break
}
replies[peer] = r
}
}

// 如果开启了原子事务,并且发生了错误,所有节点进行回滚
if needCancel && config.Properties.OpenAtomicTx {
for peer := range groupByMap {
// 取消事务,进行回滚
coordinator.sendCancel(peer)
}

return reply.MakeErrReply("EXECABORT Transaction rollback because of errors during executing. (atomic tx is open)")
}

if needCancel {
// 没有开启原子事务,但是发送了错误(如watch的key变化了),则返回(nil)
return reply.MakeNullBulkStringReply()
}

// 正常提交,重组reply
return coordinator.recombineReplies(replies)
}

try

try 命令比较复杂,分为几个子命令:

  • try start:通知节点有新建一个本地分布式事务。
  • try watched:发送 wath 时的 version,比较 version 是否发生变化。
  • try xxx:依次发送在该节点上执行的命令。
  • try end:通知 try 阶段已经结束,本地事务检查版本是否变化、锁定相关的 key。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
func (coordinator *Coordinator) sendTry(peer string, cmdLines [][][]byte) redis.Reply {
// 获取getter
getter := coordinator.getters[peer]

var r redis.Reply
// 发送try开始命令
r = getter.RemoteExec(coordinator.dbIndex, utils.StringsToCmdLine("try", coordinator.id, "start"))
if reply.IsErrorReply(r) {
// 如果发生错误,则中断try,直接返回
return r
}

// 发送watch时的version,比较version是否发生变化
watching := coordinator.watching[peer]
for key, version := range watching {
versionStr := strconv.FormatInt(int64(version), 10)
r = getter.RemoteExec(coordinator.dbIndex, utils.StringsToCmdLine("try", coordinator.id, "watched", key, versionStr))
if reply.IsErrorReply(r) {
// 如果发生变化,则中断try,直接返回
return r
}
}

// 依次发送需要执行的命令,每一条命令=try tx_id cmdline,如 try 123456 set k1 v1
for _, cmdLine := range cmdLines {
tryCmd := make([][]byte, 0, 2+len(cmdLine))
tryCmd = append(tryCmd, utils.StringsToCmdLine("try", coordinator.id)...)
tryCmd = append(tryCmd, cmdLine...)
r = getter.RemoteExec(coordinator.dbIndex, tryCmd)
if reply.IsErrorReply(r) {
// 如果发生错误,则中断try,直接返回
return r
}
}

// 发送try结束命令
r = getter.RemoteExec(coordinator.dbIndex, utils.StringsToCmdLine("try", coordinator.id, "end"))
if reply.IsErrorReply(r) {
// 如果发生错误,则中断try,直接返回
return r
}

return reply.MakeOkReply()
}

本地事务

本地事务就是分布式事务在节点上的实际执行者,下面是本地事务的定义:

  • watchedKeys 对应 try watched 命令,记录 watch 时的版本,用于在执行时进行版本比较。
  • cmdLines 对应 try xxx 命令,记录当前节点的本地事务中,包含的所有需要执行的命令。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Transaction TCC本地事务
type Transaction struct {
id string // 事务id
phase int // 阶段,分别是try、commit、cancel三个阶段

writeKeys []string
readKeys []string
watchedKeys map[string]uint32 // 记录watch时的版本

cmdLines [][][]byte // 本次事务中包含的命令
undoLogs [][][][]byte // 本次事务的undo log
addVersionKeys []string // 需要增加版本的key

db *engine.DB

mu sync.Mutex
}

Try

在收到 try end 命令后,表明 try 阶段已经结束,此时本地事务:

  • 锁定需要读写的 key。
  • 在在时间轮中添加任务, 自动回滚超时未提交的事务。
  • 读取版本是否变化,如果有变化则返回错误。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// Try TCC事务的结束try阶段,锁定key
func (tx *Transaction) Try() redis.Reply {
if tx.phase != phaseTry {
return reply.MakeErrReply("ERR TRY ERROR")
}

// 锁定需要读写的key
tx.db.RWLocks(tx.writeKeys, tx.readKeys)

// 在时间轮中添加任务, 自动回滚超时未提交的事务
taskKey := tx.id
timewheel.Delay(maxLockTime, taskKey, func() {
if tx.phase == phaseTry {
logger.Info("abort transaction: " + tx.id)
_ = tx.Cancel()
}
})

// 读取版本是否变化
for key, oldVersion := range tx.watchedKeys {
newVersion := tx.db.GetVersion(key)
if oldVersion != newVersion {
// 获取key的版本,如果版本变化了,则返回错误
return reply.MakeErrReply("ERR VERSION CHANGED")
}
}

return reply.MakeOkReply()
}

Commit

在本地事务收到 commit 命令后,会执行命令,(如果开启原子事务)并记录 undolog。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// Commit TCC事务的commit阶段
func (tx *Transaction) Commit() redis.Reply {
tx.mu.Lock() // Commit互斥进行操作,防止commit时调用cancel,发生冲突
defer tx.mu.Unlock()
defer func() {
// 设置阶段
tx.phase = phaseCommit
}()

// 依次执行命令
var results [][]byte // 存储命令执行的结果
for _, cmdLine := range tx.cmdLines {
cmdName := string(cmdLine[0])
key := string(cmdLine[1])
if !engine.IsReadOnlyCommand(cmdName) {
// 写命令,则记录需要add版本的key
tx.addVersionKeys = append(tx.addVersionKeys, key)
if config.Properties.OpenAtomicTx {
// 开启了原子性事务,则记录undo log
tx.undoLogs = append(tx.undoLogs, tx.db.GetUndoLog(key))
}
}

// 执行命令
r := tx.db.ExecWithLock(cmdLine)

if config.Properties.OpenAtomicTx && reply.IsErrorReply(r) {
// 开启了原子性事务,并且如果发生错误,直接返回错误
if !engine.IsReadOnlyCommand(cmdName) {
// 删除错误命令的undo log,不需要增加版本号
tx.addVersionKeys = tx.addVersionKeys[:len(tx.addVersionKeys)-1]
tx.undoLogs = tx.undoLogs[:len(tx.undoLogs)-1]
}

return r
}

results = append(results, []byte(r.DataString()))
}

return reply.MakeMultiBulkStringReply(results)
}

Cancel

在收到 cancel 命令后,(如果开启原子性事务)会执行 undo log 进行回滚。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Cancel TCC事务的cancel阶段
func (tx *Transaction) Cancel() redis.Reply {
tx.mu.Lock() // Cancel互斥进行操作,防止commit时调用cancel,发生冲突
defer tx.mu.Unlock()
defer func() {
// 设置阶段
tx.phase = phaseCancel
}()

// 从后向前依次执行undo日志,只有开启原子性事务时,len(tx.undoLogs)才可能大于0
for i := len(tx.undoLogs) - 1; i >= 0; i-- {
undoLog := tx.undoLogs[i]
if len(undoLog) == 0 {
continue
}

for _, cmdLine := range undoLog {
tx.db.ExecWithLock(cmdLine)
}

}

// 清空
tx.undoLogs = nil

return reply.MakeOkReply()
}