Dawn's Blogs

分享技术 记录成长

0%

godis源码阅读 (6) AOF持久化和AOF重写

AOF 持久化

数据结构定义

Persister

Persister 是 AOF 持久化中的核心数据结构,它从 channel 中接收消息并且将消息写入到 AOF 文件中。部分字段如下:

  • db:这个 AOF 持久化协程所指向的数据库
  • tmpDBMaker:新建一个临时的数据库,用于 AOF 重新操作。
  • aofChan:Persister 就是从这个通道中监听消息,并且将消息写入到 AOF 文件中的。
  • aofFsync:AOF 文件刷入到磁盘的策略,有三种选项可以选择:FsyncAlways(每一个命令都会进行刷盘操作),FsyncEverySec(每秒钟进行一次刷盘操作),FsyncNo(不主动进行刷盘操作,交给操作系统去决定)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Listener will be called-back after receiving a aof payload
// with a listener we can forward the updates to slave nodes etc.
type Listener interface {
// Callback will be called-back after receiving a aof payload
Callback([]CmdLine)
}

// Persister receive msgs from channel and write to AOF file
type Persister struct {
ctx context.Context
cancel context.CancelFunc
db database.DBEngine
tmpDBMaker func() database.DBEngine
aofChan chan *payload
aofFile *os.File
aofFilename string
aofFsync string
// aof goroutine will send msg to main goroutine through this channel when aof tasks finished and ready to shut down
aofFinished chan struct{}
// pause aof for start/finish aof rewrite progress
pausingAof sync.Mutex
currentDB int
listeners map[Listener]struct{}
// reuse cmdLine buffer
buffer []CmdLine
}

payload

payload 为 aofChan 通道内的消息,包括:命令行参数(cmdLine)、数据库(dbIndex)、用于同步的信号量(wg)。

1
2
3
4
5
6
7
8
// CmdLine is alias for [][]byte, represents a command line
type CmdLine = [][]byte

type payload struct {
cmdLine CmdLine
dbIndex int
wg *sync.WaitGroup
}

NewPersister 新建一个 AOF 持久化工作协程

数据库可以利用 NewPersister 方法新建一个 AOF 持久化进程进行 AOF 持久化。

1
2
// NewPersister creates a new aof.Persister
func NewPersister(db database.DBEngine, filename string, load bool, fsync string, tmpDBMaker func() database.DBEngine) (*Persister, error)

其流程如下:

  • 新建一个 Persister,填写 AOF 文件名、刷盘策略、数据库等信息。
1
2
3
4
5
6
persister := &Persister{}
persister.aofFilename = filename
persister.aofFsync = strings.ToLower(fsync)
persister.db = db
persister.tmpDBMaker = tmpDBMaker
persister.currentDB = 0
  • 如果需要加载 AOF 文件中已有的数据,则执行 persister.LoadAof 方法加载命令。
1
2
3
if load {
persister.LoadAof(0)
}
  • 打开 AOF 持久化文件,初始化监听命令的通道等。
1
2
3
4
5
6
7
8
aofFile, err := os.OpenFile(persister.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)
if err != nil {
return nil, err
}
persister.aofFile = aofFile
persister.aofChan = make(chan *payload, aofQueueSize)
persister.aofFinished = make(chan struct{})
persister.listeners = make(map[Listener]struct{})
  • 开启一个协程,用于监听 aofChan,将命令写入到 AOF 文件中
1
2
3
go func() {
persister.listenCmd()
}()
  • 如果刷盘策略为 FsyncEverySec,则开启一个协程用于每秒钟刷盘操作
1
2
3
4
5
6
7
ctx, cancel := context.WithCancel(context.Background())
persister.ctx = ctx
persister.cancel = cancel
if persister.aofFsync == FsyncEverySec {
persister.fsyncEverySecond()
}
return persister, nil

fsyncEverySecond 每秒钟的刷盘操作

persister.fsyncEverySecond 方法用于每秒钟的刷盘操作,它每过一秒钟执行一次 persister.aofFile.Sync,将内存中的数据刷入到磁盘中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (persister *Persister) fsyncEverySecond() {
ticker := time.NewTicker(time.Second)
go func() {
for {
select {
case <-ticker.C:
persister.pausingAof.Lock()
if err := persister.aofFile.Sync(); err != nil {
logger.Errorf("fsync failed: %v", err)
}
persister.pausingAof.Unlock()
case <-persister.ctx.Done():
return
}
}
}()
}

LoadAof 加载 AOF 文件中的数据

persister.LoadAof 方法用于读取 AOF 文件,这个方法在监听 aofChan 之前使用。

1
2
// LoadAof read aof file, can only be used before Persister.listenCmd started
func (persister *Persister) LoadAof(maxBytes int)

其流程如下:

  • 首先将 aofChan 设置为 nil,因为 persister.db.Exec 在执行 AOF 文件中的命令时,可能又会向 aofChan 中加入命令。这些命令是不需要加入到 aofChan 中的(加入 aofChan 中数据会出错,因为这算是又在 AOF 文件中记录了一次),从 AOF 文件中读取并执行即可。
1
2
3
4
5
6
7
// persister.db.Exec may call persister.addAof
// delete aofChan to prevent loaded commands back into aofChan
aofChan := persister.aofChan
persister.aofChan = nil
defer func(aofChan chan *payload) {
persister.aofChan = aofChan
}(aofChan)
  • 打开 AOF 文件,从 AOF 文件中读取 maxBytes 字节的数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
file, err := os.Open(persister.aofFilename)
if err != nil {
if _, ok := err.(*os.PathError); ok {
return
}
logger.Warn(err)
return
}
defer file.Close()

var reader io.Reader
if maxBytes > 0 {
reader = io.LimitReader(file, int64(maxBytes))
} else {
reader = file
}
  • 读取 AOF 文件复用了协议解析器,fakeConn 仅仅用于持久化操作中(它表示一个虚拟的客户端连接,仅仅用于执行 AOF 文件中的命令)。
1
2
ch := parser.ParseStream(reader)
fakeConn := connection.NewFakeConn() // only used for save dbIndex
  • 从解析解析器返回的通道中,依次读取命令。调用 persister.db.Exec 方法在数据库中执行命令,需要注意的是如果遇到 SELECT 命令则设置 persister 当前数据库。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
for p := range ch {
if p.Err != nil {
if p.Err == io.EOF {
break
}
logger.Error("parse error: " + p.Err.Error())
continue
}
if p.Data == nil {
logger.Error("empty payload")
continue
}
r, ok := p.Data.(*protocol.MultiBulkReply)
if !ok {
logger.Error("require multi bulk protocol")
continue
}
ret := persister.db.Exec(fakeConn, r.Args)
if protocol.IsErrorReply(ret) {
logger.Error("exec err", string(ret.ToBytes()))
}
if strings.ToLower(string(r.Args[0])) == "select" {
// execSelect success, here must be no error
dbIndex, err := strconv.Atoi(string(r.Args[1]))
if err == nil {
persister.currentDB = dbIndex
}
}
}

listenCmd 监听命令,写入 AOF 文件

persister.listenCmd 用于监听 aofChan 通道,将通道中的命令写入到 AOF 文件中。每次接收到命令时,都会调用 persister.writeAof 写入 AOF 文件。

1
2
3
4
5
6
7
// listenCmd listen aof channel and write into file
func (persister *Persister) listenCmd() {
for p := range persister.aofChan {
persister.writeAof(p)
}
persister.aofFinished <- struct{}{}
}

writeAof 将一条命令写入 AOF 文件中

persister.writeAof 方法用于将一条命令写入到 AOF 文件中。

1
func (persister *Persister) writeAof(p *payload)

其流程如下:

  • 首先,选择正确的数据库。每个客户端都可以选择自己的数据库,所以 payload 中要保存客户端选择的数据库。选择的数据库与 AOF 文件中当前的数据库不一致时写入一条 Select 命令
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
persister.buffer = persister.buffer[:0] // reuse underlying array
persister.pausingAof.Lock() // prevent other goroutines from pausing aof
defer persister.pausingAof.Unlock()
// ensure aof is in the right database
if p.dbIndex != persister.currentDB {
// select db
selectCmd := utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))
persister.buffer = append(persister.buffer, selectCmd)
data := protocol.MakeMultiBulkReply(selectCmd).ToBytes()
_, err := persister.aofFile.Write(data)
if err != nil {
logger.Warn(err)
return // skip this command
}
persister.currentDB = p.dbIndex
}
  • 接着写入命令内容
1
2
3
4
5
6
7
// save command
data := protocol.MakeMultiBulkReply(p.cmdLine).ToBytes()
persister.buffer = append(persister.buffer, p.cmdLine)
_, err := persister.aofFile.Write(data)
if err != nil {
logger.Warn(err)
}
  • 接着调用 listener.CallBack,如果刷盘策略为 FsyncAlways(每一条命令都刷盘),则调用 persister.aofFile.Sync 刷盘。
1
2
3
4
5
6
for listener := range persister.listeners {
listener.Callback(persister.buffer)
}
if persister.aofFsync == FsyncAlways {
_ = persister.aofFile.Sync()
}

SaveCmdLine 向 aofChan 通道中发送命令

persister.SaveCmdLine 方法用于向 aofChan 通道中发送一条命令。其流程如下:

  • 如果 aofChan 为空,则说明在 LoadAof 过程中,直接返回即可。
  • 如果刷盘策略为 FsyncAlways,则直接调用 persister.writeAof 方法将命令写入 AOF 文件中。
  • 否则,就将命令和执行这条命令的数据库发送到 aofChan 中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// SaveCmdLine send command to aof goroutine through channel
func (persister *Persister) SaveCmdLine(dbIndex int, cmdLine CmdLine) {
// aofChan will be set as nil temporarily during load aof see Persister.LoadAof
if persister.aofChan == nil {
return
}
if persister.aofFsync == FsyncAlways {
p := &payload{
cmdLine: cmdLine,
dbIndex: dbIndex,
}
persister.writeAof(p)
return
}
persister.aofChan <- &payload{
cmdLine: cmdLine,
dbIndex: dbIndex,
}
}

Close 优雅的关闭 AOF 持久化

persister.Close 方法用于优雅的关闭 AOF 持久化,其流程如下:

  • 首先关闭 aofChan 通道,接着等待 AOF 持久化完成(persister.listenCmd 方法结束),关闭 AOF 文件句柄
  • 调用 persister 中上下文 cancel 方法,用于结束 persister.fsyncEverySecond 方法。
1
2
3
4
5
6
7
8
9
10
11
12
// Close gracefully stops aof persistence procedure
func (persister *Persister) Close() {
if persister.aofFile != nil {
close(persister.aofChan)
<-persister.aofFinished // wait for aof finished
err := persister.aofFile.Close()
if err != nil {
logger.Warn(err)
}
}
persister.cancel()
}

AOF 重写

AOF 重写可以减小持久化文件的大小,以删除无用的指令。

重写必须在固定不变的数据集上进行,不能直接使用内存中的数据。在 godis 中,采用读取 AOF 文件生成副本的方式进行重写操作。

流程如下:

  1. 重写开始:暂停 AOF 写入 -> 准备重写 -> 恢复AOF写入。
  2. 执行重写:重写协程读取 AOF 文件中的前一部分(重写开始前的数据,不包括读写过程中写入的数据)并重写到临时文件中。
  3. 重写结束:暂停 AOF 写入 -> 将重写过程中产生的新数据写入临时文件中 -> 使用临时文件覆盖 AOF 文件(使用文件系统的 mv 命令保证安全) -> 恢复 AOF 写入。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// RewriteCtx holds context of an AOF rewriting procedure
type RewriteCtx struct {
tmpFile *os.File
fileSize int64
dbIdx int // selected db index when startRewrite
}

// Rewrite carries out AOF rewrite
func (persister *Persister) Rewrite() error {
ctx, err := persister.StartRewrite()
if err != nil {
return err
}
err = persister.DoRewrite(ctx)
if err != nil {
return err
}

persister.FinishRewrite(ctx)
return nil
}

StartRewrite 重写开始

persister.StartRewrite 用于重写开始的操作。

1
2
// StartRewrite prepares rewrite procedure
func (persister *Persister) StartRewrite() (*RewriteCtx, error)

流程如下:

  • 首先,暂停 AOF 写入,数据会在 aofChan 中堆积(在重写开始结束后恢复 AOF 写入)。
1
2
persister.pausingAof.Lock() // pausing aof
defer persister.pausingAof.Unlock()
  • 调用 fsync 将缓冲区数据落盘,防止 AOF 文件不完整造成错误。
1
2
3
4
5
err := persister.aofFile.Sync()
if err != nil {
logger.Warn("fsync failed")
return nil, err
}
  • 获得当前 AOF 文件大小,用于判断哪些数据是 AOF 重写执行过程中产生的。
1
2
3
// get current aof file size
fileInfo, _ := os.Stat(persister.aofFilename)
filesize := fileInfo.Size()
  • 创建临时文件供重写使用。
1
2
3
4
5
6
7
8
9
10
11
// create tmp file
file, err := ioutil.TempFile("", "*.aof")
if err != nil {
logger.Warn("tmp file create failed")
return nil, err
}
return &RewriteCtx{
tmpFile: file,
fileSize: filesize,
dbIdx: persister.currentDB,
}, nil

DoRewrite 执行重写

persister.DoRewrite 方法用于真正的重写 AOF 文件过程。

1
func (persister *Persister) DoRewrite(ctx *RewriteCtx) error

其流程如下:

  • 读取 AOF 文件在重写开始时获取到的文件大小长度的数据,这些数据是重写开始前的数据,将重写的数据加载进入内存
1
2
3
4
5
tmpFile := ctx.tmpFile

// load aof tmpFile
tmpAof := persister.newRewriteHandler()
tmpAof.LoadAof(int(ctx.fileSize))
  • 依次将每一个数据库中的数据,重写进入临时的 AOF 文件中。
    • 对于每一个数据库,首先在临时文件中写入 Select 命令选择正确的数据库
    • 接着调用 tmpAof.db.ForEach 函数遍历数据库中的每一个 key,将每一个键值对写入到临时文件中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// rewrite aof tmpFile
for i := 0; i < config.Properties.Databases; i++ {
// select db
data := protocol.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(i))).ToBytes()
_, err := tmpFile.Write(data)
if err != nil {
return err
}
// dump db
tmpAof.db.ForEach(i, func(key string, entity *database.DataEntity, expiration *time.Time) bool {
cmd := EntityToCmd(key, entity)
if cmd != nil {
_, _ = tmpFile.Write(cmd.ToBytes())
}
if expiration != nil {
cmd := MakeExpireCmd(key, *expiration)
if cmd != nil {
_, _ = tmpFile.Write(cmd.ToBytes())
}
}
return true
})
}
return nil

在第一步中的 newRewriteHandler 方法:在这个方法中,就调用了 persister.tmpDBMaker 建立一个临时的数据库,加载待重写的 AOF 文件,用于重写操作

1
2
3
4
5
6
func (persister *Persister) newRewriteHandler() *Persister {
h := &Persister{}
h.aofFilename = persister.aofFilename
h.db = persister.tmpDBMaker()
return h
}

FinishRewrite 结束重写

persister.FinishRewrite 方法用于结束重写,这个过程最为复杂。

1
2
// FinishRewrite finish rewrite procedure
func (persister *Persister) FinishRewrite(ctx *RewriteCtx)

其流程如下:

  • 首先暂停 AOF 文件的写入
1
2
persister.pausingAof.Lock() // pausing aof
defer persister.pausingAof.Unlock()
  • 打开 AOF 文件,并 seek 到重写开始的位置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
tmpFile := ctx.tmpFile
// write commands executed during rewriting to tmp file
src, err := os.Open(persister.aofFilename)
if err != nil {
logger.Error("open aofFilename failed: " + err.Error())
return
}
defer func() {
_ = src.Close()
}()
_, err = src.Seek(ctx.fileSize, 0)
if err != nil {
logger.Error("seek failed: " + err.Error())
return
}
  • 在临时文件中写入一条 Select 命令,使得临时文件切换到重写开始时选中的数据库。
1
2
3
4
5
6
7
// sync tmpFile's db index with online aofFile
data := protocol.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(ctx.dbIdx))).ToBytes()
_, err = tmpFile.Write(data)
if err != nil {
logger.Error("tmp file rewrite failed: " + err.Error())
return
}
  • 对齐数据库后,就可以把重写过程中产生的数据复制到临时文件中了
1
2
3
4
5
6
// copy data
_, err = io.Copy(tmpFile, src)
if err != nil {
logger.Error("copy aof filed failed: " + err.Error())
return
}
  • 使用 mv 命令,令临时文件代替 AOF 文件。
1
2
3
// replace current aof file by tmp file
_ = persister.aofFile.Close()
_ = os.Rename(tmpFile.Name(), persister.aofFilename)
  • 重新打开 AOF 文件,并重新写入一次 Select 命令保证 AOF 文件中的数据库与 persister.currentDB 一致。
1
2
3
4
5
6
7
8
9
10
11
12
13
// reopen aof file for further write
aofFile, err := os.OpenFile(persister.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)
if err != nil {
panic(err)
}
persister.aofFile = aofFile

// write select command again to ensure aof file has the same db index with persister.currentDB
data = protocol.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(persister.currentDB))).ToBytes()
_, err = persister.aofFile.Write(data)
if err != nil {
panic(err)
}