Dawn's Blogs

分享技术 记录成长

0%

从零实现分布式缓存 (2) 单机并发缓存

本节实现了对 LRU 缓存的并发控制。以及 DawnCache 的核心数据结构 Group。Group 可用对缓存分组,以实现缓存数据的简单分类,缓存不存在时,调用回调函数获取源数据。

最终代码结构如下:

1
2
3
4
5
6
7
8
lru/
|--lru.go
|--lru_test.go
byteview.go
cache.go
dawncache.go
dawncache_test.go
go.mod

并发读写

ByteView 只读的缓存值

byteview.go

首先定义一个 ByteView,它是只读的,用于表示缓存值

1
2
3
4
// ByteView 保存不可变的字节缓存值
type ByteView struct {
b []byte
}

因为表示缓存值,所以需要实现 lru.Value 接口。定义 Len 方法,用于获取缓存值得占用字节数。

1
2
3
4
// Len 实现 lru.Value 接口
func (v ByteView) Len() int {
return len(v.b)
}

定义 ByteSliceString 方法,分别用于返回一个拷贝和对应的字符串:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// ByteSlice 返回一个 ByteView 数据的克隆切片,ByteView 只读,所以返回克隆切片防止外部程序修改
func (v ByteView) ByteSlice() []byte {
return cloneBytes(v.b)
}

// String 返回 ByteView 对应的字符串
func (v ByteView) String() string {
return string(v.b)
}

// cloneBytes 克隆数据
func cloneBytes(b []byte) []byte {
clone := make([]byte, len(b))
copy(clone, b)
return clone
}

为 lru.Cache 添加并发特性

cache.go

为了实现 lru.Cache并发特性,所以需要在外封装一层,加上互斥锁用于并发控制

1
2
3
4
5
6
// cache 单机并发缓存,带有互斥锁
type cache struct {
mu sync.Mutex // 互斥锁
lru *lru.Cache // LRU 缓存
cacheBytes int64 // 缓存容量
}

add 方法用于向缓存中添加键值对。

add 方法中,判断了 c.lru 是否为 nil,如果等于 nil 再创建实例。这种方法称之为延迟初始化(Lazy Initialization),一个对象的延迟初始化意味着该对象的创建将会延迟至第一次使用该对象时。主要用于提高性能,并减少程序内存要求。

1
2
3
4
5
6
7
8
9
10
11
// add 向缓存中添加键值对
func (c *cache) add(key string, value ByteView) {
c.mu.Lock()
defer c.mu.Unlock()

if c.lru == nil {
c.lru = lru.New(c.cacheBytes, nil)
}

c.lru.Add(key, value)
}

get 方法用于查找缓存值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// get 查找
func (c *cache) get(key string) (value ByteView, ok bool) {
c.mu.Lock()
defer c.mu.Unlock()
// 底层lru缓存为空,直接返回
if c.lru == nil {
return
}
// 在底层lru缓存中查找
if v, ok := c.lru.Get(key); ok {
return v.(ByteView), ok
}

return
}

Group 实现

dawncache.go

Group 是 DawnCache 最核心的数据结构,不仅分组将缓存数据进行了简单的划分;而且负责与用户的交互,并且控制缓存值存储和获取的流程。

1
2
3
4
5
6

接收 key --> 检查是否被缓存 -----> 返回缓存值 ⑴
| 否 是
|-----> 是否应当从远程节点获取 -----> 与远程节点交互 --> 返回缓存值 ⑵
| 否
|-----> 调用`回调函数`,获取值并添加到缓存 --> 返回缓存值 ⑶

接下来实现流程 (1) 和 (3),远程交互部分后续实现。

Getter 回调

设计一个回调函数,当数据不存在时,调用这个函数得到源数据。

  • Getter 是一个接口,其中定义了一个函数 Get 用于获取源数据。
  • GetterFunc 是函数类型,并且实现了 Getter 接口。
  • 接口型函数方便使用者在调用时既能够传入函数作为参数,也能够传入实现了该接口的结构体作为参数。
1
2
3
4
5
6
7
8
9
type Getter interface {
Get(key string) ([]byte, error)
}

type GetterFunc func(key string) ([]byte, error)

func (f GetterFunc) Get(key string) ([]byte, error) {
return f(key)
}

Group 定义

首先定义 Group:

  • name 表示一个 Group 的命名空间,用于区分不同类型的缓存。
  • getter 是缓存未命中获取源数据的回调
  • mainCache 是实现并发的 LRU 缓存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
type Group struct {
name string // 一个组的命名空间,用于区分不同的缓存,如学生姓名、成绩可以放到不同的缓存中去
getter Getter // 当查找数据未命中时,调用该函数获取值
mainCache cache // 底层缓存
}

var (
mu sync.RWMutex
groups = make(map[string]*Group) // 存储所有的缓存
)

// NewGroup 新建一个 *Group 缓存
func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
if getter == nil {
panic("nil getter")
}
mu.Lock()
defer mu.Unlock()
g := &Group{
name: name,
getter: getter,
mainCache: cache{cacheBytes: cacheBytes},
}
groups[name] = g // 添加到 map 中
return g
}

// GetGroup 根据命名空间返回对应的 Group
func GetGroup(name string) *Group {
if g, ok := groups[name]; ok {
return g
}
return nil
}

Group Get 方法

Group 的 Get 方法用于获取缓存数据:

  • 首先从 LRU 缓存中获取缓存数据。
  • 若 LRU 缓存中没有,则从别处加载数据。本节只实现了通过回调函数获取源数据。
  • 当通过回调函数获取源数据后,将新的数据再插入到 LRU 缓存中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func (g *Group) Get(key string) (ByteView, error) {
if key == "" {
return ByteView{}, errors.New("key is required")
}
// 可以在缓存中查询到,返回数据
if v, ok := g.mainCache.get(key); ok {
return v, nil
}
// 从远程或者回调函数获取key对应的value
return g.load(key)
}

// load 从别处加载数据
func (g *Group) load(key string) (ByteView, error) {
// 暂时全部调用回调函数加载key对应的value
// 从远程调用之后实现
return g.getLocally(key)
}

// getLocally 从本地,即调用回调函数获取 value
func (g *Group) getLocally(key string) (ByteView, error) {
bytes, err := g.getter.Get(key)
if err != nil {
return ByteView{}, err
}
value := ByteView{b: cloneBytes(bytes)}
g.populateCache(key, value) // 将新获取到的数据放入缓存中
return value, nil
}

func (g *Group) populateCache(key string, value ByteView) {
g.mainCache.add(key, value)
}

测试

dawncache.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package DawnCache

import (
"fmt"
"log"
"testing"
)

var db = map[string]string{
"Tom": "630",
"Jack": "589",
"Sam": "567",
}

func TestGet(t *testing.T) {
loadCounts := make(map[string]int, len(db))
gee := NewGroup("scores", 2<<10, GetterFunc(
func(key string) ([]byte, error) {
log.Println("[SlowDB] search key", key)
if v, ok := db[key]; ok {
if _, ok := loadCounts[key]; !ok {
loadCounts[key] = 0
}
loadCounts[key]++
return []byte(v), nil
}
return nil, fmt.Errorf("%s not exist", key)
}))

for k, v := range db {
if view, err := gee.Get(k); err != nil || view.String() != v {
t.Fatal("failed to get value of Tom")
}
if _, err := gee.Get(k); err != nil || loadCounts[k] > 1 {
t.Fatalf("cache %s miss", k)
}
}

if view, err := gee.Get("unknown"); err == nil {
t.Fatalf("the value of unknow should be empty, but %s got", view)
}
}

func TestGetGroup(t *testing.T) {
groupName := "scores"
NewGroup(groupName, 2<<10, GetterFunc(
func(key string) (bytes []byte, err error) { return }))
if group := GetGroup(groupName); group == nil || group.name != groupName {
t.Fatalf("group %s not exist", groupName)
}

if group := GetGroup(groupName + "111"); group != nil {
t.Fatalf("expect nil, but %s got", group.name)
}
}