Dawn's Blogs

分享技术 记录成长

0%

godis源码阅读 (3) 存储数据结构之Hash表和LockMap

Hash 表

KV 内存数据库的核心是并发安全的哈希表,godis 采用分段锁策略。将 key 分散到固定数量的 shard 中,在一个 shard 内的读写操作不会阻塞其他 shard 内的操作

数据结构

定义 Dict 接口,该接口表示一个哈希表的操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Consumer is used to traversal dict, if it returns false the traversal will be break
type Consumer func(key string, val interface{}) bool

// Dict is interface of a key-value data structure
type Dict interface {
Get(key string) (val interface{}, exists bool)
Len() int
Put(key string, val interface{}) (result int)
PutIfAbsent(key string, val interface{}) (result int)
PutIfExists(key string, val interface{}) (result int)
Remove(key string) (result int)
ForEach(consumer Consumer)
Keys() []string
RandomKeys(limit int) []string
RandomDistinctKeys(limit int) []string
Clear()
}

定义 ConcurrentDict 实现了 Dict 接口,ConcurrentDict 为并发安全的哈希表。将 key 映射到不同的 shard 中,不同 shard 上的操作是不会相互影响的,提高了并发性

注意,这里 shard 的数量恒为 2 的整数幂。

1
2
3
4
5
6
7
8
9
10
11
// ConcurrentDict is thread safe map using sharding lock
type ConcurrentDict struct {
table []*shard
count int32
shardCount int
}

type shard struct {
m map[string]interface{}
mutex sync.RWMutex
}

定位 shard

将 key 映射为 shard 的哈希函数采用 FNV 算法。

FNV 能快速 hash 大量数据并保持较小的冲突率,它的高度分散使它适用于 hash 一些非常相近的字符串

1
2
3
4
5
6
7
8
9
10
const prime32 = uint32(16777619)

func fnv32(key string) uint32 {
hash := uint32(2166136261)
for i := 0; i < len(key); i++ {
hash *= prime32
hash ^= uint32(key[i])
}
return hash
}

依据 key 的哈希值定位 shard(确定 shard 的 index),当 n 为 2 的整数幂时 h % n == (n - 1) & h

  • spread 方法根据哈希值,返回 shard 的下标。
  • getShard 方法根据下标值,返回对应的 shard。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (dict *ConcurrentDict) spread(hashCode uint32) uint32 {
if dict == nil {
panic("dict is nil")
}
tableSize := uint32(len(dict.table))
return (tableSize - 1) & hashCode
}

func (dict *ConcurrentDict) getShard(index uint32) *shard {
if dict == nil {
panic("dict is nil")
}
return dict.table[index]
}

Get 和 Put 方法

以 Get 和 Put 方法为例,阅读 ConcurrentDict 的实现。

Get

Get 方法用于查询数据,给定一个 key,返回对应的值。其流程为:

  • 计算 key 的哈希值,根据哈希值找到对应的 shard
  • 对这个 shard 上读锁,在 map 中查询数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
// Get returns the binding value and whether the key is exist
func (dict *ConcurrentDict) Get(key string) (val interface{}, exists bool) {
if dict == nil {
panic("dict is nil")
}
hashCode := fnv32(key)
index := dict.spread(hashCode)
s := dict.getShard(index)
s.mutex.RLock()
defer s.mutex.RUnlock()
val, exists = s.m[key]
return
}

Put

Put 方法用于插入数据,给定 key 和 value 将键值对插入到某一个 shard 的 map 中。其流程为:

  • 计算 key 的哈希值,根据哈希值找到对应的 shard
  • 对这个 shard 上写锁,在 map 中写入键值对。
  • 新增数据返回 1,更新数据返回 0。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// Put puts key value into dict and returns the number of new inserted key-value
func (dict *ConcurrentDict) Put(key string, val interface{}) (result int) {
if dict == nil {
panic("dict is nil")
}
hashCode := fnv32(key)
index := dict.spread(hashCode)
s := dict.getShard(index)
s.mutex.Lock()
defer s.mutex.Unlock()

if _, ok := s.m[key]; ok {
s.m[key] = val
return 0
}
dict.addCount()
s.m[key] = val
return 1
}

LockMap

ConcurrentDict 可以保证单个 key 操作的并发安全性,但是仍无法满足需求。

比如:

  • INCR 命令需要完成:读取、做加法、写入,这三步操作。
  • MSETNX 命令当且仅当所有给定键都不存在时所有给定键设置值,因此需要锁定所有给定的键直到完成所有键的检查和设置。

因此,需要实现 LockMap,用于锁定一个或者一组 key。

数据结构

多个键可以共用一个哈希槽(这里的哈希槽 shard 仅仅指逻辑上的概念,不存储数据仅仅是锁,与 ConcurrentDict shard 并不是一一对应的),不为单个 key 加锁而是为它所在的哈希槽加锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Locks provides rw locks for key
type Locks struct {
table []*sync.RWMutex
}

// Make creates a new lock map
func Make(tableSize int) *Locks {
table := make([]*sync.RWMutex, tableSize)
for i := 0; i < tableSize; i++ {
table[i] = &sync.RWMutex{}
}
return &Locks{
table: table,
}
}

在定位哈希槽时,同样对 key 做 FNV 哈希函数之后选择哈希槽。

为一个 key 加锁

为一个 key 加锁时非常简单,就是对 key 做哈希之后选择相应的 shard,之后在对应的 shard 上加锁即可。

锁定多个 key

在锁定多个 key 时,所有协程都按照相同顺序加锁,就可以避免循环等待防止死锁。

确定加锁顺序

一次性锁定多个 key 之前,首先确定加锁顺序,这样可以避免死锁。

  • 对要加锁的每一个 key 都做哈希函数,确定其对应的 shard 的下标(这里用 map 实现 set,避免重复的下标)。
  • 下标进行排序,即可得到一个确定的加锁顺序。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (locks *Locks) toLockIndices(keys []string, reverse bool) []uint32 {
indexMap := make(map[uint32]bool)
for _, key := range keys {
index := locks.spread(fnv32(key))
indexMap[index] = true
}
indices := make([]uint32, 0, len(indexMap))
for index := range indexMap {
indices = append(indices, index)
}
sort.Slice(indices, func(i, j int) bool {
if !reverse {
return indices[i] < indices[j]
}
return indices[i] > indices[j]
})
return indices
}

为多个 key 加读写锁

确定了加锁顺序之后,就可以为多个 key 加锁了。这里以加读写锁为例进行讲解:

1
func (locks *Locks) RWLocks(writeKeys []string, readKeys []string)

流程如下:

  • 首先不管是需要读的 key,还是需要写的 key,全部计算出加锁顺序
1
2
keys := append(writeKeys, readKeys...)
indices := locks.toLockIndices(keys, false)
  • 接着得到需要加写锁的 shard 下标
1
2
3
4
5
writeIndexSet := make(map[uint32]struct{})
for _, wKey := range writeKeys {
idx := locks.spread(fnv32(wKey))
writeIndexSet[idx] = struct{}{}
}
  • 按照顺序加锁,检查这个 shard 是否需要加上写锁,如果需要就加写锁否则加上读锁。
1
2
3
4
5
6
7
8
9
for _, index := range indices {
_, w := writeIndexSet[index]
mu := locks.table[index]
if w {
mu.Lock()
} else {
mu.RLock()
}
}

为多个 key 解读写锁

这里以解读写锁为例进行讲解:

1
func (locks *Locks) RWUnLocks(writeKeys []string, readKeys []string)

流程如下:

  • 首先不管是需要读的 key,还是需要写的 key,全部计算出解锁顺序(解锁顺序与加锁顺序相反)
  • 接着得到需要解写锁的 shard 下标
  • 按照顺序解锁,检查这个 shard 是否需要解开写锁,如果需要就解写锁否则解读锁。