Dawn's Blogs

分享技术 记录成长

0%

本节实现 protobuf 进行节点间的通信。最终代码结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
dawncachepb/
|--dawncachepb.pb.go
|--dawncachepb.proto
lru/
|--lru.go
|--lru_test.go
singleflight/
|--singleflight.go
byteview.go
cache.go
consistenthash.go
dawncache.go
dawncache_test.go
go.mod
http.go
peers.go

使用 protobuf 通信

编写 proto 文件

dawncachepb/dawncachepb.proto

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
syntax = "proto3";

package dawncachepb;

option go_package = "./;dawncachepb";

message Request {
string group = 1;
string key = 2;
}

message Response {
bytes value = 1;
}

service GroupCache {
rpc Get(Request) returns (Response);
}

生成 pb.go 代码

生成 dawncachepb.pb.go

1
protoc --go_out=. *.proto

dawncachepb/dawncachepb.pb.go 有如下数据类型:

1
2
3
4
5
6
7
8
9
10
type Request struct {
// ...
Group string `protobuf:"bytes,1,opt,name=group,proto3" json:"group,omitempty"`
Key string `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"`
}

type Response struct {
// ...
Value []byte `protobuf:"bytes,1,opt,name=value,proto3" json:"value,omitempty"`
}

修改 PeerGetter 接口

修改 peers.go 中的 PeerGetter 接口,参数使用 geecachepb.pb.go 中的数据类型。

1
2
3
4
5
// PeerGetter 远程获取数据的接口
type PeerGetter interface {
// Get 根据 groupName 和 key 获取源数据
Get(in *pb.Request, out *pb.Response) error
}

修改使用 PeerGetter 接口的代码

dawncache.go

1
2
3
4
5
6
7
8
9
10
11
12
13
// getFromPeer 从 peer 处获取数据
func (g *Group) getFromPeer(peer PeerGetter, key string) (ByteView, error) {
req := &pb.Request{
Group: g.name,
Key: key,
}
res := &pb.Response{}
err := peer.Get(req, res)
if err != nil {
return ByteView{}, err
}
return ByteView{b: res.Value}, nil
}

http.go

  • ServeHTTP() 中使用 proto.Marshal() 编码 HTTP 响应。
  • Get() 中使用 proto.Unmarshal() 解码 HTTP 响应。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// ServeHTTP 处理查询缓存的请求,实现了 http.Handler 接口
func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// 判断是否有 basePath
if !strings.HasPrefix(r.URL.Path, p.basePath) {
http.Error(w, "HTTPPool serving unexpected path: "+r.URL.Path, http.StatusBadRequest)
return
}

// 检查是否有 groupName 和 key
parts := strings.SplitN(r.URL.Path[len(p.basePath):], "/", 2)
if len(parts) != 2 {
http.Error(w, "bad request", http.StatusBadRequest)
return
}

groupName := parts[0]
key := parts[1]

// 通过 groupName 获取 group
group := GetGroup(groupName)
if group == nil {
http.Error(w, "no such group:"+groupName, http.StatusBadRequest)
return
}

// 从缓存中获取数据
view, err := group.Get(key)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

// 响应客户端
body, err := proto.Marshal(&pb.Response{Value: view.ByteSlice()}) // 编码
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/octet-stream")
w.Write(body)
}

// Get 实现了 PeerGetter 接口,用于远程获取源数据
func (h *HTTPGetter) Get(in *pb.Request, out *pb.Response) error {
url := fmt.Sprintf("%s%s/%s", h.basePath, url.QueryEscape(in.GetGroup()), url.QueryEscape(in.GetKey()))

res, err := http.Get(url)
if err != nil {
// 发送请求失败
return err
}
defer res.Body.Close()

if res.StatusCode != http.StatusOK {
// 状态码不是 200
return fmt.Errorf("server status code: %v", res.StatusCode)
}

data, err := ioutil.ReadAll(res.Body)
if err != nil {
// 读取数据失败
return errors.New("read response body failed")
}

if err = proto.Unmarshal(data, out); err != nil {
return fmt.Errorf("decoding response body: %v", err)
}

return nil
}

本节实现实现了防止缓存击穿的措施,通过多个并发请求映射为一个请求来实现。最终代码结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
lru/
|--lru.go
|--lru_test.go
singleflight/
|--singleflight.go
byteview.go
cache.go
consistenthash.go
dawncache.go
dawncache_test.go
go.mod
http.go
peers.go

概念

  • 缓存雪崩:缓存在同一时刻全部失效,造成瞬时 DB 请求量过大、压力骤增。缓存雪崩通常因为缓存服务器宕机、缓存的 key 设置了相同的过期时间等引起。
  • 缓存击穿:一个存在的 key,在缓存过期的一瞬间,同时有大量的请求,这些请求都会击穿到 DB ,造成瞬时DB请求量大、压力骤增。
  • 缓存穿透:查询一个不存在的数据,因为不存在则不会写到缓存中,所以每次都会去请求 DB。

singleflight 实现

singleflight/singleflight.go

dawncache 通过 singleflight 来实现防止缓存击穿。

定义结构体

首先定义 call,代表一次请求:

1
2
3
4
5
6
// call 代表一次查询请求
type call struct {
wg sync.WaitGroup
val interface{}
err error
}

Group 记录了待查询的 key 和一次请求之间的映射关系。

当 key 还在 hashMap 中时,视为一次请求即可。

1
2
3
4
type Group struct {
mu sync.Mutex // 对 hashMap 的访问互斥
hashMap map[string]*call // 保存 key 和请求的映射关系
}

实现 Do 方法

Do 方法实现了多次相同的查询,到一次请求的映射操作:

  • 查询 hashMap 中是否已经记录了 key 对应的 call 操作,如果有,则等待这一次请求得到数据并返回结果。
  • 如果不在 hashMap 中,则新建一个查询请求 call 并记录在 hashMap 中,待执行过查询操作之后再返回数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
g.mu.Lock()
if g.hashMap == nil { // 延迟初始化
g.hashMap = make(map[string]*call)
}
if c, ok := g.hashMap[key]; ok {
// 已在 hashMap 中记录,等待结果即可
g.mu.Unlock()
c.wg.Wait()
return c.val, c.err
}

// 没有在 hashMap 中记录
// 新建 call 在 hashMap 中记录
c := new(call)
c.wg.Add(1)
g.hashMap[key] = c
g.mu.Unlock()

// 远程请求数据
c.val, c.err = fn()
c.wg.Done() // 得到数据

g.mu.Lock()
delete(g.hashMap, key)
g.mu.Unlock()

return c.val, c.err
}

修改主流程

dawncache.go

需要修改 Group 结构体,使之能够防止缓存穿透:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type Group struct {
// ...
loader *singleflight.Group
}

// NewGroup 新建一个 *Group 缓存
func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
// ...
g := &Group{
// ...
loader: new(singleflight.Group),
}
// ...
}

修改 load 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// load 从别处加载数据
func (g *Group) load(key string) (ByteView, error) {
view, err := g.loader.Do(key, func() (interface{}, error) {
if g.peers != nil {
// peers 不为空,可以从远程获取数据
if peer, ok := g.peers.PickPeer(key); ok {
// 从远程获取数据
view, err := g.getFromPeer(peer, key)
if err != nil {
log.Println("[GeeCache] Failed to get from peer", err)
return ByteView{}, err
}
return view, nil
}
}
// 本地通过回调函数获取数据
return g.getLocally(key)
})
if err != nil {
return ByteView{}, err
}
return view.(ByteView), nil
}

本节实现一致性哈希选择节点,并且实现了 HTTP 客户端。最终代码结构如下:

1
2
3
4
5
6
7
8
9
10
11
lru/
|--lru.go
|--lru_test.go
byteview.go
cache.go
consistenthash.go
dawncache.go
dawncache_test.go
go.mod
http.go
peers.go

PeerPicker

分布式缓存获取数据的流程如下:

1
2
3
4
5
6

接收 key --> 检查是否被缓存 -----> 返回缓存值 ⑴
| 否 是
|-----> 是否应当从远程节点获取 -----> 与远程节点交互 --> 返回缓存值 ⑵
| 否
|-----> 调用`回调函数`,获取值并添加到缓存 --> 返回缓存值 ⑶

之前已经实现了 (1) 和 (3) 现在需要实现流程 (2) 从远程获取数据:

1
2
3
4
使用一致性哈希选择节点        是                                    是
|-----> 是否是远程节点 -----> HTTP 客户端访问远程节点 --> 成功?-----> 服务端返回返回值
| 否 ↓ 否
|----------------------------> 回退到本地节点处理。

抽象 PeerPicker

peers.go

  • 现在需要定义一个 PeerPicker 接口,用于选择与哪一个节点进行通信。

  • PeerGetter 接口用于具体的远程获取数据的操作。

1
2
3
4
5
6
7
8
9
10
11
// PeerPicker 选取节点的接口
type PeerPicker interface {
// PickPeer 根据 key 选择相应的 PeerGetter 获取数据
PickPeer(key string) (peer PeerGetter, ok bool)
}

// PeerGetter 远程获取数据的接口
type PeerGetter interface {
// Get 根据 groupName 和 key 获取源数据
Get(groupName string, key string) ([]byte, error)
}

节点选择与 HTTP 客户端

http.go

实现 PeerGetter

首先定义一个 HTTPGetter 结构体,用于实现 PeerGetter 接口:

1
2
3
4
// HTTPGetter 通过 HTTP 远程获取数据
type HTTPGetter struct {
basePath string
}

实现 Get 方法,用于远程获取数据,它通过发送 HTTP GET 请求来从远程获取数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Get 实现了 PeerGetter 接口,用于远程获取源数据
func (h *HTTPGetter) Get(groupName string, key string) ([]byte, error) {
url := fmt.Sprintf("%s%s/%s", h.basePath, groupName, key)

res, err := http.Get(url)
if err != nil {
// 发送请求失败
return nil, err
}
defer res.Body.Close()

if res.StatusCode != http.StatusOK {
// 状态码不是 200
return nil, fmt.Errorf("server status code: %v", res.StatusCode)
}

data, err := ioutil.ReadAll(res.Body)
if err != nil {
// 读取数据失败
return nil, errors.New("read response body failed")
}

return data, nil
}

var _ PeerGetter = (*HTTPGetter)(nil) // 检查实现 PeerGetter 接口

实现 PeerPicker

修改 HTTPPool 结构体,使之记录一致性哈希的结构体以及 HTTPGetter

1
2
3
4
5
6
7
type HTTPPool struct {
self string // 如 http://127.0.0.1:8080
basePath string // 节点间通讯地址的前缀,如 http:// 127.0.1:8080/basePath/groupName/key 用于请求数据
mu sync.Mutex
peers *Map // 一致性哈希,根据 key 来选择节点
httpGetters map[string]*HTTPGetter // 根据 baseURL 选择 HTTPGetter
}

Set 方法用于添加节点,即在一致性哈希的哈希环中添加节点:

1
2
3
4
5
6
7
8
9
10
11
12
// Set 添加节点
func (p *HTTPPool) Set(peers ...string) {
p.mu.Lock()
defer p.mu.Unlock()
p.peers = New(defaultReplicas, nil)
p.peers.Add(peers...)
p.httpGetters = make(map[string]*HTTPGetter)

for _, peer := range peers {
p.httpGetters[peer] = &HTTPGetter{basePath: peer + p.basePath}
}
}

PickPeer 方法用于根据 key 选择节点,实现 PeerPicker 接口:

1
2
3
4
5
6
7
8
9
10
// PickPeer 实现 PeerPicker 接口,用于根据 key 选择节点
func (p *HTTPPool) PickPeer(key string) (PeerGetter, bool) {
p.mu.Lock()
defer p.mu.Unlock()
if peer := p.peers.Get(key); peer != "" && peer != p.self {
p.Log("Pick peer %s", peer)
return p.httpGetters[peer], true
}
return nil, false
}

实现主流程

dawncache.go

修改 Group 结构体,添加 PeerPicker

1
2
3
4
5
6
type Group struct {
name string // 一个组的命名空间,用于区分不同的缓存,如学生姓名、成绩可以放到不同的缓存中去
getter Getter // 当查找数据未命中时,调用该函数获取值
mainCache cache // 底层缓存
peers PeerPicker
}

实现一个方法用于注册 PeerPicker

1
2
3
4
5
6
7
8
// RegisterPeers 注册 PeerPicker
func (g *Group) RegisterPeers(peers PeerPicker) {
if g.peers != nil {
// RegisterPeers 不允许调用超过1次
panic("RegisterPeerPicker called more than once")
}
g.peers = peers
}

修改 load 方法,使之:

  • 既能够从远程获取数据。
  • 又能够在本地通过回调函数获取数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// load 从别处加载数据
func (g *Group) load(key string) (ByteView, error) {
if g.peers != nil {
// peers 不为空,可以从远程获取数据
if peer, ok := g.peers.PickPeer(key); ok {
// 从远程获取数据
view, err := g.getFromPeer(peer, key)
if err != nil {
log.Println("[GeeCache] Failed to get from peer", err)
return ByteView{}, err
}
return view, nil
}
}
// 本地通过回调函数获取数据
return g.getLocally(key)
}

// getFromPeer 从 peer 处获取数据
func (g *Group) getFromPeer(peer PeerGetter, key string) (ByteView, error) {
data, err := peer.Get(g.name, key)
if err != nil {
return ByteView{}, err
}
return ByteView{b: data}, nil
}

本节实现了一致性哈希,最终代码结构如下:

1
2
3
4
5
6
7
8
9
10
lru/
|--lru.go
|--lru_test.go
byteview.go
cache.go
consistenthash.go
dawncache.go
dawncache_test.go
go.mod
http.go

一致性哈希

为什么使用一致性哈希

使用一致性哈希的原因如下:

  • 对于分布式缓存来说,若一个节点收到请求但是该节点没有对应的缓存数据。那么应该从谁那里获取数据?假设一共有 10 个节点,随机获取数据。假设第一次从节点 1 处获取数据,那么第二次只有十分之一的概率再次从节点 1 处获取数据。因为每一次可能都会从一个新的节点获取数据源,这样的操作不仅耗费时间(新的节点可能本身没有数据,需要从其他节点或者调用回调获取数据源),而且浪费空间(数据会冗余存储)。
  • 当某一个节点失效(节点增加也是同理)了,若使用简单的取余操作来选择节点,比如之前的 hash(key)%10 变成了 hash(key)%10,会使几乎所有缓存对应的节点都发生了改变。即所有缓存值都失效了,造成缓存雪崩:缓存在同一时刻全部失效,造成瞬时DB请求量大、压力骤增,引起雪崩。

原理

一致性哈希算法将 key 映射到 2^32 的空间中,将这个数字首尾相连,形成一个环。

  • 计算机节点的哈希,放置在哈希环上。
  • 计算 key 的哈希值,放置在环上顺时针寻找到的第一个节点就是对应的计算机节点。

下图左图中,key27、key11、key2 对应于节点 peer2;key23 对应于节点 peer4。

下图右图中,新增节点 peer8,但是只有 key27 对应的节点从 peer2 变成了 peer8,其余均不变。

一致性哈希添加节点 consistent hashing add peer

数据倾斜

当节点数目较少时,容易出现数据倾斜问题。就是 key 的映射关系不均匀,可能很多的 key 映射到了 peer1,只有很少甚至没有 key 映射到 peer2。

为了解决这个问题,引入虚拟节点的概念,一个真实节点对应多个虚拟节点。

虚拟节点与真实节点一样,将哈希值防止在哈希环上。计算 key 的哈希值,放置在环上顺时针寻找到的第一个(虚拟)节点就是对应的节点。

实现一致性哈希

consistenthash.go

一致性哈希主体结构

首先定义一致性哈希的主体结构体 Map

  • hash 表示使用的哈希函数。
  • replicas 表示一个节点加上其虚拟节点的数量。
  • keys 表示哈希环,有序存储所有的节点。
  • hashMap 存储所有节点与真实节点映射关系。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
type Hash func(data []byte) uint32

type Map struct {
hash Hash // 使用的 hash 函数
replicas int // 一个真实节点所对应虚拟节点的数量
keys []int // 哈希环,存储所有的节点,有序的
hashMap map[int]string // 存储所有节点与真实节点的映射关系
}

func New(replicas int, hash Hash) *Map {
m := &Map{
hash: hash,
replicas: replicas,
hashMap: make(map[int]string),
}
if hash == nil {
// 默认哈希函数
m.hash = crc32.ChecksumIEEE
}

return m
}

添加真实机器

添加真实机器的方法 Add,一次允许添加多个真实机器。每一个机器对应 m.replicas 个虚拟节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (m *Map) Add(keys ...string) {
for _, key := range keys {
for i := 0; i < m.replicas; i++ { // 一个真实节点对应 m.replicas 个虚拟节点
// 计算哈希 = key+编号
hash := int(m.hash([]byte(key + strconv.Itoa(i))))
// 加入到哈希环中
m.keys = append(m.keys, hash)
// 存储映射
m.hashMap[hash] = key
}
}
// 将 keys 排序
sort.Ints(m.keys)
}

获取 key 对应的机器

获取 key 对应的机器 Get 函数:

  • 首先计算 key 的哈希。
  • 再顺时针找到最近的虚拟节点。
  • 查询虚拟节点对应的真实机器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (m *Map) Get(key string) string {
if len(key) == 0 {
return ""
}

// 计算哈希
hash := int(m.hash([]byte(key)))
// 在哈希环上查找节点
idx := sort.Search(len(m.keys), func(i int) bool {
return m.keys[i] >= hash
})
// 返回节点
return m.hashMap[m.keys[idx%len(m.keys)]]
}

本节实现了分布式缓存 HTTP 服务端的搭建,最终代码结构如下:

1
2
3
4
5
6
7
8
9
lru/
|--lru.go
|--lru_test.go
byteview.go
cache.go
dawncache.go
dawncache_test.go
go.mod
http.go

HTTP 服务端

http.go

HTTPPool

构建结构体 HTTPPool 用于作为服务端,用于响应查询缓存数据的请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
const defaultBasePath = "/_dawncache/"

type HTTPPool struct {
self string // 如 http://127.0.0.1:8080
basePath string // 节点间通讯地址的前缀,如 http:// 127.0.1:8080/basePath/groupName/key 用于请求数据
}

func NewHTTPPool(self string) *HTTPPool {
return &HTTPPool{
self: self,
basePath: defaultBasePath,
}
}

实现 http.Handler 接口

当 HTTPPool 实现了 http.Handler 接口时,可以传入 http.ListenAndServe 函数作为第二个参数。

而 http.Handler 的接口定义如下:

1
2
3
type Handler interface {
ServeHTTP(ResponseWriter, *Request)
}

所以 HTTPPool 需要实现 ServeHTTP 方法,以此实现 http.Handler 接口。ServeHTTP 中的主要逻辑就是从 URL.Path 中提取出 groupName 和 key,并在相应的 group 中查询缓存数据并响应 HTTP 请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// Log 输出日志信息
func (p *HTTPPool) Log(format string, v ...interface{}) {
log.Printf("[server %s] %s", p.self, fmt.Sprintf(format, v...))
}

// ServeHTTP 处理查询缓存的请求,实现了 http.Handler 接口
func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// 判断是否有 basePath
if !strings.HasPrefix(r.URL.Path, p.basePath) {
http.Error(w, "HTTPPool serving unexpected path: "+r.URL.Path, http.StatusBadRequest)
return
}

// 检查是否有 groupName 和 key
parts := strings.SplitN(r.URL.Path[len(p.basePath):], "/", 2)
if len(parts) != 2 {
http.Error(w, "bad request", http.StatusBadRequest)
return
}

groupName := parts[0]
key := parts[1]

// 通过 groupName 获取 group
group := GetGroup(groupName)
if group == nil {
http.Error(w, "no such group:"+groupName, http.StatusBadRequest)
return
}

// 从缓存中获取数据
view, err := group.Get(key)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

// 响应客户端
w.Header().Set("Content-Type", "application/octet-stream")
w.Write(view.ByteSlice())
}

本节实现了对 LRU 缓存的并发控制。以及 DawnCache 的核心数据结构 Group。Group 可用对缓存分组,以实现缓存数据的简单分类,缓存不存在时,调用回调函数获取源数据。

最终代码结构如下:

1
2
3
4
5
6
7
8
lru/
|--lru.go
|--lru_test.go
byteview.go
cache.go
dawncache.go
dawncache_test.go
go.mod

并发读写

ByteView 只读的缓存值

byteview.go

首先定义一个 ByteView,它是只读的,用于表示缓存值

1
2
3
4
// ByteView 保存不可变的字节缓存值
type ByteView struct {
b []byte
}

因为表示缓存值,所以需要实现 lru.Value 接口。定义 Len 方法,用于获取缓存值得占用字节数。

1
2
3
4
// Len 实现 lru.Value 接口
func (v ByteView) Len() int {
return len(v.b)
}

定义 ByteSliceString 方法,分别用于返回一个拷贝和对应的字符串:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// ByteSlice 返回一个 ByteView 数据的克隆切片,ByteView 只读,所以返回克隆切片防止外部程序修改
func (v ByteView) ByteSlice() []byte {
return cloneBytes(v.b)
}

// String 返回 ByteView 对应的字符串
func (v ByteView) String() string {
return string(v.b)
}

// cloneBytes 克隆数据
func cloneBytes(b []byte) []byte {
clone := make([]byte, len(b))
copy(clone, b)
return clone
}

为 lru.Cache 添加并发特性

cache.go

为了实现 lru.Cache并发特性,所以需要在外封装一层,加上互斥锁用于并发控制

1
2
3
4
5
6
// cache 单机并发缓存,带有互斥锁
type cache struct {
mu sync.Mutex // 互斥锁
lru *lru.Cache // LRU 缓存
cacheBytes int64 // 缓存容量
}

add 方法用于向缓存中添加键值对。

add 方法中,判断了 c.lru 是否为 nil,如果等于 nil 再创建实例。这种方法称之为延迟初始化(Lazy Initialization),一个对象的延迟初始化意味着该对象的创建将会延迟至第一次使用该对象时。主要用于提高性能,并减少程序内存要求。

1
2
3
4
5
6
7
8
9
10
11
// add 向缓存中添加键值对
func (c *cache) add(key string, value ByteView) {
c.mu.Lock()
defer c.mu.Unlock()

if c.lru == nil {
c.lru = lru.New(c.cacheBytes, nil)
}

c.lru.Add(key, value)
}

get 方法用于查找缓存值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// get 查找
func (c *cache) get(key string) (value ByteView, ok bool) {
c.mu.Lock()
defer c.mu.Unlock()
// 底层lru缓存为空,直接返回
if c.lru == nil {
return
}
// 在底层lru缓存中查找
if v, ok := c.lru.Get(key); ok {
return v.(ByteView), ok
}

return
}

Group 实现

dawncache.go

Group 是 DawnCache 最核心的数据结构,不仅分组将缓存数据进行了简单的划分;而且负责与用户的交互,并且控制缓存值存储和获取的流程。

1
2
3
4
5
6

接收 key --> 检查是否被缓存 -----> 返回缓存值 ⑴
| 否 是
|-----> 是否应当从远程节点获取 -----> 与远程节点交互 --> 返回缓存值 ⑵
| 否
|-----> 调用`回调函数`,获取值并添加到缓存 --> 返回缓存值 ⑶

接下来实现流程 (1) 和 (3),远程交互部分后续实现。

Getter 回调

设计一个回调函数,当数据不存在时,调用这个函数得到源数据。

  • Getter 是一个接口,其中定义了一个函数 Get 用于获取源数据。
  • GetterFunc 是函数类型,并且实现了 Getter 接口。
  • 接口型函数方便使用者在调用时既能够传入函数作为参数,也能够传入实现了该接口的结构体作为参数。
1
2
3
4
5
6
7
8
9
type Getter interface {
Get(key string) ([]byte, error)
}

type GetterFunc func(key string) ([]byte, error)

func (f GetterFunc) Get(key string) ([]byte, error) {
return f(key)
}

Group 定义

首先定义 Group:

  • name 表示一个 Group 的命名空间,用于区分不同类型的缓存。
  • getter 是缓存未命中获取源数据的回调
  • mainCache 是实现并发的 LRU 缓存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
type Group struct {
name string // 一个组的命名空间,用于区分不同的缓存,如学生姓名、成绩可以放到不同的缓存中去
getter Getter // 当查找数据未命中时,调用该函数获取值
mainCache cache // 底层缓存
}

var (
mu sync.RWMutex
groups = make(map[string]*Group) // 存储所有的缓存
)

// NewGroup 新建一个 *Group 缓存
func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
if getter == nil {
panic("nil getter")
}
mu.Lock()
defer mu.Unlock()
g := &Group{
name: name,
getter: getter,
mainCache: cache{cacheBytes: cacheBytes},
}
groups[name] = g // 添加到 map 中
return g
}

// GetGroup 根据命名空间返回对应的 Group
func GetGroup(name string) *Group {
if g, ok := groups[name]; ok {
return g
}
return nil
}

Group Get 方法

Group 的 Get 方法用于获取缓存数据:

  • 首先从 LRU 缓存中获取缓存数据。
  • 若 LRU 缓存中没有,则从别处加载数据。本节只实现了通过回调函数获取源数据。
  • 当通过回调函数获取源数据后,将新的数据再插入到 LRU 缓存中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func (g *Group) Get(key string) (ByteView, error) {
if key == "" {
return ByteView{}, errors.New("key is required")
}
// 可以在缓存中查询到,返回数据
if v, ok := g.mainCache.get(key); ok {
return v, nil
}
// 从远程或者回调函数获取key对应的value
return g.load(key)
}

// load 从别处加载数据
func (g *Group) load(key string) (ByteView, error) {
// 暂时全部调用回调函数加载key对应的value
// 从远程调用之后实现
return g.getLocally(key)
}

// getLocally 从本地,即调用回调函数获取 value
func (g *Group) getLocally(key string) (ByteView, error) {
bytes, err := g.getter.Get(key)
if err != nil {
return ByteView{}, err
}
value := ByteView{b: cloneBytes(bytes)}
g.populateCache(key, value) // 将新获取到的数据放入缓存中
return value, nil
}

func (g *Group) populateCache(key string, value ByteView) {
g.mainCache.add(key, value)
}

测试

dawncache.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package DawnCache

import (
"fmt"
"log"
"testing"
)

var db = map[string]string{
"Tom": "630",
"Jack": "589",
"Sam": "567",
}

func TestGet(t *testing.T) {
loadCounts := make(map[string]int, len(db))
gee := NewGroup("scores", 2<<10, GetterFunc(
func(key string) ([]byte, error) {
log.Println("[SlowDB] search key", key)
if v, ok := db[key]; ok {
if _, ok := loadCounts[key]; !ok {
loadCounts[key] = 0
}
loadCounts[key]++
return []byte(v), nil
}
return nil, fmt.Errorf("%s not exist", key)
}))

for k, v := range db {
if view, err := gee.Get(k); err != nil || view.String() != v {
t.Fatal("failed to get value of Tom")
}
if _, err := gee.Get(k); err != nil || loadCounts[k] > 1 {
t.Fatalf("cache %s miss", k)
}
}

if view, err := gee.Get("unknown"); err == nil {
t.Fatalf("the value of unknow should be empty, but %s got", view)
}
}

func TestGetGroup(t *testing.T) {
groupName := "scores"
NewGroup(groupName, 2<<10, GetterFunc(
func(key string) (bytes []byte, err error) { return }))
if group := GetGroup(groupName); group == nil || group.name != groupName {
t.Fatalf("group %s not exist", groupName)
}

if group := GetGroup(groupName + "111"); group != nil {
t.Fatalf("expect nil, but %s got", group.name)
}
}

本系列参考于 极客兔兔-7天用Go从零实现分布式缓存GeeCache,将从零开始实现一个分布式缓存,称为 DawnCache

在本节实现了 LRU 缓存删除策略,最终的代码目录结构如下:

1
2
3
4
lru/
|--lru.go
|--lru_test.go
go.mod

缓存淘汰策略

当有新数据加入缓存,缓存的容量已经超过了最大容量,就需要将之前加入的缓存数据替换出去。常见的缓存淘汰策略如下:

  • FIFO(First In First Out)先进先出:淘汰最先进入缓存的数据,这种策略局部性不好。
  • LFU(Least Frequently Used)最久未使用:LFU 需要为每一条缓存记录维护一个计数器,每一次命中会使得计数器+1,每次淘汰访问次数最少的即可。但是 LRU 有一个缺点,就是若某条缓存在过去被频繁访问,即使最近不再被访问也不会被立即淘汰。
  • LRU(Least Recently Used)最近最少未使用:LRU 认为,如果数据最近被访问过,那么将来被访问的概率也会更高。LRU 维护一个队列,如果某条记录被访问了,则移动到队头,那么队尾则是最近最少访问的数据,淘汰该条记录即可。

LRU 数据结构

LRU 的数据结构如下:

  • 双向链表的顺序表示最近被访问的顺序,队头是最近被访问的数据,队尾是最久未被访问的数据。
  • 字典记录了键值 key 与链表节点的映射关系,这样保证了查找和增删的时间复杂度不高。

implement lru algorithm with golang

LRU

lru/lru.go

实现 LRU

首先定义 LRU 缓存的结构体、链表节点值的结构体、value 的结构体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Cache LRU 缓存
type Cache struct {
maxBytes int64 // 缓存的最大容量, 0 表示缓存容量不受限制
nBytes int64 // 当前缓存已占用的空间
ll *list.List // 用于 LRU 的双向链表
cache map[string]*list.Element // 用于保存 key 与双向链表节点地址之间的映射关系
OnEvicted func(key string, value Value) // 移除数据时执行
}

// entry 保存在双向链表中的条目
type entry struct {
key string
value Value
}

// Value 缓存中value的类型,只要能求出占用空间即可
type Value interface {
Len() int
}

func New(maxBytes int64, onEvicted func(key string, value Value)) *Cache {
return &Cache{
maxBytes: maxBytes,
ll: list.New(),
cache: make(map[string]*list.Element),
OnEvicted: onEvicted,
}
}

下面定义 Cache 的查找操作,当查找命中时将对应节点移动至双向链表的头部

1
2
3
4
5
6
7
8
9
10
11
12
// Get 获取键值key对应的value
func (c *Cache) Get(key string) (value Value, ok bool) {
if elem, ok := c.cache[key]; ok {
// lru 命中
value = elem.Value.(*entry).value
// 将节点移动到链头
c.ll.MoveToFront(elem)
return value, ok
}
// 未命中
return nil, false
}

Cache 的删除操作,即删除最近最久未使用的节点(双向链表的尾部节点):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// DeleteOldest 删除最近最久未使用的节点 即链表尾部的节点
func (c *Cache) DeleteOldest() {
elem := c.ll.Back() // 得到尾部节点
if elem != nil {
kv := elem.Value.(*entry)
// 删除尾部节点
c.ll.Remove(elem)
// 从cache中删除
delete(c.cache, kv.key)
// 减少占用空间
c.nBytes -= int64(len(kv.key)) + int64(kv.value.Len())
if c.OnEvicted != nil {
// 删除数据时执行
c.OnEvicted(kv.key, kv.value)
}
}
}

Cache 的增加数据操作:

  • 如果缓存里已有键值为key的数据,则更新value
  • 没有键值为key的数据,新增一个节点,插入链表头部
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Add 在 lru 缓存中添加数据
func (c *Cache) Add(key string, value Value) {
if elem, ok := c.cache[key]; ok {
// 如果缓存里已有键值为key的数据,则更新value
c.ll.MoveToFront(elem)
kv := elem.Value.(*entry)
c.nBytes += int64(value.Len()) - int64(kv.value.Len())
kv.value = value

} else {
// 没有键值为key的数据,新增一个节点,插入链表头部
elem := c.ll.PushFront(&entry{key: key, value: value})
c.nBytes += int64(len(key)) + int64(value.Len())
c.cache[key] = elem
}
// 超出最大空间,删除最久未使用节点
for c.maxBytes != 0 && c.nBytes > c.maxBytes {
c.DeleteOldest()
}
}

测试

lru/lru_test.go

下面是对 LRU 缓存的测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package lru

import (
"reflect"
"testing"
)

type String string

// Len 实现 lru.Value 接口
func (s String) Len() int {
return len(s)
}

func TestGet(t *testing.T) {
lru := New(int64(0), nil)
lru.Add("key1", String("dawn"))
if v, ok := lru.Get("key1"); !ok || string(v.(String)) != "dawn" {
t.Fatalf("cache hit key1=dawn failed")
}
if _, ok := lru.Get("key2"); ok {
t.Fatalf("cache miss key2 failed")
}
}

func TestDeleteOldest(t *testing.T) {
k1, k2, k3 := "key1", "key2", "k3"
v1, v2, v3 := "value1", "value2", "v3"
maxBytes := int64(len(k1 + k2 + v1 + v2))
lru := New(maxBytes, nil)
lru.Add(k1, String(v1))
lru.Add(k2, String(v2))
lru.Add(k3, String(v3))

if _, ok := lru.Get(k1); ok {
t.Fatalf("Deleteoldest key1 failed")
}
}

func TestOnEvicted(t *testing.T) {
keys := make([]string, 0)
callback := func(key string, value Value) {
keys = append(keys, key)
}
lru := New(int64(10), callback)
lru.Add("key1", String("123456"))
lru.Add("k2", String("k2"))
lru.Add("k3", String("k3"))
lru.Add("k4", String("k4"))

expect := []string{"key1", "k2"}

if !reflect.DeepEqual(expect, keys) {
t.Fatalf("Call OnEvicted failed, expect keys equals to %s", expect)
}
}

语音识别介绍

语言识别包括输入(一段语音,长度为 T,d 维向量)、语音识别模型、输出(一段文字,长度为 N,有 V 个不同的 token):

1653558659336

token

下面介绍不同种的 token:

  • Phoneme:最小的语音单元,比如音标、拼音。缺点是需要一个词典,来对应单词与 phonemes 的关系。

  • Grapheme:最小的书写单元,比如 26 个英文字母 + 标点 + 空格、汉字 + 标点。

  • Word:单词,对于一些语言而言单词太多了(V 很长)。

  • Morpheme:最小的有意义的单元,如英文中的前后缀以及中心词。

  • Bytes:字节,如 UTF-8 中的每一个字节。好处是可以不依赖于具体的语言,不同语言的文字都可以用 UTF-8 表示。

输入

语言识别的输入是长度为 T,维度为 d 的向量。如何将语言信号转换为向量,一般取 25ms 为一个 窗口,窗口每次向后移动 10ms,每一次窗口内的语音信号对应一列向量。

关于如何将一个窗口内的语音信号转换为向量,一般有如下方法:

  • 400 次采样:每一个窗口采样 400 次,将 400 次采样结果直接进行拼接,形成一列向量。
  • MFCC:形成 39 维的向量。
  • Filter Bank Output:形成 80 维的向量(最为常用)。

1653559523976


下面说明以下几种模型:

  • Listen,Attend and Spell(LAS)
  • Connectionist Temporal Classification(CTC)
  • RNN Transducer(RNN-T)
  • Neural Transducer
  • Monotonic Chunkwise Attention(MoChA)

LAS

Listen,Attend and Spell

Listen

Listen的输入为语音特征(acoustic features),经过 Encoder输出一组高级表示法的向量:

  • 用于提取语音内容信息
  • 降低方差和噪声

1653643788556

Encoder

其中,Encoder 可以是:

  • RNN
  • CNN(RNN+CNN 的组合更为常见)
  • Self-Attention Layers

Down Sampling

输入太长导致运算量很大,需要 Down Sampling

  • Pyramid RNN

1653644644946

  • Pooling over time

1653644660175

  • Time-delay DNN(TDNN)或者 Dilated CNN:只考虑第一个和最后一个向量。

1653644976903

  • Truncated Self-Attention:在一定范围内 Attention,超出范围不予考虑。

1653645003003

Attention

Attention 首先会进入 match 函数,match 以 Encoder 的输出 h 向量 和 z 作为输入,输出为向量 α

1653645351315

接着 α 进入 softmax 层,最后得到 c(Attend 的输出),作为 Spell(也就是 RNN)的输入。

1653645422970

match

match 函数是可以替换的,如:

  • Dot-product Attention

1653645510853

  • Additive Attention

1653645522104

Spell

c0 作为 Spell 的输入,得到各个 token 的概率,取最大的作为输出:

1653645919177

z1 作为 match 的输入,得到 c1,c1 再作为 Spell 的输入;以此类推,得到语音识别的 token 序列:

1653646113105

最后选择 token 时,都是选择概率最大的,也就是贪心的选择当前概率最大的 token。但是贪心的选择,有可能进入局部最优,而不是全局最优。

Beam Search 要求每次选择 B(Beam Size)个概率最大的token,显然这会增加运算量,但是更容易得到全局最优。


LAS 在训练时需要强制学习(忽略学习出的结果,而是使用正确结果)。

LAS 无法在线输出语音识别结果,必须一句话说完之后再输出结果。


CTC

Connectionist Temporal Classification,CTC

结构

CTC 结构比较简单,只有一个 Encoder(如果使用 RNN,必须是单向 RNN,因为 CTC 可以在线输出语音识别),Decoder 部分是一个线性分类(softmax)。

1653732739737

CTC (在没有 down sampling 的情况下)有 T 个语音特征输入,就会有 T 个输出。

但是多个语音特征可能对应一个 token,所以输出包括了 Ø,表示什么也不输出。最终合并相邻并且相同的 token,忽略 Ø。

训练

在训练时,比如一段语音有四个输入,标签是 你好。因为引入了 Ø,所以对应了以下输出:

你你好好、Ø你Ø好、你你Ø好 ……

那么到底需要选择哪一个作为训练的正确标签呢?CTC 在训练时,选择将这些全部考虑(穷举)在内。

RNN-T

RNN Transducer,RNN-T

RNA – CTC 到 RNN-T 之间的过渡

RNA,Recurrent Neural Aligner,就是将 CTC 中线性选择器的部分替换为 LSTM(RNN),使当前输出考虑前面的输出。

1653733593764

RNN-T 结构

对于 RNA 而言,有可能一个输入对应多个 token,如 th。此时需要引入 RNN-T。

RNN-T 会重复输入,直到输出了 Ø(表示可以输入下一个语音特征)。

1653734265058

特别之处

因为输出依然包括了 Ø,所以需要自定义标签的生成,依然可以选择与 CTC 一样的处理方式,即选择所有可能的标签。

但是,RNN-T 有自己的特别之处,在输出 token 后,将 token 送入另一个 RNN 中,并且这个 RNN 会忽略 Ø 的输出

这样做的好处是,这个 RNN 网络只看前面产生的非空 token,至于中见有没有 Ø、Ø的顺序,并不关心这个问题。

1653734553646

Neural Transducer

结构

Neural Tansducer 与之前 RNN-T 的不同之处在于:一次性读入多个声音特征作为一个 chunk,先做 Attention

1653734963426

MoChA

Monotonic Chunkwise Attention,MoChA

结构

MoChA 可以自己决定 chunk 的大小

1653735252849

1653735282867

总结

1653735323174

在本节,最终的代码目录结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
dain/
|--context.go
|--dain.go
|--logger.go
|--router.go
|--recovery.go
|--trie.go
|--go.mod
static/
|--css/
|--index.css
templates/
|--index.tmpl
main.go
go.mod

实现目标

在Web服务器中,因为服务器程序出现 panic 而导致服务端崩溃是无法接受的,所以需要错误恢复机制。

main.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func main() {
// 默认使用 Logger 和 Recovery 中间件
e := dain.Default()

// 加载静态文件
e.Static("/static", "./static")

// 加载模板
e.LoadHTMLGlob("templates/*")

e.Get("/index", func(c *dain.Context) {
c.HTML(http.StatusOK, "index.tmpl", c.Path)
})

// 测试 Recovery 中间件
e.Get("/panic", func(c *dain.Context) {
array := []int{1, 2, 3}
c.JSON(http.StatusOK, dain.H{
"msg": array[100],
})
})

e.Run(":9000")
}

错误恢复

dain/recovery.go

需要预定义一个错误恢复的中间件,用于 recover 错误,以至于 panic 不会导致程序崩溃:

1
2
3
4
5
6
7
8
9
10
11
12
func Recovery() HandlerFunc {
return func(c *Context) {
defer func() {
if err := recover(); err != nil {
message := fmt.Sprintf("%s", err)
log.Printf("%s\n\n", trace(message))
c.Fail(http.StatusInternalServerError, "Internal Server Error")
}
}()
c.Next()
}
}

其中,trace 函数用于追踪出错的位置:

1
2
3
4
5
6
7
8
9
10
11
12
13
func trace(message string) string {
var pcs [32]uintptr
n := runtime.Callers(3, pcs[:]) // skip first 3 caller

var str strings.Builder
str.WriteString(message + "\nTraceback:")
for _, pc := range pcs[:n] {
fn := runtime.FuncForPC(pc)
file, line := fn.FileLine(pc)
str.WriteString(fmt.Sprintf("\n\t%s:%d", file, line))
}
return str.String()
}

在本节,最终的代码目录结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
dain/
|--context.go
|--dain.go
|--logger.go
|--router.go
|--trie.go
|--go.mod
static/
|--css/
|--index.css
templates/
|--index.tmpl
main.go
go.mod

实现目标

实现加载静态文件以及模板渲染功能:

main.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import (
"DawnGin/dain"
"net/http"
)

func main() {
e := dain.New()

// 使用 Logger 中间件
e.Use(dain.Logger())

// 加载静态文件
e.Static("/static", "./static")

// 加载模板
e.LoadHTMLGlob("templates/*")

e.Get("/index", func(c *dain.Context) {
c.HTML(http.StatusOK, "index.tmpl", c.Path)
})

e.Run(":9000")
}

templates/index.tmpl

1
2
3
4
<html>
<link rel="stylesheet" href="/static/css/index.css">
<p>index.css is loaded, path is {{.}}</p>
</html>

static/css/index.css

1
2
3
4
5
p {
color: orange;
font-weight: 700;
font-size: 20px;
}

静态文件

dain/dain.go

  • Static 这个方法是暴露给用户的。用户可以将磁盘上的某个文件夹 root 映射到路由 relativePath

  • createStaticHandler 方法用于提供一个利用 relativePath 来访问本地文件系统的 HTTP 处理器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (group *RouterGroup) createStaticHandler(relativePath string, fs http.FileSystem) HandlerFunc {
absolutePath := path.Join(group.prefix, relativePath)
fileServer := http.StripPrefix(absolutePath, http.FileServer(fs))
return func(c *Context) {
file := c.Param("filepath")
// 检查文件是否可以访问
if _, err := fs.Open(file); err != nil {
c.Status(http.StatusNotFound)
return
}
fileServer.ServeHTTP(c.Writer, c.Req)
}
}

// Static 将磁盘上的某个路径 root 映射到 relativePath 上
func (group *RouterGroup) Static(relativePath string, root string) {
handler := group.createStaticHandler(relativePath, http.Dir(root))
urlPattern := path.Join(relativePath, "/*filepath")
// 注册
group.Get(urlPattern, handler)
}

模板渲染

Engine

dain/dain.go

修改 Engine 结构,添加用于记录模板的属性:

1
2
3
4
5
6
7
8
9
10
11
type Engine struct {
// 路由器
router *router
// 继承 RouterGroup,把根也看作是一个分组
*RouterGroup
// 记录所有的路由分组
groups []*RouterGroup
// 模板
htmlTemplates *template.Template
funcMap template.FuncMap
}

增加两个方法,分别用于注册模板函数以及加载模板文件

1
2
3
4
5
6
7
func (e *Engine) SetFuncMap(funcMap template.FuncMap) {
e.funcMap = funcMap
}

func (e *Engine) LoadHTMLGlob(pattern string) {
e.htmlTemplates = template.Must(template.New("").Funcs(e.funcMap).ParseGlob(pattern))
}

Context

dain/context.go

Context 结构体中增加指向引擎 Engine 的属性,用于访问模板:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type Context struct {
// HTTP 请求 响应
Writer http.ResponseWriter
Req *http.Request
// 请求信息
Path string // 请求路径
Method string // 请求方法
Params map[string]string // 路由参数,如 /hello/:user 匹配 /hello/dawn,则 Params["user"]=dawn
// 响应信息
StatusCode int // 响应码
// 中间件
handlers []HandlerFunc // 存储中间件
index int // 执行的中间件下标
// 指向 *Engine
engine *Engine
}

同时,修改 c.HTML 方法,使之能够渲染模板:

1
2
3
4
5
6
7
8
9
10
11
12
func (c *Context) Fail(code int, err string) {
c.index = len(c.handlers)
c.JSON(code, H{"msg": err})
}

func (c *Context) HTML(code int, name string, data interface{}) {
c.SetHeader("Content-Type", "text/html")
c.Status(code)
if err := c.engine.htmlTemplates.ExecuteTemplate(c.Writer, name, data); err != nil {
c.Fail(http.StatusInternalServerError, err.Error())
}
}

dain/dain.go

因为在 Context 中增加了指向 Engine 的字段,所以需要在 ServeHTTP 中对 c.engine 赋值:

1
2
3
4
5
6
7
8
9
10
11
// 实现 http.Handler 接口,自定义路由器
func (e *Engine) ServeHTTP(w http.ResponseWriter, r *http.Request) {
c := NewContext(w, r)
for _, group := range e.groups {
if strings.HasPrefix(r.URL.Path, group.prefix) {
c.handlers = append(c.handlers, group.middleware...)
}
}
c.engine = e
e.router.handle(c)
}