本项目完整地址 simple-redis
集群
Simple-Redis 的集群本质上就是一个分片集群,使用一致性哈希环实现。
结构体
Cluster 结构体如下,其中:
- peers 为一致性哈希,用于根据 key 选择节点。
- getters 用于和远程节点通信。
以下三个属性与分布式事务相关:
- idGenerator:雪花 ID(分布式 ID)生成器,用于生成分布式事务 ID。
- transactionMap:记录所有的分布式事务。
- coordinatorMap:记录所有的事物协调者。
1 2 3 4 5 6 7 8 9 10
| type Cluster struct { self string peers cluster.PeerPicker getters map[string]cluster.PeerGetter
idGenerator *snowflake.Node transactionMap *dict.SimpleDict coordinatorMap *dict.SimpleDict }
|
getter
在和远程节点通信时,本节点就作为一个 Client。而一个 getter 维护一个远程节点的连接池(实际上一个 getter 维护多个连接池,一个数据库对应一个连接池。
集群中节点之间的通信采用连接池,是为了:
- 复用连接(复用空闲连接)。
- 增加并发性能(节点之间有多个连接)。
- 控制节点之间的连接数,不会因为节点之间的通信而影响客户端与节点之间的通信(有最大空闲连接数和最大活跃连接数的限制)。
构造函数
getter 的构造函数如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| func newGetter(addr string) *getter { finalizer := func(x interface{}) { if c, ok := x.(*client.Client); ok { c.Close() } }
checkAlive := func(x interface{}) bool { if c, ok := x.(*client.Client); ok { return c.StatusClosed() }
return false }
poolMap := make(map[int]*pool.Pool, config.Properties.Databases) for i := 0; i < config.Properties.Databases; i++ { var dbIndex = i factory := func() (interface{}, error) { c, err := client.MakeClient(addr, config.Properties.Keepalive) if err != nil { return nil, err }
c.Start() if config.Properties.Password != "" { r := c.Send(utils.StringsToCmdLine("AUTH", config.Properties.Password)) if reply.IsErrorReply(r) { c.Close() return nil, reply.MakeErrReply("ERR cluster password is required, please set same password in cluster") } } r := c.Send(utils.StringsToCmdLine("SELECT", strconv.Itoa(dbIndex))) if reply.IsErrorReply(r) { c.Close() return nil, reply.MakeErrReply("ERR exec command failed") } return c, nil }
poolMap[dbIndex] = pool.New(factory, finalizer, checkAlive, pool.Config{ MaxIdleNum: 8, MaxActiveNum: 16, MaxRetryNum: 1, }) }
return &getter{ addr: addr, poolMap: poolMap, } }
|
连接池
连接池维护客户端与远程节点之间的活跃连接和空闲连接,并且有最大空闲连接数和最大活跃连接数的要求。
在获取一个连接时:
- 如果已有空闲连接,在检查是否存活后(CheckAliveFunc)直接返回一个空闲连接。
- 如果没有空闲连接或者获取到的连接未存活,并且没有超过最大活跃连接数,则创建一个新的连接(FactoryFunc)。
在放回一个连接时:
- 如果空闲连接数已满,则结束连接(FinalizerFunc)。
- 如果空闲连接数未满,则放入空闲连接队列中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| type ( FactoryFunc func() (interface{}, error) FinalizerFunc func(x interface{}) CheckAliveFunc func(x interface{}) bool )
var ( ErrClosed = errors.New("pool closed") ErrMaxActive = errors.New("active connections reached max num") )
type Config struct { MaxIdleNum int MaxActiveNum int MaxRetryNum int }
type Pool struct { Config
factory FactoryFunc finalizer FinalizerFunc checkAlive CheckAliveFunc idles chan interface{} activeConnNum int closed bool
mu sync.Mutex }
|
订阅发布
Simple-Redis 的订阅发布实现思想与 Redis 相同,一个发布订阅的管道维护了客户端连接的集合、一个消息管道。
- 当有消息发布时,取所有的客户端连接,发布消息。
- 当有客户端订阅时,将客户端连接加入到订阅者集合中。
- 当客户端断开连接时,从订阅者集合中删除客户端连接。
在集群模式下,Redis 和 Simple-Redis 是不同的:
- Redis 仅仅支持在同一节点上的发布订阅。
- Simple-Redis 支持集群范围内的发布订阅,为了支持这一功能,定义了 publish 和 publishsingle 命令。
- publish 命令:客户端向集群中的一个节点发送 publish 命令,节点会向所有的 peers 转发这条发布消息。
- publishsingle 命令:节点向所有 peers 转发发布的消息时,使用的是 publishsingle 命令,表明节点在收到消息后不用继续转发了(避免了节点转发再转发,造成的无限消息洪泛)。
定义
1 2 3 4
| type Publish struct { channels map[string]*channel mu sync.Mutex }
|
发布订阅管道定义如下,其中维护了订阅者(Redis 中用链表实现,而 Simple-Redis 使用 map 实现)、一个消息 channel 用于向所有客户端发布消息。
1 2 3 4 5 6 7 8 9 10
| type channel struct { name string messageCh chan []byte subscriberNum int subscribers map[redis.Connection]struct{}
closed chan struct{} wait wait.Wait mu sync.Mutex }
|