Dawn's Blogs

分享技术 记录成长

0%

Simple-Redis实现 (10) 集群和订阅

本项目完整地址 simple-redis

集群

Simple-Redis 的集群本质上就是一个分片集群,使用一致性哈希环实现。

结构体

Cluster 结构体如下,其中:

  • peers 为一致性哈希,用于根据 key 选择节点。
  • getters 用于和远程节点通信。

以下三个属性与分布式事务相关:

  • idGenerator:雪花 ID(分布式 ID)生成器,用于生成分布式事务 ID。
  • transactionMap:记录所有的分布式事务。
  • coordinatorMap:记录所有的事物协调者。
1
2
3
4
5
6
7
8
9
10
// Cluster 用于和集群中的主机进行交互
type Cluster struct {
self string // 本机地址,如 127.0.0.1:6107
peers cluster.PeerPicker // 一致性哈希,用于选择节点
getters map[string]cluster.PeerGetter // 用于和远程节点通信

idGenerator *snowflake.Node // snowflake id生成器,用于生成分布式事务的id
transactionMap *dict.SimpleDict // 记录所有的分布式事务(本地)
coordinatorMap *dict.SimpleDict // 记录事务协调者
}

getter

在和远程节点通信时,本节点就作为一个 Client。而一个 getter 维护一个远程节点的连接池(实际上一个 getter 维护多个连接池,一个数据库对应一个连接池。

集群中节点之间的通信采用连接池,是为了:

  • 复用连接(复用空闲连接)。
  • 增加并发性能(节点之间有多个连接)。
  • 控制节点之间的连接数,不会因为节点之间的通信而影响客户端与节点之间的通信(有最大空闲连接数和最大活跃连接数的限制)。

构造函数

getter 的构造函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
func newGetter(addr string) *getter {
// Client连接结束函数
finalizer := func(x interface{}) {
if c, ok := x.(*client.Client); ok {
c.Close()
}
}

checkAlive := func(x interface{}) bool {
if c, ok := x.(*client.Client); ok {
return c.StatusClosed()
}

return false
}

// 为每一个连接创建连接池
poolMap := make(map[int]*pool.Pool, config.Properties.Databases)
for i := 0; i < config.Properties.Databases; i++ {
var dbIndex = i
// Client连接构造函数
factory := func() (interface{}, error) {
c, err := client.MakeClient(addr, config.Properties.Keepalive)
if err != nil {
return nil, err
}

c.Start()
// 如果需要密码,先进行验证
if config.Properties.Password != "" {
r := c.Send(utils.StringsToCmdLine("AUTH", config.Properties.Password))
if reply.IsErrorReply(r) {
c.Close()
return nil, reply.MakeErrReply("ERR cluster password is required, please set same password in cluster")
}
}
// 切换数据库
r := c.Send(utils.StringsToCmdLine("SELECT", strconv.Itoa(dbIndex)))
if reply.IsErrorReply(r) {
c.Close()
return nil, reply.MakeErrReply("ERR exec command failed")
}
return c, nil
}

poolMap[dbIndex] = pool.New(factory, finalizer, checkAlive, pool.Config{
MaxIdleNum: 8,
MaxActiveNum: 16,
MaxRetryNum: 1,
})
}

return &getter{
addr: addr,
poolMap: poolMap,
}
}

连接池

连接池维护客户端与远程节点之间的活跃连接和空闲连接,并且有最大空闲连接数和最大活跃连接数的要求。

获取一个连接时:

  • 如果已有空闲连接,在检查是否存活后(CheckAliveFunc)直接返回一个空闲连接。
  • 如果没有空闲连接或者获取到的连接未存活,并且没有超过最大活跃连接数,则创建一个新的连接(FactoryFunc)。

放回一个连接时:

  • 如果空闲连接数已满,则结束连接(FinalizerFunc)。
  • 如果空闲连接数未满,则放入空闲连接队列中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
type (
FactoryFunc func() (interface{}, error)
FinalizerFunc func(x interface{})
CheckAliveFunc func(x interface{}) bool
)

var (
ErrClosed = errors.New("pool closed")
ErrMaxActive = errors.New("active connections reached max num")
)

type Config struct {
MaxIdleNum int // 最大空闲连接数
MaxActiveNum int // 最大活跃连接数
MaxRetryNum int
}

// Pool 连接池
type Pool struct {
Config

factory FactoryFunc // 创建连接
finalizer FinalizerFunc // 关闭连接
checkAlive CheckAliveFunc // 检查连接是否存活
idles chan interface{} // 空闲的连接
activeConnNum int // 活跃连接数
closed bool

mu sync.Mutex
}

订阅发布

Simple-Redis 的订阅发布实现思想与 Redis 相同,一个发布订阅的管道维护了客户端连接的集合、一个消息管道

  • 当有消息发布时,取所有的客户端连接,发布消息。
  • 当有客户端订阅时,将客户端连接加入到订阅者集合中。
  • 当客户端断开连接时,从订阅者集合中删除客户端连接。

在集群模式下,Redis 和 Simple-Redis 是不同的:

  • Redis 仅仅支持在同一节点上的发布订阅。
  • Simple-Redis 支持集群范围内的发布订阅,为了支持这一功能,定义了 publish 和 publishsingle 命令。
    • publish 命令:客户端向集群中的一个节点发送 publish 命令,节点会向所有的 peers 转发这条发布消息
    • publishsingle 命令:节点向所有 peers 转发发布的消息时,使用的是 publishsingle 命令,表明节点在收到消息后不用继续转发了(避免了节点转发再转发,造成的无限消息洪泛)。

定义

1
2
3
4
type Publish struct {
channels map[string]*channel // 维护各个管道,key为管道名字
mu sync.Mutex
}

发布订阅管道定义如下,其中维护了订阅者(Redis 中用链表实现,而 Simple-Redis 使用 map 实现)、一个消息 channel 用于向所有客户端发布消息。

1
2
3
4
5
6
7
8
9
10
type channel struct {
name string // 管道名
messageCh chan []byte // 当前管道中待发送的消息
subscriberNum int // 订阅者的数量
subscribers map[redis.Connection]struct{} // 这个管道所有的订阅者,在redis中用链表实现,而在 simple-redis 中使用 map(set)

closed chan struct{}
wait wait.Wait
mu sync.Mutex
}