Dawn's Blogs

分享技术 记录成长

0%

List

simple-redis 中定义了 List 接口,以定义 List 的各种操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Expected check whether given item is equals to expected value
type Expected func(a interface{}) bool

// Consumer traverses list.
// It receives index and value as params, returns true to continue traversal, while returns false to break
type Consumer func(i int, v interface{}) bool

type List interface {
Add(val interface{})
Get(index int) (val interface{})
Set(index int, val interface{})
Insert(index int, val interface{})
Remove(index int) (val interface{})
RemoveLast() (val interface{})
RemoveAllByVal(expected Expected) int
RemoveByVal(expected Expected, count int) int
ReverseRemoveByVal(expected Expected, count int) int
Len() int
ForEach(consumer Consumer)
Contains(expected Expected) bool
Range(start int, stop int) []interface{}
}

数据结构

快速链表

在 simple-redis 中,采用快速链表作为 List 的数据结构。

快速链表实际上就是一个双向链表,但是双向链表中的每一个节点不是存储一个数据,而是将数据连续存放形成一段连续的存储空间作为链表的节点。

这一段连续的存储空间在 godis 中被称为 page(每一个 page 的大小为 1024),page 的类型为空接口切片

1
2
3
4
5
6
7
8
9
const (
pageSize = 1024
)

// QuickList 快速链表,用于实现list
type QuickList struct {
data *list.List
size int
}

迭代器

又定义了 iterator 为快速链表的迭代器,在 [-1, QuickList.Len()] 范围内移动。

  • node 表示元素所在的 page。
  • offset 表示这个元素在 page 中的下标。
1
2
3
4
5
6
// iterator of QuickList, move between [-1, ql.Len()]
type iterator struct {
node *list.Element
offset int
ql *QuickList
}
阅读全文 »

Hash 表

KV 内存数据库的核心是并发安全的哈希表,godis 采用分段锁策略。将 key 分散到固定数量的 shard 中,在一个 shard 内的读写操作不会阻塞其他 shard 内的操作

数据结构

定义 Dict 接口,该接口表示一个哈希表的操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Consumer is used to traversal dict, if it returns false the traversal will be break
type Consumer func(key string, val interface{}) bool

// Dict is interface of a key-value data structure
type Dict interface {
Get(key string) (val interface{}, exists bool)
Len() int
Put(key string, val interface{}) (result int)
PutIfAbsent(key string, val interface{}) (result int)
PutIfExists(key string, val interface{}) (result int)
Remove(key string) (result int)
ForEach(consumer Consumer)
Keys() []string
RandomKeys(limit int) []string
RandomDistinctKeys(limit int) []string
Clear()
}

定义 ConcurrentDict 实现了 Dict 接口,ConcurrentDict 为并发安全的哈希表。将 key 映射到不同的 shard 中,不同 shard 上的操作是不会相互影响的,提高了并发性

注意,这里 shard 的数量恒为 2 的整数幂。

1
2
3
4
5
6
7
8
9
10
11
// ConcurrentDict is thread safe map using sharding lock
type ConcurrentDict struct {
table []*shard
count int32
shardCount int
}

type shard struct {
m map[string]interface{}
mutex sync.RWMutex
}
阅读全文 »

Redis 通信协议

Redis 自 2.0 版本起使用了统一的协议 RESP(REdis Serialization Protocol,Redis 序列化协议),该协议易于实现,计算机可以高效的进行解析且易于被人类读懂。

RESP 是一个二进制安全的文本协议,工作于 TCP 协议上。RESP 以作为单位,客户端和服务器发送的命令或数据一律以 \r\n (CRLF)作为换行符。

RESP 的五种格式

RESP 定义了 5 种格式:

  • 简单字符串(Simple String):服务器用来返回简单的结果,比如 OK。非二进制安全,不允许换行。
  • 错误信息(Error):服务器用来返回简单的错误信息,比如 ERR Invalid Synatx。非二进制安全,且不允许换行。
  • 整数(Integer):llen、scard 等命令的返回值,64位有符号整数。
  • 字符串(Bulk String):二进制安全字符串。
  • 数组(Array,又称 Multi Bulk Strings):Bulk String 数组,客户端发送指令以及 lrange 等命令响应的格式。

RESP 通过第一个字符表示格式:

  • 简单字符串:以 + 开始,如 +OK\r\n。
  • 错误:以 - 开始,如 -ERR Invalid Syntax\r\n。
  • 整数:以 : 开始,如 :1\r\n。
  • 字符串:以 $ 开始。Bulk String 有两行,第一行为 $+正文长度,第二行为实际内容。$-1 表示 nil,当使用 get 查询一个不存在的 key 时,响应为 nil。
  • 数组:以 * 开始。第一行为 *+数组长度,其后是相应数量的 Bulk String。
阅读全文 »

Godis 介绍

Godis 是一个用 Go 语言实现的 Redis 服务器。

目录结构

Godis 项目的目录结构:

  • 根目录: main 函数,执行入口

  • config: 配置文件解析

  • interface: 一些模块间的接口定义

  • lib: 各种工具,比如logger、同步和通配符

  • tcp: tcp 服务器实现

  • redis: redis 协议解析器

  • datastruct: redis 的各类数据结构实现

    • dict: hash 表
    • list: 链表
    • lock: 用于锁定 key 的锁组件
    • set: 基于hash表的集合
    • sortedset: 基于跳表实现的有序集合
  • database: 存储引擎核心

    • server.go: redis 服务实例, 支持多数据库, 持久化, 主从复制等能力
    • database.go: 单个 database 的数据结构和功能
    • router.go: 将命令路由给响应的处理函数
    • keys.go: del、ttl、expire 等通用命令实现
    • string.go: get、set 等字符串命令实现
    • list.go: lpush、lindex 等列表命令实现
    • hash.go: hget、hset 等哈希表命令实现
    • set.go: sadd 等集合命令实现
    • sortedset.go: zadd 等有序集合命令实现
    • pubsub.go: 发布订阅命令实现
    • geo.go: GEO 相关命令实现
    • sys.go: Auth 等系统功能实现
    • transaction.go: 单机事务实现
  • cluster: 集群

    • cluster.go: 集群入口
    • com.go: 节点间通信
    • del.go: delete 命令原子性实现
    • keys.go: key 相关命令集群中实现
    • mset.go: mset 命令原子性实现
    • multi.go: 集群内事务实现
    • pubsub.go: 发布订阅实现
    • rename.go: rename 命令集群实现
    • tcc.go: tcc 分布式事务底层实现
  • aof: AOF 持久化实现

main 函数

在阅读 TCP 服务器之前,首先来看看 main 函数:

  • 首先,调用 logger.Setup 注册初始化日志模块
  • 接着,调用 config.SetupConfig 加载配置文件
  • 最后,调用 tcp.ListenAndServeWithSignal 开启 TCP 服务器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func main() {
print(banner)
logger.Setup(&logger.Settings{
Path: "logs",
Name: "godis",
Ext: "log",
TimeFormat: "2006-01-02",
})
configFilename := os.Getenv("CONFIG")
if configFilename == "" {
if fileExists("redis.conf") {
config.SetupConfig("redis.conf")
} else {
config.Properties = defaultProperties
}
} else {
config.SetupConfig(configFilename)
}

err := tcp.ListenAndServeWithSignal(&tcp.Config{
Address: fmt.Sprintf("%s:%d", config.Properties.Bind, config.Properties.Port),
}, RedisServer.MakeHandler())
if err != nil {
logger.Error(err)
}
}
阅读全文 »

groupcache 的其中一个特定就是,它既是一个客户端,也是一个服务器。它连接到自己的 peer,形成分布式缓存。

所以,groupcache 中的 HTTP 服务即包括客户端也包括服务端。

HTTP 服务

客户端

groupcache 中,节点之间的通信是通过 protobuf 完成的,HTTP 客户端需要实现 ProtoGetter 接口。

1
2
3
4
// ProtoGetter is the interface that must be implemented by a peer.
type ProtoGetter interface {
Get(ctx context.Context, in *pb.GetRequest, out *pb.GetResponse) error
}

httpGetter 结构体

httpGetter 结构体维护与某一个节点的 protobuf 通信,其中 baseURL 就记录了节点的远程 HTTP 地址。httpGetter 节点就实现了 ProtoGetter 接口。

1
2
3
4
type httpGetter struct {
transport func(context.Context) http.RoundTripper
baseURL string
}
阅读全文 »

Getter 从其他数据源加载数据

Getter 是一个接口,用于从别处(比如数据库中)加载数据。它只定义了一个函数 Get,用于根据 key 来加载数据。

1
2
3
4
5
6
7
8
9
10
// A Getter loads data for a key.
type Getter interface {
// Get returns the value identified by key, populating dest.
//
// The returned data must be unversioned. That is, key must
// uniquely describe the loaded data, without an implicit
// current time, and without relying on cache expiration
// mechanisms.
Get(ctx context.Context, key string, dest Sink) error
}

Sink 是一个接口,用于接收从缓存中 Get 到的数据

GetterFunc

GetterFunc 是接口型函数,这是为了向接口参数里传函数,就让函数继承了接口。

1
2
3
4
5
6
// A GetterFunc implements Getter with a function.
type GetterFunc func(ctx context.Context, key string, dest Sink) error

func (f GetterFunc) Get(ctx context.Context, key string, dest Sink) error {
return f(ctx, key, dest)
}

PeerPicker 选择节点

PeerPicker 是一个接口,用于实现根据一个 key 定位节点(根据一致性哈希,确定从哪一个分布式节点中获取数据)。

1
2
3
4
5
6
7
8
// PeerPicker is the interface that must be implemented to locate
// the peer that owns a specific key.
type PeerPicker interface {
// PickPeer returns the peer that owns the specific key
// and true to indicate that a remote peer was nominated.
// It returns nil, false if the key owner is the current peer.
PickPeer(key string) (peer ProtoGetter, ok bool)
}
阅读全文 »

一致性哈希

分布式缓存中,如果一个节点收到请求,当前节点没有缓存该值,那么它面临的难题就是从哪个节点中获取该值。有一个办法就是将 key 进行哈希后除以节点数量取余,得到节点编号。

这样一种简单的哈希取余的方法,没有考虑到节点数量变化的场景。当节点数量变化时,几乎所有的缓存值都失效了。节点在接收到对应的请求时,均需要重新去数据源获取数据,容易引起缓存雪崩

缓存雪崩:缓存在同一时刻全部失效,造成瞬时DB请求量大、压力骤增,引起雪崩。

原理

步骤

一致性哈希算法将 key 映射到 2^32 的空间中,将这个数字首尾相连,形成一个环。

  • 节点的哈希值(可以用节点的名称、编号、IP 地址做哈希),放置在环上
  • 计算 key 的哈希值,放置在环上,顺时针寻找到的第一个节点,就是应选取的节点。

一致性哈希添加节点 consistent hashing add peer

一致性哈希算法,在新增/删除节点时,只需要重新定位该节点附近的一小部分数据,而不需要重新定位所有的节点

数据倾斜问题

如果服务器的节点过少,容易引起 key 的倾斜

为了解决这个问题,引入了虚拟节点的概念,一个真实节点对应多个虚拟节点。

虚拟节点扩充了节点的数量,解决了节点较少的情况下数据容易倾斜的问题。而且代价非常小,只需要增加一个字典,维护真实节点与虚拟节点的映射关系即可。

阅读全文 »

Groupcache 介绍

Groupcache 是一个内存级别的分布式缓存,在许多情况下可以替代 memcached 节点池。

和 memcached 比较

相同点

Groupcache 与 memcached 相比,相同点在于:

  • 按照 key 进行切分,用于选择哪个节点对该 key 负责。

不同点

不同点在于:

  • 不需要运行一组单独的服务器。groupcache 是一个客户端,也是一个服务器。它连接到自己的 peer,形成分布式缓存。
  • 带有缓存填充机制。memcached 只是说“缓存丢失”,这通常会导致来自无限数量的客户端连接数据库(或其他)加载缓存。而 groupcache 协调缓存填充,使得整个复制的一组进程中只有一个加载填充缓存(singleflight),然后将加载的值多路传输给所有调用方。
  • 不支持变更 values。既没有缓存过期时间,也不能显式的从缓存中删除。
  • 支持将热点条目自动镜像到多个进程。对于热点条目,自动镜像到多个节点,这样可以防止单点存储热点数据的服务器过载。

LRU

LRU,Least Recently Used,最近最少使用。如果数据最近被访问过,那么将来被访问的概率也会更高。

LRU 算法的实现非常简单,维护一个队列,如果某条记录被访问了,则移动到队尾,那么队首则是最近最少访问的数据,淘汰该条记录即可。

数据结构

下图就是 LRU 的数据结构:

  • 蓝色的是字典,用于保存 key 和 value 的映射关系。
  • 红色的是双向链表(作为队列),将所有的值存储在双向链表中。头部代表最近被访问的数据,尾部代表最久未被访问过的数据
    • 当访问到值时将这个元素移动到双向链表的头部。
    • 如果需要删除最近最久未被访问的数据,只需要删除尾部元素即可。

implement lru algorithm with golang

阅读全文 »

Writer

n9e server writer 用于在 Prometheus 中写入数据,主要的作用是写入 target_up 指标

Writers

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type WritersType struct {
globalOpt config.WriterGlobalOpt
backends map[string]WriterType
queues map[string]map[int]*SafeListLimited
}


func NewWriters() WritersType {
return WritersType{
backends: make(map[string]WriterType),
}
}

var Writers = NewWriters()

Writers 是全局变量,用于保存所有的 writer。其类型为 WritersType,其他模块可以调用 PushSample 方法向队列中写入数据,Writers 会定期的将队列中的数据写入到 Prometheus 中

1
func (ws *WritersType) PushSample(ident string, v interface{}, clusters ...string)

WritersType 有三个字段:

  • globalOpt 用于记录所有 writer 共有的配置信息,如队列数量、队列长度等。

    1
    2
    3
    4
    5
    6
    type WriterGlobalOpt struct {
    QueueCount int
    QueueMaxSize int
    QueuePopSize int
    ShardingKey string
    }
  • backends 用于保存集群名字与实际与 Prometheus 通信的写入 API 的映射(WriterType)。

    1
    2
    3
    4
    type WriterType struct {
    Opts config.WriterOptions
    Client api.Client
    }
  • queue 是一个带有最大长度限制的队列。

Init

Init 函数用于初始化 Writers。

  • 首先配置信息,为每一个集群都开启 globalOpt.QueueCount 个队列,并且开启协程 Writers.StartConsumer 开始消费,定期将队列中的数据发送给 Prometheus。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    Writers.globalOpt = globalOpt
    Writers.queues = make(map[string]map[int]*SafeListLimited)
    for _, opt := range opts {
    if _, ok := Writers.queues[opt.ClusterName]; !ok {
    Writers.queues[opt.ClusterName] = make(map[int]*SafeListLimited)
    for i := 0; i < globalOpt.QueueCount; i++ {
    Writers.queues[opt.ClusterName][i] = NewSafeListLimited(Writers.globalOpt.QueueMaxSize)
    go Writers.StartConsumer(i, Writers.queues[opt.ClusterName][i], opt.ClusterName)
    }
    }
    }
  • 其次,开启一个协程,用于向普罗米修斯报告每一个队列的当前长度

    1
    go reportChanSize()
  • 最后,根据每一个 wirter 的配置,开启与 Prometheus 连接的客户端(用于写入数据),并且调用 Writers.Put 函数将开启的客户端记录下来(保存在 WritersType.backends 字段)。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    for i := 0; i < len(opts); i++ {
    cli, err := api.NewClient(api.Config{
    Address: opts[i].Url,
    RoundTripper: &http.Transport{
    // TLSClientConfig: tlsConfig,
    Proxy: http.ProxyFromEnvironment,
    DialContext: (&net.Dialer{
    Timeout: time.Duration(opts[i].DialTimeout) * time.Millisecond,
    KeepAlive: time.Duration(opts[i].KeepAlive) * time.Millisecond,
    }).DialContext,
    ResponseHeaderTimeout: time.Duration(opts[i].Timeout) * time.Millisecond,
    TLSHandshakeTimeout: time.Duration(opts[i].TLSHandshakeTimeout) * time.Millisecond,
    ExpectContinueTimeout: time.Duration(opts[i].ExpectContinueTimeout) * time.Millisecond,
    MaxConnsPerHost: opts[i].MaxConnsPerHost,
    MaxIdleConns: opts[i].MaxIdleConns,
    MaxIdleConnsPerHost: opts[i].MaxIdleConnsPerHost,
    IdleConnTimeout: time.Duration(opts[i].IdleConnTimeout) * time.Millisecond,
    },
    })

    if err != nil {
    return err
    }

    writer := WriterType{
    Opts: opts[i],
    Client: cli,
    }

    Writers.Put(opts[i].Url, writer)
    }

Writers.StartConsumer

StartConsumer 每 400 毫秒从队列中取出一串数据,开启一个协程将这些数据写入到 Prometheus 中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (ws *WritersType) StartConsumer(index int, ch *SafeListLimited, clusterName string) {
for {
series := ch.PopBack(ws.globalOpt.QueuePopSize)
if len(series) == 0 {
time.Sleep(time.Millisecond * 400)
continue
}

for key := range ws.backends {
if ws.backends[key].Opts.ClusterName != clusterName {
continue
}
go ws.backends[key].Write(clusterName, index, series)
}
}
}

reportChanSize

reportChanSize 的逻辑很简单,就是每 3 秒钟报告一次队列的当前大小

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func reportChanSize() {
clusterName := config.C.ClusterName
if clusterName == "" {
return
}

for {
time.Sleep(time.Second * 3)
for cluster, m := range Writers.queues {
for i, c := range m {
size := c.Len()
promstat.GaugeSampleQueueSize.WithLabelValues(cluster, fmt.Sprint(i)).Set(float64(size))
}
}
}
}

PushSample

在向 Prometheus 推送指标时,会对 ident 名称做 CRC32 编码后取模,得到指标被推送的队列。

应用问题

缓存穿透

key 对应的数据在数据源(DB)并不存在,每次针对此 key 的请求从缓存获取不到,那么就会从数据源请求数据,从而可能压垮数据源。

image-20221229145805794

解决方案

  • 对空值缓存:如果一个查询返回的数据为空(不管是数据是否不存在),仍然把这个空结果进行缓存,设置空结果的过期时间会很短,最长不超过五分钟。
  • 设置可访问的名单(白名单):可以访问的 id 记录在 Redis 缓存中,可以使用 string 类型或者 bitmaps 类型(id 作为 bitmaps 的偏移量)。每次访问时都在缓存中查询 id 是否在其中,若不存在就直接返回空值,不查询数据库。
  • 布隆过滤器(Bloom Filter):是一个很长的二进制向量(位图)和一系列随机映射函数(哈希函数)。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。

布隆过滤器原理:

布隆过滤器由一个位图和 N 个哈希函数组成。布隆过滤器会在位图上做标记,只有标记全部为 1,才说明可能存在。布隆过滤器标记的流程:

  • 第一步,使用 N 个哈希函数分别对数据做哈希计算,得到 N 个哈希值
  • 第二步,将第一步得到的 N 个哈希值对位图数组的长度取模,在位图数组的对应位置的值设置为 1

查询布隆过滤器说数据存在,并不一定证明数据库中存在这个数据,但是查询到数据不存在,数据库中一定就不存在这个数据

图片

阅读全文 »