Dawn's Blogs

分享技术 记录成长

0%

Constituency Parsing

Constituency Parsing 就是:

  • 找出一段 text span 作为 constituents

  • 每一个 constituents 都有一个标签

1655892270551

  • 对于每一个单词都是一个 constituent(标签为这个单词的词性)
  • 相邻的 constituent 可以组成一个更大的 constituent
  • 一句话的所有单词,从底向上,可以组成一棵树

1655892544642

Chart-based

Chart-based 方法实际上就是对每一个 span 进行两次分类

  • 放入二分类:判断是否是 constituent
  • 放入多分类:constituent 属于哪一个标签

下图是 Chart-based 的结构,其中 Span Feature Extraction 与 Coreference Resolution 中的一样

1655893866459


需要注意的是,对于 span 的选择可能会产生矛盾,比如两个重合的 span 都被判断出是 constituent,那么就无法组成一棵树。

1655894066479

解决方法就是穷举出所有可能性的树,然后对每一棵树进行评分,选取评分最高的树。

Transition-based

Transition-based 中由三个部分组成:

  • Stack:初始为空。
  • Buffer:初始存放整个句子。
  • Actions:包括三种操作
    • NT (X):创建一个带有 X 标签的 constituent
    • SHIFT:将一个 token 从 Buffer 移动到 Stack 中
    • REDUCE:结束一个 constituent

1655894315635

实际上,我们需要训练一个分类模型,用于输出 Actions。

1655894356049

Tree to Sequence

甚至,我们可以利用 Seq2seq Model,将语法树变为一个 Sequence,比如:

对树进行遍历,得到遍历序列,这个遍历序列就是 Sequence。需要注意的是,这个模型不需要输出单词(因为可能会改变输入的句子),可以用 XX 表示输入的一个单词。

1655894496403

Dependency Parsing

Constituency Parsing 考虑的是一个句子中,相邻单词的关系。

Dependency Parsing 考虑的是任意两个单词(不需要相邻)的关系,用箭头表示这种关系(标签为关系的类别),起始为 head,结束为 dependent。

1655894898468

Dependency Parsing就是将一句话变为有向图(Directed Graph,实际上也是一棵树),word 变为 node,关系变为 edge。

  • 所有 word 只有一个入边(除去 ROOT)。
  • 从每一个 word 到 ROOT 有唯一的一条路径。

1656986928287

核心方法就是:两个分类器,输入是两个 word。

  • 判断左边是否指向右边。
  • 判断属于哪一种关系。

1656987793280

Self-Attention 机制

将各个向量放入 Self-Attention(可以使用多次) 中,得到与整个句子都相关的另外的向量。

1655719447203

结构

Self-Attention 层,输入一些向量,输出另一些向量。每一个输出的向量与输入的向量都有关系。

1655719567664

对于如何输出一个向量,实际上是看其他向量是否对应的输入有关系(relevant)。这里的有关系的程度用 α 表示:

1655719768962

对于如何计算 α,有两种方式:

  • Dot-product:输入向量乘以一个矩阵 W,之后再做点乘,点乘结果为 α。(transformer使用)
  • Additive:输入向量乘以一个矩阵 W,相加之后进入 tanh,最后经过线性变换得到 α。

1655719747182

过程

  • 得到相关性分数 α:首先输入乘以矩阵得到向量 q 和 k,将 q 和 k 点乘(dot-product)后进入 soft-max(作用是 normalization,也可以用其他的) 层:

1655721797344

  • 根据 α 提取信息:将输入与矩阵相乘得到向量 v,再与 soft-max 的输出相乘并相加得到 Self-Attention 的输出 b。相关性大,则 α 大,所以输出中对应 v 的占比越高

1655721841471

Multi-head Self-Attention

这是 Self-Attention 的变形,用于计算不同种类的相关性。

最大的不同就是 q、k、v 三种向量乘以多个矩阵(矩阵的个数就是 head 的数量,即种数)得到不同的种类,每一种单独 Attention 得到每一种对应的输出。

1655722732610

最后将每一种输出乘以一个矩阵,得到最终的输出。

1655722841770

加上位置信息 - Positional Encoding

上述的 Self-Attention 中,是没有位置信息的。若需要位置信息,则需要 Positional Encoding。具有工作如下:

  • 每一个位置 i,都有一个唯一的位置向量 ei
  • 输入加上位置向量之后再 Attention 即可

1655723108847

Transformer

Transformer 是一种 Seq2seq Model(输入一个 sequence,输出一个 sequence)。

Seq2seq 结构

Seq2seq Model 的结构包括一个 Encoder 和一个 Decoder

1655809629816

Encoder

Encoder 输入一排向量,输出另外一排向量。

1655809876570

其中 Encoder 是 N 个 block 的重复,每一个 block 的结构如下:

  • Self-Attention
  • Residual + Norm
  • 全连接层(上图中 Feed Forward)
  • Residual + Norm

1655809982256

Decoder

Autoregressive(AT)

Autoregressive 就是在输出时,从左到右依次输出。其最显著的特点就是,每一个 Decoder 的输出作为下一次 Decoder 的输入

Decoder 和 Encoder 的结构比较类似,区别在于:

  • 第一个 Attention 变为了 Masked Multi-Head Attention(Encoder中为 Multi-Head Attention):Mask 的含义就是每一次 Attention,只看前面的向量,不看后面的向量

1655811020124

  • 增加了一层 Multi-Head Attention 和 Add & Norm
  • 最后一个 block 的输出进入线性层soft-max 层,输出的是最大可能性对应的结果(将这个输出放入下一次 Decoder 的输入)。

1655810751137

Non-autoregressive(NAT)

AT 和 NAT 的比较

  • AT 将 Decoder 的输出作为下一次 Decoder 的输入;NAT 输入的只有 BEGIN token,输入不会进入 Decoder 的输入。
  • 如何得到 NAT 的输出长度?(Seq2seq 输出长度是不确定的)
    • 训练一个 Model 进行输出长度的预测。
    • 输出一个很长的 sequence,忽略 END token 之后的东西。
  • NAT 的优点:
    • 可以并行化计算,因为不用等待上一个 Decoder 的输出。
    • 可以控制输出长度。
  • NAT 通常比 AT 表现更差。

1655811450753

Encoder-Decoder 之间的连接部分

实际上,Decoder 中多出的一层 Multi-Head Attention 和 Add & Norm,就是用于连接 Encoder 和 Decoder。

这一部分被称为 Cross Attention

1655811789284

Cross Attention 的详细结构如下,计算 Decoder 中 Masked Multi-Head Attention 的输出向量与 Encoder 的输出之间的相关性(Attention)。

1655811904635

如何训练

训练时,采用强制学习(Teacher Forcing):每一次向 Decoder 的输入并不是上一次 Decoder 的输出,而是正确的结果。

1655812460812

Coreference Resolution

Coreference Resolution,即代指消解,识别出代指的相同的东西。

需要识别出代指的一段文字(如他、它的 XX 等),称为 mention。代指消解的结果就是将同一个代指的 mention,放入同一个 cluster 中。

步骤

  • 识别出 mention:需要一个二分类器,输入一个 span,判别是否是一个 mention。若有 N 个 token 的句子,需要运行 N(N-1)/2 次。

1655291452800

  • 识别哪些 mention 需要放在同一个 cluster 中:同样需要一个二分类器,输入为两个 mention,输出判断是否属于同一个 cluster。若有 K 个 mention,需要运行 K(K-1)/2 次。

1655291785154

或者可以直接输入两个 span 到二分类器中,判断这两个 span 是否代指同一个实体。若有 N 个 token,则有 K=N(N-1)/2 个 span,需要运行 K(K-1)/2 次。

1655291986984

训练二分类器

一个通常的用于 Coreference Resolution 二分类器如下:

  • 将句子中的所有 token 输入预训练模型中,得到 embedding。

  • 将 embedding span 输入 Span Feature Extraction,得到两个向量(每一个 Span Feature Extraction 将 embedding 汇聚成一个 embedding)。

  • 判断两个向量是否是 mention、是否属于同一个 cluster,最终输出一个分数。

1655292350102

对于 Span Feature Extraction,结构如下。将 embedding span 进行 Attention,得到一个 Attention 向量,再把 span 中起始 embedding最后一个 embeddingAttention 向量相加,得到 Span Feature Extraction 的输出向量。

1655294345873


上述是有监督模型,那么是否可以训练一个无监督模型呢?

答案是可以的,把 mention Mask 起来,这样模型的输出就是相关的代指实体。

1655295040850

内存管理

Go 中,内存分配主要有两个思想:

  • 分块
  • 缓存

分块

分块的思路为:

  • 调用系统调用 mmap() 向 OS 申请一大块内存,例如 4MB。
  • 将内存划分为大块,例如 8KB,称为 mspan
    • noscan mspan:分配不包含指针的对象,GC 不需要扫描。
    • scan mspan:分配包含指针的对象,GC 需要扫描。
  • 再将 mspan 划分为特定大小的小块,用于对象分配

缓存

缓存的基本思路是:

  • 每一个 P 包含一个 mcache 用于快速分配,mcache 管理一组 mspan
  • 当 mcache 中的 mspan 分配完毕,向 mcentral 申请带有未分配块的 mspan。
  • 当 mspan 中没有分配的对象,mspan 会被缓存在 mcentral 中,而不是立刻释放归还给 OS。

1655263401798

优化 - Balanced GC

字节跳动有自己的 Go 语言内存管理优化方案,即 Balanced GC,其思路如下:

每个 G 都绑定一大块内存(1 KB),称为 Goroutine Allocation Buffer(GAB)

  • GAB 用于 noscan 类型的小对象(小于 128B)的分配。
  • GAB 使用三个指针进行维护:base、end、top。使用指针碰撞风格进行对象分配。

1655263925528

  • GAB 对于 Go 内存管理来说就是一个大对象

GAB 有一个问题:会导致内存被延迟释放,GAB 中即使只有一个很小的对象存活,Go 内存管理也不会回收其余空闲空间。

解决方法

当 GAB 中存活对象大小少于一定阈值时,将 GAB 中存活的对象复制到另外分配的 GAB(Survivor GAB)中,原先的 GAB 可以释放。

1655264117801

编译器优化

函数内联

函数内联是指,将被调用函数的函数体副本替换调用位置上,同时重写代码以反映参数的绑定。

优点

  • 消除函数调用的开销,如传递参数、保存寄存器等。
  • 扩展了函数边界,更多对象不逃逸分析。

缺点

  • 函数体变大。
  • 编译生成的可执行文件变大。

逃逸分析

逃逸分析步骤

  • 从对象分配处出发,观察对象的数据流。
  • 若发现指针 p 在当前作用域 s:
    • 作为参数传递给其他函数
    • 传递给全局变量
    • 传递给其他 goroutine
    • 传递给已逃逸的指针指向的对象
  • 则指针 p 指向的对象逃逸出 s,反正没有逃逸。

对于未逃逸的对象,在上分配;逃逸对象,在上分配。

Go 依赖管理演进

Go 语言中,依赖管理的演进分为三个阶段,依次是:

  • GOPATH
  • Go Vendor
  • Go Module

GOPATH

配置环境变量 $GOPATH,GOPATH 下有以下三个文件夹:

  • bin:项目编译的二进制文件
  • pkg:项目编译的中间产物,用于加速编译
  • src:项目源码

项目的代码直接依赖于 src 下的代码,可以通过 go get 命令将依赖包下载到 src 下。


GOPATH 的缺点在于:无法实现对 package 的多版本控制

若 A 和 B 依赖于某一 package 的不同版本,这样的情况 GOPATH 无法解决。

Go Vendor

项目目录下增加 vendor 文件夹,所有依赖包的副本存放在项目下的 vendor 文件夹中。

若 vendor 中没有依赖包,则会在 GOPATH 下去寻找。


Go Vendor 的缺点在于:无法控制依赖的版本、更新项目可能出现依赖冲突。

若一个项目依赖于 package B 和 packag C,而 package B 依赖于 package D-V1 版本;package C 依赖于 package D-V2 版本。这样的场景下,Go Vendor 无法很好的解决。

1655220603232

Go Module

Go Module 通过 go.mod 文件管理依赖包版本。

通过 go get / go mod 工具,管理依赖包。

Go Module 详解

依赖管理三要素

Go Module 中,依赖管理需要三要素:

  • go.mod:配置文件,描述依赖。
  • Proxy:中心仓库管理依赖库。
  • go get/mod:本地工具。

go.mod

go.mod 文件主要由三部分构成:

  • 依赖管理基本单元:标识了这个模块可以在哪里找到(被其他人引用)。
  • 原生库:Go 的版本号。
  • 单元依赖:描述依赖关系,主要两部分组成。
    • 包名(Module Path)
    • 版本号

1655220918315

其中,可以看到单元依赖的一些配置:

  • version:
    • 语义化版本${MAJOR}.${MINOR}.${PATCH}MAJOR 是一个大版本,不同 MAJOR 可以不兼容MINOR 做出了一些新增函数,同一个 MAJOR 下需要相互兼容PATCH 做了一些 bug 修复。
    • 基于 commit 伪版本vx.0.0-yyyymmddhhmmss-abcdefgh1234
  • indirect:对于没有直接依赖的 package,就用 indirect 标识出来。
  • incompatible:如果 MAJOR 版本大于 1 时,其版本号还需要体现在 Module 名字中(如 xxx/xx/v2)。但是如果 Module 名字未遵循这条规则,则会打上 incompatible 标记。

Go 在选择版本时,会选择最低的兼容版本

如下图中,最终编译时所使用的 C 项目版本为 1.4 版本。

1655221776213

Proxy

Go Proxy 是一个服务站点,他会缓存源站中的软件内容,缓存的软件版本不会改变,源站软件删除后依然可用。

1655221901310

GOPROXY="https://proxy1.cn,https://proxy2.cn,direct",含义是依次从 proxy1、proxy2、源站中获取 package。

go get/mod

  • go getgo get example.org/pkg,参数如下:

    • @update:默认,获取最新版本。
    • @none:删除依赖。
    • @v1.1.1:语义化版本。
    • @45dfsf:特定的 commit。
    • @master:分支的最新 commit。
  • go mod:参数如下:

    • init:初始化,创建 go.mod 文件。
    • download:下载模块到本地。
    • tidy:增加需要的依赖,删除不需要的依赖。

BERT 介绍

我们将一些不带标注的文章,先预训练,得到一个 Model,这个 Model 可以看作是能够理解文字内容。

接着用一些带标注的特殊语料,进行 Fine-tune(微调),训练出可以完成特殊任务的 Model。

1654002682379

阅读全文 »

NLP 任务分类

NLP 任务总的来说分为两类:

  • 输入文字,输出类别。
  • 输入文字,输出另一段文字。

1653914138938

那么进一步的,可以根据输入和输出的不同,进行划分。

输出的不同

  • 输出类别:
    • 为一段话,只输出一个类别。
    • 为每一个 token 都输出一个类别。

1653914425983

  • 输出另一段文字:
    • 使用 seq2seq 模型。

1653914435623

输入的不同

  • 一段文字。
  • 多段文字:
    • 将多段文字拼接起来,中间用 <SEP> 连接。
    • 分别放入 Model 中,再对输出进行整合。

1653914566952

NLP 任务

1653916977054

知识图谱

知识图谱中,最重要的就是实体(Entity)关系(Relation)

1653917666204

提取实体 - NER

NER,Name Entity Recognition,命名实体识别。用于提取一段文字中给定的实体信息。

1653917699539

提取关系

提取关系,可以看成是一种分类问题。

1653917789907

本节实现 protobuf 进行节点间的通信。最终代码结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
dawncachepb/
|--dawncachepb.pb.go
|--dawncachepb.proto
lru/
|--lru.go
|--lru_test.go
singleflight/
|--singleflight.go
byteview.go
cache.go
consistenthash.go
dawncache.go
dawncache_test.go
go.mod
http.go
peers.go

使用 protobuf 通信

编写 proto 文件

dawncachepb/dawncachepb.proto

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
syntax = "proto3";

package dawncachepb;

option go_package = "./;dawncachepb";

message Request {
string group = 1;
string key = 2;
}

message Response {
bytes value = 1;
}

service GroupCache {
rpc Get(Request) returns (Response);
}

生成 pb.go 代码

生成 dawncachepb.pb.go

1
protoc --go_out=. *.proto

dawncachepb/dawncachepb.pb.go 有如下数据类型:

1
2
3
4
5
6
7
8
9
10
type Request struct {
// ...
Group string `protobuf:"bytes,1,opt,name=group,proto3" json:"group,omitempty"`
Key string `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"`
}

type Response struct {
// ...
Value []byte `protobuf:"bytes,1,opt,name=value,proto3" json:"value,omitempty"`
}

修改 PeerGetter 接口

修改 peers.go 中的 PeerGetter 接口,参数使用 geecachepb.pb.go 中的数据类型。

1
2
3
4
5
// PeerGetter 远程获取数据的接口
type PeerGetter interface {
// Get 根据 groupName 和 key 获取源数据
Get(in *pb.Request, out *pb.Response) error
}

修改使用 PeerGetter 接口的代码

dawncache.go

1
2
3
4
5
6
7
8
9
10
11
12
13
// getFromPeer 从 peer 处获取数据
func (g *Group) getFromPeer(peer PeerGetter, key string) (ByteView, error) {
req := &pb.Request{
Group: g.name,
Key: key,
}
res := &pb.Response{}
err := peer.Get(req, res)
if err != nil {
return ByteView{}, err
}
return ByteView{b: res.Value}, nil
}

http.go

  • ServeHTTP() 中使用 proto.Marshal() 编码 HTTP 响应。
  • Get() 中使用 proto.Unmarshal() 解码 HTTP 响应。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// ServeHTTP 处理查询缓存的请求,实现了 http.Handler 接口
func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// 判断是否有 basePath
if !strings.HasPrefix(r.URL.Path, p.basePath) {
http.Error(w, "HTTPPool serving unexpected path: "+r.URL.Path, http.StatusBadRequest)
return
}

// 检查是否有 groupName 和 key
parts := strings.SplitN(r.URL.Path[len(p.basePath):], "/", 2)
if len(parts) != 2 {
http.Error(w, "bad request", http.StatusBadRequest)
return
}

groupName := parts[0]
key := parts[1]

// 通过 groupName 获取 group
group := GetGroup(groupName)
if group == nil {
http.Error(w, "no such group:"+groupName, http.StatusBadRequest)
return
}

// 从缓存中获取数据
view, err := group.Get(key)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

// 响应客户端
body, err := proto.Marshal(&pb.Response{Value: view.ByteSlice()}) // 编码
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/octet-stream")
w.Write(body)
}

// Get 实现了 PeerGetter 接口,用于远程获取源数据
func (h *HTTPGetter) Get(in *pb.Request, out *pb.Response) error {
url := fmt.Sprintf("%s%s/%s", h.basePath, url.QueryEscape(in.GetGroup()), url.QueryEscape(in.GetKey()))

res, err := http.Get(url)
if err != nil {
// 发送请求失败
return err
}
defer res.Body.Close()

if res.StatusCode != http.StatusOK {
// 状态码不是 200
return fmt.Errorf("server status code: %v", res.StatusCode)
}

data, err := ioutil.ReadAll(res.Body)
if err != nil {
// 读取数据失败
return errors.New("read response body failed")
}

if err = proto.Unmarshal(data, out); err != nil {
return fmt.Errorf("decoding response body: %v", err)
}

return nil
}

本节实现实现了防止缓存击穿的措施,通过多个并发请求映射为一个请求来实现。最终代码结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
lru/
|--lru.go
|--lru_test.go
singleflight/
|--singleflight.go
byteview.go
cache.go
consistenthash.go
dawncache.go
dawncache_test.go
go.mod
http.go
peers.go

概念

  • 缓存雪崩:缓存在同一时刻全部失效,造成瞬时 DB 请求量过大、压力骤增。缓存雪崩通常因为缓存服务器宕机、缓存的 key 设置了相同的过期时间等引起。
  • 缓存击穿:一个存在的 key,在缓存过期的一瞬间,同时有大量的请求,这些请求都会击穿到 DB ,造成瞬时DB请求量大、压力骤增。
  • 缓存穿透:查询一个不存在的数据,因为不存在则不会写到缓存中,所以每次都会去请求 DB。

singleflight 实现

singleflight/singleflight.go

dawncache 通过 singleflight 来实现防止缓存击穿。

定义结构体

首先定义 call,代表一次请求:

1
2
3
4
5
6
// call 代表一次查询请求
type call struct {
wg sync.WaitGroup
val interface{}
err error
}

Group 记录了待查询的 key 和一次请求之间的映射关系。

当 key 还在 hashMap 中时,视为一次请求即可。

1
2
3
4
type Group struct {
mu sync.Mutex // 对 hashMap 的访问互斥
hashMap map[string]*call // 保存 key 和请求的映射关系
}

实现 Do 方法

Do 方法实现了多次相同的查询,到一次请求的映射操作:

  • 查询 hashMap 中是否已经记录了 key 对应的 call 操作,如果有,则等待这一次请求得到数据并返回结果。
  • 如果不在 hashMap 中,则新建一个查询请求 call 并记录在 hashMap 中,待执行过查询操作之后再返回数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
g.mu.Lock()
if g.hashMap == nil { // 延迟初始化
g.hashMap = make(map[string]*call)
}
if c, ok := g.hashMap[key]; ok {
// 已在 hashMap 中记录,等待结果即可
g.mu.Unlock()
c.wg.Wait()
return c.val, c.err
}

// 没有在 hashMap 中记录
// 新建 call 在 hashMap 中记录
c := new(call)
c.wg.Add(1)
g.hashMap[key] = c
g.mu.Unlock()

// 远程请求数据
c.val, c.err = fn()
c.wg.Done() // 得到数据

g.mu.Lock()
delete(g.hashMap, key)
g.mu.Unlock()

return c.val, c.err
}

修改主流程

dawncache.go

需要修改 Group 结构体,使之能够防止缓存穿透:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type Group struct {
// ...
loader *singleflight.Group
}

// NewGroup 新建一个 *Group 缓存
func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
// ...
g := &Group{
// ...
loader: new(singleflight.Group),
}
// ...
}

修改 load 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// load 从别处加载数据
func (g *Group) load(key string) (ByteView, error) {
view, err := g.loader.Do(key, func() (interface{}, error) {
if g.peers != nil {
// peers 不为空,可以从远程获取数据
if peer, ok := g.peers.PickPeer(key); ok {
// 从远程获取数据
view, err := g.getFromPeer(peer, key)
if err != nil {
log.Println("[GeeCache] Failed to get from peer", err)
return ByteView{}, err
}
return view, nil
}
}
// 本地通过回调函数获取数据
return g.getLocally(key)
})
if err != nil {
return ByteView{}, err
}
return view.(ByteView), nil
}

本节实现一致性哈希选择节点,并且实现了 HTTP 客户端。最终代码结构如下:

1
2
3
4
5
6
7
8
9
10
11
lru/
|--lru.go
|--lru_test.go
byteview.go
cache.go
consistenthash.go
dawncache.go
dawncache_test.go
go.mod
http.go
peers.go

PeerPicker

分布式缓存获取数据的流程如下:

1
2
3
4
5
6

接收 key --> 检查是否被缓存 -----> 返回缓存值 ⑴
| 否 是
|-----> 是否应当从远程节点获取 -----> 与远程节点交互 --> 返回缓存值 ⑵
| 否
|-----> 调用`回调函数`,获取值并添加到缓存 --> 返回缓存值 ⑶

之前已经实现了 (1) 和 (3) 现在需要实现流程 (2) 从远程获取数据:

1
2
3
4
使用一致性哈希选择节点        是                                    是
|-----> 是否是远程节点 -----> HTTP 客户端访问远程节点 --> 成功?-----> 服务端返回返回值
| 否 ↓ 否
|----------------------------> 回退到本地节点处理。

抽象 PeerPicker

peers.go

  • 现在需要定义一个 PeerPicker 接口,用于选择与哪一个节点进行通信。

  • PeerGetter 接口用于具体的远程获取数据的操作。

1
2
3
4
5
6
7
8
9
10
11
// PeerPicker 选取节点的接口
type PeerPicker interface {
// PickPeer 根据 key 选择相应的 PeerGetter 获取数据
PickPeer(key string) (peer PeerGetter, ok bool)
}

// PeerGetter 远程获取数据的接口
type PeerGetter interface {
// Get 根据 groupName 和 key 获取源数据
Get(groupName string, key string) ([]byte, error)
}

节点选择与 HTTP 客户端

http.go

实现 PeerGetter

首先定义一个 HTTPGetter 结构体,用于实现 PeerGetter 接口:

1
2
3
4
// HTTPGetter 通过 HTTP 远程获取数据
type HTTPGetter struct {
basePath string
}

实现 Get 方法,用于远程获取数据,它通过发送 HTTP GET 请求来从远程获取数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Get 实现了 PeerGetter 接口,用于远程获取源数据
func (h *HTTPGetter) Get(groupName string, key string) ([]byte, error) {
url := fmt.Sprintf("%s%s/%s", h.basePath, groupName, key)

res, err := http.Get(url)
if err != nil {
// 发送请求失败
return nil, err
}
defer res.Body.Close()

if res.StatusCode != http.StatusOK {
// 状态码不是 200
return nil, fmt.Errorf("server status code: %v", res.StatusCode)
}

data, err := ioutil.ReadAll(res.Body)
if err != nil {
// 读取数据失败
return nil, errors.New("read response body failed")
}

return data, nil
}

var _ PeerGetter = (*HTTPGetter)(nil) // 检查实现 PeerGetter 接口

实现 PeerPicker

修改 HTTPPool 结构体,使之记录一致性哈希的结构体以及 HTTPGetter

1
2
3
4
5
6
7
type HTTPPool struct {
self string // 如 http://127.0.0.1:8080
basePath string // 节点间通讯地址的前缀,如 http:// 127.0.1:8080/basePath/groupName/key 用于请求数据
mu sync.Mutex
peers *Map // 一致性哈希,根据 key 来选择节点
httpGetters map[string]*HTTPGetter // 根据 baseURL 选择 HTTPGetter
}

Set 方法用于添加节点,即在一致性哈希的哈希环中添加节点:

1
2
3
4
5
6
7
8
9
10
11
12
// Set 添加节点
func (p *HTTPPool) Set(peers ...string) {
p.mu.Lock()
defer p.mu.Unlock()
p.peers = New(defaultReplicas, nil)
p.peers.Add(peers...)
p.httpGetters = make(map[string]*HTTPGetter)

for _, peer := range peers {
p.httpGetters[peer] = &HTTPGetter{basePath: peer + p.basePath}
}
}

PickPeer 方法用于根据 key 选择节点,实现 PeerPicker 接口:

1
2
3
4
5
6
7
8
9
10
// PickPeer 实现 PeerPicker 接口,用于根据 key 选择节点
func (p *HTTPPool) PickPeer(key string) (PeerGetter, bool) {
p.mu.Lock()
defer p.mu.Unlock()
if peer := p.peers.Get(key); peer != "" && peer != p.self {
p.Log("Pick peer %s", peer)
return p.httpGetters[peer], true
}
return nil, false
}

实现主流程

dawncache.go

修改 Group 结构体,添加 PeerPicker

1
2
3
4
5
6
type Group struct {
name string // 一个组的命名空间,用于区分不同的缓存,如学生姓名、成绩可以放到不同的缓存中去
getter Getter // 当查找数据未命中时,调用该函数获取值
mainCache cache // 底层缓存
peers PeerPicker
}

实现一个方法用于注册 PeerPicker

1
2
3
4
5
6
7
8
// RegisterPeers 注册 PeerPicker
func (g *Group) RegisterPeers(peers PeerPicker) {
if g.peers != nil {
// RegisterPeers 不允许调用超过1次
panic("RegisterPeerPicker called more than once")
}
g.peers = peers
}

修改 load 方法,使之:

  • 既能够从远程获取数据。
  • 又能够在本地通过回调函数获取数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// load 从别处加载数据
func (g *Group) load(key string) (ByteView, error) {
if g.peers != nil {
// peers 不为空,可以从远程获取数据
if peer, ok := g.peers.PickPeer(key); ok {
// 从远程获取数据
view, err := g.getFromPeer(peer, key)
if err != nil {
log.Println("[GeeCache] Failed to get from peer", err)
return ByteView{}, err
}
return view, nil
}
}
// 本地通过回调函数获取数据
return g.getLocally(key)
}

// getFromPeer 从 peer 处获取数据
func (g *Group) getFromPeer(peer PeerGetter, key string) (ByteView, error) {
data, err := peer.Get(g.name, key)
if err != nil {
return ByteView{}, err
}
return ByteView{b: data}, nil
}