一致性哈希
分布式缓存中,如果一个节点收到请求,当前节点没有缓存该值,那么它面临的难题就是从哪个节点中获取该值。有一个办法就是将 key 进行哈希后除以节点数量取余,得到节点编号。
这样一种简单的哈希取余的方法,没有考虑到节点数量变化的场景。当节点数量变化时,几乎所有的缓存值都失效了。节点在接收到对应的请求时,均需要重新去数据源获取数据,容易引起缓存雪崩。
缓存雪崩:缓存在同一时刻全部失效,造成瞬时DB请求量大、压力骤增,引起雪崩。
原理
步骤
一致性哈希算法将 key 映射到 2^32 的空间中,将这个数字首尾相连,形成一个环。
- 节点的哈希值(可以用节点的名称、编号、IP 地址做哈希),放置在环上。
- 计算 key 的哈希值,放置在环上,顺时针寻找到的第一个节点,就是应选取的节点。
一致性哈希算法,在新增/删除节点时,只需要重新定位该节点附近的一小部分数据,而不需要重新定位所有的节点
数据倾斜问题
如果服务器的节点过少,容易引起 key 的倾斜。
为了解决这个问题,引入了虚拟节点的概念,一个真实节点对应多个虚拟节点。
虚拟节点扩充了节点的数量,解决了节点较少的情况下数据容易倾斜的问题。而且代价非常小,只需要增加一个字典,维护真实节点与虚拟节点的映射关系即可。
代码实现
Map 结构体
consistenthash.Map 结构体就是一致性哈希环:
- hash 字段为哈希函数。
- replicas 为虚拟节点的数量(包括一个实体节点)。
- keys 保存了所有节点的哈希值,保存的哈希值是有序的,可以看作是一致性哈希环(包括实体节点和虚拟节点)。
- hashMap 保存了节点哈希值与节点名称的映射(这里的节点名称仅仅指实体节点)。
1 | type Hash func(data []byte) uint32 |
添加节点
Add 方法用于在一致性哈希环中添加节点。
1 | // Add adds some keys to the hash. |
获取对应节点
Get 方法用于获取 key 哈希后,距离最近的节点(sort.Search 函数可以看作是顺时针查找节点)。
sort.Search 函数采用二分法搜索找到 [0, n) 区间内最小的满足 f(i)==true 的值 i。
也就是说,sort.Search 函数希望 f 在输入位于区间 [0, n) 的前面某部分(可以为空)时返回假,而在输入位于剩余至结尾的部分(可以为空)时返回真;sort.Search 函数会返回满足 f(i)==true的 最小值 i。如果没有该值,函数会返回 n。
1 | // Get gets the closest item in the hash to the provided key. |
singleflight
singleflight 用于解决缓存击穿问题。
缓存击穿:在高并发的场景中,大量的请求同时查询一个 key ,如果这个 key 正好过期失效了,就会导致大量的请求都打到数据库,导致数据库的连接增多,负载上升。
通过是singleflight 可以将对同一个 key 的并发请求进行合并,只让其中一个请求到数据库进行查询,其他请求共享同一个结果,可以很大程度提升并发能力。
call 和 Group 结构体
singleflight.call 结构体用于表示一个请求:
- wg:用于实现通过 1 个 call,其他 call 阻塞。
- val:表示 call 操作的返回结果。
- err:表示 call 操作发生的错误。
singleflight.Group 是 singleflight 的总控结构:
- mu:用锁控制请求。
- m:不同的 call 对应不同的 key。
1 | // call is an in-flight or completed Do call |
flightGroup 接口
singleflight.Group 接口实现了 flightGroup 接口:
1 | // flightGroup is defined as an interface which flightgroup.Group |
Do 方法
Do 方法是核心方法,其流程如下:
首先查看是否有请求正在请求 key,如果有就阻塞住等待结果并返回。
1
2
3
4
5if c, ok := g.m[key]; ok {
g.mu.Unlock()
c.wg.Wait() // 阻塞住等待返回结果
return c.val, c.err
}如果没有(表明这个请求是第一个请求),就新建一个 call,真正的发起请求:
1
2
3
4
5
6c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
c.val, c.err = fn() // 发起真正的请求- 请求结束后,调用
c.wg.Done()
方法,通知后续同样请求这个 key 被阻塞的请求,结束阻塞取回结果。 - 从
g.m
中删除本次请求,本次 singleflight 结束。
- 请求结束后,调用
1 | func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) { |