本项目完整地址 simple-redis
Hash 表 KV 内存数据库的核心是并发安全的哈希表,simple-redis 采用分段锁 策略。将 key 分散到固定数量的 shard 中,在一个 shard 内的读写操作不会阻塞其他 shard 内的操作 。
数据结构 定义 Dict 接口,该接口表示一个哈希表的操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 type Consumer func (key string , val interface {}) bool type Dict interface { Get(key string ) (val interface {}, exists bool ) Len() int Put(key string , val interface {}) (result int ) PutIfAbsent(key string , val interface {}) (result int ) PutIfExists(key string , val interface {}) (result int ) Remove(key string ) (result int ) ForEach(consumer Consumer) Keys() []string RandomKeys(limit int ) []string RandomDistinctKeys(limit int ) []string Clear() }
定义 ConcurrentDict 实现了 Dict 接口,ConcurrentDict 为并发安全的哈希表。将 key 映射到不同的 shard 中,不同 shard 上的操作是不会相互影响的 ,提高了并发性 。
注意,这里 shard 的数量恒为 2 的整数幂。
1 2 3 4 5 6 7 8 9 10 type ConcurrentDict struct { table []*shard count int64 shardCount int } type shard struct { m map [string ]interface {} mutex sync.RWMutex }
定位 shard 将 key 映射为 shard 的哈希函数采用 FNV 算法。
FNV 能快速 hash 大量数据并保持较小的冲突率 ,它的高度分散使它适用于 hash 一些非常相近的字符串 。
1 2 3 4 5 6 7 8 9 10 const prime32 = uint32 (16777619 )func fnv32 (key string ) uint32 { hash := uint32 (2166136261 ) for i := 0 ; i < len (key); i++ { hash *= prime32 hash ^= uint32 (key[i]) } return hash }
依据 key 的哈希值定位 shard(确定 shard 的 index),当 n 为 2 的整数幂时 h % n == (n - 1) & h
。
spread 方法根据哈希值,返回 shard 的下标。
getShard 方法根据下标值,返回对应的 shard。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 func (c *ConcurrentDict) notInit () bool { return c == nil || c.table == nil } func (c *ConcurrentDict) spread (hashcode uint32 ) uint32 { if c.notInit() { panic ("dict is nil" ) } tableSize := uint32 (len (c.table)) return hashcode % tableSize } func (c *ConcurrentDict) getShard (index uint32 ) *shard { if c.notInit() { panic ("dict is nil" ) } return c.table[index] }
Get 和 Put 方法 以 Get 和 Put 方法为例,说明 ConcurrentDict 的实现。
Get Get 方法用于查询数据,给定一个 key,返回对应的值。其流程为:
计算 key 的哈希值,根据哈希值找到对应的 shard 。
对这个 shard 上读锁 ,在 map 中查询数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func (c *ConcurrentDict) Get (key string ) (val interface {}, exists bool ) { if c.notInit() { panic ("dict is nil" ) } hashcode := fnv32(key) index := c.spread(hashcode) s := c.getShard(index) s.mutex.RLock() defer s.mutex.RUnlock() val, exists = s.m[key] return }
Put Put 方法用于插入数据,给定 key 和 value 将键值对插入到某一个 shard 的 map 中。其流程为:
计算 key 的哈希值,根据哈希值找到对应的 shard 。
对这个 shard 上写锁 ,在 map 中写入键值对。
新增数据返回 1,更新数据返回 0。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 if c.notInit() { panic ("dict is nil" ) } hashcode := fnv32(key) index := c.spread(hashcode) s := c.getShard(index) s.mutex.Lock() defer s.mutex.Unlock()if _, ok := s.m[key]; ok { s.m[key] = val return 0 } s.m[key] = val c.addCount() return 1
LockMap ConcurrentDict 可以保证单个 key 操作的并发安全性 ,但是仍无法满足需求。
比如:
在 MGET 或者 MSET 命令中,一条命令涉及到了多个 key 的枷锁问题,因此需要锁定所有给定的键 直到完成所有键的查询和设置。
因此,需要实现 LockMap,用于锁定一个或者一组 key。
数据结构 多个键可以共用一个哈希槽(与 ConcurrentDict shard 并不是一一对应的,应该小于等于 ConcurrentDict 的 shard),不为单个 key 加锁而是为它所在的哈希槽加锁 。
1 2 3 4 5 6 7 8 9 10 11 12 13 type Locks struct { tables []*sync.RWMutex } func Make (tableSize int ) *Locks { tables := make ([]*sync.RWMutex, tableSize) for i := range tables { tables[i] = &sync.RWMutex{} } return &Locks{ tables: tables, } }
在定位哈希槽时,同样对 key 做 FNV 哈希函数 之后选择哈希槽。
为一个 key 加锁 为一个 key 加锁时非常简单,就是对 key 做哈希之后选择相应的 shard,之后在对应的 shard 上加锁即可。
锁定多个 key 在锁定多个 key 时,所有协程都按照相同顺序加锁 ,就可以避免循环等待防止死锁。
确定加锁顺序 一次性锁定多个 key 之前,首先确定加锁顺序,这样可以避免死锁。
对要加锁的每一个 key 都做哈希函数 ,确定其对应的 shard 的下标(这里用 map 实现 set,避免重复的下标)。
对下标进行排序 ,即可得到一个确定的加锁顺序。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 func (locks *Locks) toLockIndices (keys []string , reverse bool ) []uint32 { lockSet := make (map [uint32 ]struct {}) for _, key := range keys { hashcode := fnv32(key) index := locks.spread(hashcode) lockSet[index] = struct {}{} } indices := make ([]uint32 , 0 , len (lockSet)) for index := range lockSet { indices = append (indices, index) } sort.Slice(indices, func (i, j int ) bool { if reverse { return indices[i] < indices[j] } return indices[i] > indices[j] }) return indices }
为多个 key 加读写锁 确定了加锁顺序之后,就可以为多个 key 加锁了。这里以加读写锁为例进行讲解:
1 func (locks *Locks) RWLocks (writeKeys []string , readKeys []string )
流程如下:
首先不管是需要读的 key,还是需要写的 key,全部计算出加锁顺序 。
1 2 3 keys := append (readKeys, writeKeys...) indices := locks.toLockIndices(keys, false )
1 2 3 4 5 writeIndices := make (map [uint32 ]struct {}, len (writeKeys)) for _, writeKey := range writeKeys { writeIndices[locks.spread(fnv32(writeKey))] = struct {}{} }
按照顺序加锁 ,检查这个 shard 是否需要加上写锁,如果需要就加写锁否则加上读锁。
1 2 3 4 5 6 7 8 9 10 for _, index := range indices { if _, needWrite := writeIndices[index]; needWrite { locks.tables[index].Lock() continue } locks.tables[index].RLock() }
为多个 key 解读写锁 这里以解读写锁为例进行讲解:
1 func (locks *Locks) RWUnLocks (writeKeys []string , readKeys []string )
流程如下:
首先不管是需要读的 key,还是需要写的 key,全部计算出解锁顺序(解锁顺序与加锁顺序相反) 。
接着得到需要解写锁的 shard 下标 。
按照顺序解锁 ,检查这个 shard 是否需要解开写锁,如果需要就解写锁否则解读锁。