Dawn's Blogs

分享技术 记录成长

0%

Groupcache源码阅读 (2) 一致性哈希和Singleflight

一致性哈希

分布式缓存中,如果一个节点收到请求,当前节点没有缓存该值,那么它面临的难题就是从哪个节点中获取该值。有一个办法就是将 key 进行哈希后除以节点数量取余,得到节点编号。

这样一种简单的哈希取余的方法,没有考虑到节点数量变化的场景。当节点数量变化时,几乎所有的缓存值都失效了。节点在接收到对应的请求时,均需要重新去数据源获取数据,容易引起缓存雪崩

缓存雪崩:缓存在同一时刻全部失效,造成瞬时DB请求量大、压力骤增,引起雪崩。

原理

步骤

一致性哈希算法将 key 映射到 2^32 的空间中,将这个数字首尾相连,形成一个环。

  • 节点的哈希值(可以用节点的名称、编号、IP 地址做哈希),放置在环上
  • 计算 key 的哈希值,放置在环上,顺时针寻找到的第一个节点,就是应选取的节点。

一致性哈希添加节点 consistent hashing add peer

一致性哈希算法,在新增/删除节点时,只需要重新定位该节点附近的一小部分数据,而不需要重新定位所有的节点

数据倾斜问题

如果服务器的节点过少,容易引起 key 的倾斜

为了解决这个问题,引入了虚拟节点的概念,一个真实节点对应多个虚拟节点。

虚拟节点扩充了节点的数量,解决了节点较少的情况下数据容易倾斜的问题。而且代价非常小,只需要增加一个字典,维护真实节点与虚拟节点的映射关系即可。

代码实现

Map 结构体

consistenthash.Map 结构体就是一致性哈希环:

  • hash 字段为哈希函数。
  • replicas 为虚拟节点的数量(包括一个实体节点)。
  • keys 保存了所有节点的哈希值,保存的哈希值是有序的,可以看作是一致性哈希环(包括实体节点和虚拟节点)。
  • hashMap 保存了节点哈希值与节点名称的映射(这里的节点名称仅仅指实体节点)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type Hash func(data []byte) uint32

type Map struct {
hash Hash
replicas int
keys []int // Sorted
hashMap map[int]string
}

func New(replicas int, fn Hash) *Map {
m := &Map{
replicas: replicas,
hash: fn,
hashMap: make(map[int]string),
}
if m.hash == nil {
m.hash = crc32.ChecksumIEEE
}
return m
}

添加节点

Add 方法用于在一致性哈希环中添加节点。

1
2
3
4
5
6
7
8
9
10
11
// Add adds some keys to the hash.
func (m *Map) Add(keys ...string) {
for _, key := range keys {
for i := 0; i < m.replicas; i++ {
hash := int(m.hash([]byte(strconv.Itoa(i) + key)))
m.keys = append(m.keys, hash)
m.hashMap[hash] = key
}
}
sort.Ints(m.keys)
}

获取对应节点

Get 方法用于获取 key 哈希后,距离最近的节点(sort.Search 函数可以看作是顺时针查找节点)。

sort.Search 函数采用二分法搜索找到 [0, n) 区间内最小的满足 f(i)==true 的值 i

也就是说,sort.Search 函数希望 f 在输入位于区间 [0, n) 的前面某部分(可以为空)时返回,而在输入位于剩余至结尾的部分(可以为空)时返回;sort.Search 函数会返回满足 f(i)==true的 最小值 i。如果没有该值,函数会返回 n。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Get gets the closest item in the hash to the provided key.
func (m *Map) Get(key string) string {
if m.IsEmpty() {
return ""
}

hash := int(m.hash([]byte(key)))

// Binary search for appropriate replica.
idx := sort.Search(len(m.keys), func(i int) bool { return m.keys[i] >= hash })

// Means we have cycled back to the first replica.
if idx == len(m.keys) {
idx = 0
}

return m.hashMap[m.keys[idx]]
}

singleflight

singleflight 用于解决缓存击穿问题。

缓存击穿:在高并发的场景中,大量的请求同时查询一个 key ,如果这个 key 正好过期失效了,就会导致大量的请求都打到数据库,导致数据库的连接增多,负载上升。

通过是singleflight 可以将对同一个 key 的并发请求进行合并,只让其中一个请求到数据库进行查询,其他请求共享同一个结果,可以很大程度提升并发能力。

call 和 Group 结构体

singleflight.call 结构体用于表示一个请求:

  • wg:用于实现通过 1 个 call,其他 call 阻塞
  • val:表示 call 操作的返回结果
  • err:表示 call 操作发生的错误

singleflight.Group 是 singleflight 的总控结构:

  • mu:用锁控制请求
  • m:不同的 call 对应不同的 key
1
2
3
4
5
6
7
8
9
10
11
12
13
// call is an in-flight or completed Do call
type call struct {
wg sync.WaitGroup
val interface{}
err error
}

// Group represents a class of work and forms a namespace in which
// units of work can be executed with duplicate suppression.
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
}

flightGroup 接口

singleflight.Group 接口实现了 flightGroup 接口:

1
2
3
4
5
6
7
// flightGroup is defined as an interface which flightgroup.Group
// satisfies. We define this so that we may test with an alternate
// implementation.
type flightGroup interface {
// Done is called when Do is done.
Do(key string, fn func() (interface{}, error)) (interface{}, error)
}

Do 方法

Do 方法是核心方法,其流程如下:

  • 首先查看是否有请求正在请求 key,如果阻塞住等待结果并返回

    1
    2
    3
    4
    5
    if c, ok := g.m[key]; ok {
    g.mu.Unlock()
    c.wg.Wait() // 阻塞住等待返回结果
    return c.val, c.err
    }
  • 如果没有(表明这个请求是第一个请求),就新建一个 call,真正的发起请求

    1
    2
    3
    4
    5
    6
    c := new(call)
    c.wg.Add(1)
    g.m[key] = c
    g.mu.Unlock()

    c.val, c.err = fn() // 发起真正的请求
    • 请求结束后,调用 c.wg.Done() 方法,通知后续同样请求这个 key 被阻塞的请求,结束阻塞取回结果。
    • g.m 中删除本次请求,本次 singleflight 结束。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
g.mu.Unlock()
c.wg.Wait()
return c.val, c.err
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()

c.val, c.err = fn()
c.wg.Done()

g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()

return c.val, c.err
}