AOF 持久化 数据结构定义 Persister Persister 是 AOF 持久化中的核心数据结构,它从 channel 中接收消息并且将消息写入到 AOF 文件中。部分字段如下:
db:这个 AOF 持久化协程所指向的数据库 。
tmpDBMaker:新建一个临时 的数据库,用于 AOF 重新操作。
aofChan:Persister 就是从这个通道 中监听消息,并且将消息写入到 AOF 文件 中的。
aofFsync:AOF 文件刷入到磁盘的策略 ,有三种选项可以选择:FsyncAlways (每一个命令都会进行刷盘操作),FsyncEverySec (每秒钟进行一次刷盘操作),FsyncNo (不主动进行刷盘操作,交给操作系统去决定)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 type Listener interface { Callback([]CmdLine) } type Persister struct { ctx context.Context cancel context.CancelFunc db database.DBEngine tmpDBMaker func () database .DBEngine aofChan chan *payload aofFile *os.File aofFilename string aofFsync string aofFinished chan struct {} pausingAof sync.Mutex currentDB int listeners map [Listener]struct {} buffer []CmdLine }
payload payload 为 aofChan 通道内的消息 ,包括:命令行参数(cmdLine)、数据库(dbIndex)、用于同步的信号量(wg)。
1 2 3 4 5 6 7 8 type CmdLine = [][]byte type payload struct { cmdLine CmdLine dbIndex int wg *sync.WaitGroup }
NewPersister 新建一个 AOF 持久化工作协程 数据库可以利用 NewPersister 方法新建一个 AOF 持久化进程进行 AOF 持久化。
1 2 func NewPersister (db database.DBEngine, filename string , load bool , fsync string , tmpDBMaker func () database .DBEngine ) (*Persister, error)
其流程如下:
新建一个 Persister ,填写 AOF 文件名、刷盘策略、数据库等信息。
1 2 3 4 5 6 persister := &Persister{} persister.aofFilename = filename persister.aofFsync = strings.ToLower(fsync) persister.db = db persister.tmpDBMaker = tmpDBMaker persister.currentDB = 0
如果需要加载 AOF 文件 中已有的数据,则执行 persister.LoadAof 方法加载命令。
1 2 3 if load { persister.LoadAof(0 ) }
打开 AOF 持久化文件 ,初始化监听命令的通道等。
1 2 3 4 5 6 7 8 aofFile, err := os.OpenFile(persister.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600 ) if err != nil { return nil , err } persister.aofFile = aofFile persister.aofChan = make (chan *payload, aofQueueSize) persister.aofFinished = make (chan struct {}) persister.listeners = make (map [Listener]struct {})
开启一个协程 ,用于监听 aofChan,将命令写入到 AOF 文件中 。
1 2 3 go func () { persister.listenCmd() }()
如果刷盘策略为 FsyncEverySec ,则开启一个协程用于每秒钟刷盘操作 。
1 2 3 4 5 6 7 ctx, cancel := context.WithCancel(context.Background()) persister.ctx = ctx persister.cancel = cancel if persister.aofFsync == FsyncEverySec { persister.fsyncEverySecond() } return persister, nil
fsyncEverySecond 每秒钟的刷盘操作 persister.fsyncEverySecond 方法用于每秒钟的刷盘操作,它每过一秒钟执行一次 persister.aofFile.Sync,将内存中的数据刷入到磁盘中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func (persister *Persister) fsyncEverySecond () { ticker := time.NewTicker(time.Second) go func () { for { select { case <-ticker.C: persister.pausingAof.Lock() if err := persister.aofFile.Sync(); err != nil { logger.Errorf("fsync failed: %v" , err) } persister.pausingAof.Unlock() case <-persister.ctx.Done(): return } } }() }
LoadAof 加载 AOF 文件中的数据 persister.LoadAof 方法用于读取 AOF 文件,这个方法在监听 aofChan 之前使用。
1 2 func (persister *Persister) LoadAof (maxBytes int )
其流程如下:
首先将 aofChan 设置为 nil ,因为 persister.db.Exec 在执行 AOF 文件中的命令时,可能又会向 aofChan 中加入命令 。这些命令是不需要加入到 aofChan 中的(加入 aofChan 中数据会出错,因为这算是又在 AOF 文件中记录了一次),从 AOF 文件中读取并执行即可。
1 2 3 4 5 6 7 aofChan := persister.aofChan persister.aofChan = nil defer func (aofChan chan *payload) { persister.aofChan = aofChan }(aofChan)
打开 AOF 文件 ,从 AOF 文件中读取 maxBytes 字节的数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 file, err := os.Open(persister.aofFilename) if err != nil { if _, ok := err.(*os.PathError); ok { return } logger.Warn(err) return } defer file.Close()var reader io.Readerif maxBytes > 0 { reader = io.LimitReader(file, int64 (maxBytes)) } else { reader = file }
读取 AOF 文件复用了协议解析器 ,fakeConn 仅仅用于持久化操作中(它表示一个虚拟的客户端连接 ,仅仅用于执行 AOF 文件中的命令)。
1 2 ch := parser.ParseStream(reader) fakeConn := connection.NewFakeConn()
从解析解析器返回的通道中,依次读取命令 。调用 persister.db.Exec 方法在数据库中执行命令 ,需要注意的是如果遇到 SELECT 命令则设置 persister 当前数据库。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 for p := range ch { if p.Err != nil { if p.Err == io.EOF { break } logger.Error("parse error: " + p.Err.Error()) continue } if p.Data == nil { logger.Error("empty payload" ) continue } r, ok := p.Data.(*protocol.MultiBulkReply) if !ok { logger.Error("require multi bulk protocol" ) continue } ret := persister.db.Exec(fakeConn, r.Args) if protocol.IsErrorReply(ret) { logger.Error("exec err" , string (ret.ToBytes())) } if strings.ToLower(string (r.Args[0 ])) == "select" { dbIndex, err := strconv.Atoi(string (r.Args[1 ])) if err == nil { persister.currentDB = dbIndex } } }
listenCmd 监听命令,写入 AOF 文件 persister.listenCmd 用于监听 aofChan 通道,将通道中的命令写入到 AOF 文件中 。每次接收到命令时,都会调用 persister.writeAof 写入 AOF 文件。
1 2 3 4 5 6 7 func (persister *Persister) listenCmd () { for p := range persister.aofChan { persister.writeAof(p) } persister.aofFinished <- struct {}{} }
writeAof 将一条命令写入 AOF 文件中 persister.writeAof 方法用于将一条命令写入到 AOF 文件中。
1 func (persister *Persister) writeAof (p *payload)
其流程如下:
首先,选择正确的数据库 。每个客户端都可以选择自己的数据库,所以 payload 中要保存客户端选择的数据库。选择的数据库与 AOF 文件中当前的数据库不一致时写入一条 Select 命令 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 persister.buffer = persister.buffer[:0 ] persister.pausingAof.Lock() defer persister.pausingAof.Unlock()if p.dbIndex != persister.currentDB { selectCmd := utils.ToCmdLine("SELECT" , strconv.Itoa(p.dbIndex)) persister.buffer = append (persister.buffer, selectCmd) data := protocol.MakeMultiBulkReply(selectCmd).ToBytes() _, err := persister.aofFile.Write(data) if err != nil { logger.Warn(err) return } persister.currentDB = p.dbIndex }
1 2 3 4 5 6 7 data := protocol.MakeMultiBulkReply(p.cmdLine).ToBytes() persister.buffer = append (persister.buffer, p.cmdLine) _, err := persister.aofFile.Write(data) if err != nil { logger.Warn(err) }
接着调用 listener.CallBack,如果刷盘策略为 FsyncAlways(每一条命令都刷盘),则调用 persister.aofFile.Sync 刷盘。
1 2 3 4 5 6 for listener := range persister.listeners { listener.Callback(persister.buffer) } if persister.aofFsync == FsyncAlways { _ = persister.aofFile.Sync() }
SaveCmdLine 向 aofChan 通道中发送命令 persister.SaveCmdLine 方法用于向 aofChan 通道中发送一条命令。其流程如下:
如果 aofChan 为空,则说明在 LoadAof 过程中,直接返回即可。
如果刷盘策略为 FsyncAlways,则直接调用 persister.writeAof 方法将命令写入 AOF 文件中。
否则,就将命令和执行这条命令的数据库发送到 aofChan 中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 func (persister *Persister) SaveCmdLine (dbIndex int , cmdLine CmdLine) { if persister.aofChan == nil { return } if persister.aofFsync == FsyncAlways { p := &payload{ cmdLine: cmdLine, dbIndex: dbIndex, } persister.writeAof(p) return } persister.aofChan <- &payload{ cmdLine: cmdLine, dbIndex: dbIndex, } }
Close 优雅的关闭 AOF 持久化 persister.Close 方法用于优雅的关闭 AOF 持久化,其流程如下:
首先关闭 aofChan 通道 ,接着等待 AOF 持久化完成 (persister.listenCmd 方法结束),关闭 AOF 文件句柄 。
调用 persister 中上下文 cancel 方法,用于结束 persister.fsyncEverySecond 方法。
1 2 3 4 5 6 7 8 9 10 11 12 func (persister *Persister) Close () { if persister.aofFile != nil { close (persister.aofChan) <-persister.aofFinished err := persister.aofFile.Close() if err != nil { logger.Warn(err) } } persister.cancel() }
AOF 重写 AOF 重写可以减小持久化文件的大小,以删除无用的指令。
重写必须在固定不变的数据集上进行 ,不能直接使用内存中的数据。在 godis 中,采用读取 AOF 文件生成副本 的方式进行重写操作。
流程如下:
重写开始: 暂停 AOF 写入 -> 准备重写 -> 恢复AOF写入。
执行重写: 重写协程读取 AOF 文件中的前一部分 (重写开始前的数据,不包括读写过程中写入的数据)并重写到临时文件中。
重写结束: 暂停 AOF 写入 -> 将重写过程中产生的新数据写入临时文件 中 -> 使用临时文件覆盖 AOF 文件(使用文件系统的 mv 命令保证安全) -> 恢复 AOF 写入。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 type RewriteCtx struct { tmpFile *os.File fileSize int64 dbIdx int } func (persister *Persister) Rewrite () error { ctx, err := persister.StartRewrite() if err != nil { return err } err = persister.DoRewrite(ctx) if err != nil { return err } persister.FinishRewrite(ctx) return nil }
StartRewrite 重写开始 persister.StartRewrite 用于重写开始的操作。
1 2 func (persister *Persister) StartRewrite () (*RewriteCtx, error)
流程如下:
首先,暂停 AOF 写入 ,数据会在 aofChan 中堆积(在重写开始结束后恢复 AOF 写入)。
1 2 persister.pausingAof.Lock() defer persister.pausingAof.Unlock()
调用 fsync 将缓冲区数据落盘 ,防止 AOF 文件不完整造成错误。
1 2 3 4 5 err := persister.aofFile.Sync() if err != nil { logger.Warn("fsync failed" ) return nil , err }
获得当前 AOF 文件大小 ,用于判断哪些数据是 AOF 重写执行过程中产生的。
1 2 3 fileInfo, _ := os.Stat(persister.aofFilename) filesize := fileInfo.Size()
1 2 3 4 5 6 7 8 9 10 11 file, err := ioutil.TempFile("" , "*.aof" ) if err != nil { logger.Warn("tmp file create failed" ) return nil , err } return &RewriteCtx{ tmpFile: file, fileSize: filesize, dbIdx: persister.currentDB, }, nil
DoRewrite 执行重写 persister.DoRewrite 方法用于真正的重写 AOF 文件过程。
1 func (persister *Persister) DoRewrite (ctx *RewriteCtx) error
其流程如下:
读取 AOF 文件在重写开始时获取到的文件大小长度的数据,这些数据是重写开始前的数据,将重写的数据加载进入内存 。
1 2 3 4 5 tmpFile := ctx.tmpFile tmpAof := persister.newRewriteHandler() tmpAof.LoadAof(int (ctx.fileSize))
依次将每一个数据库中的数据,重写进入临时的 AOF 文件 中。
对于每一个数据库,首先在临时文件中写入 Select 命令选择正确的数据库 。
接着调用 tmpAof.db.ForEach 函数 ,遍历 数据库中的每一个 key,将每一个键值对写入到临时文件中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 for i := 0 ; i < config.Properties.Databases; i++ { data := protocol.MakeMultiBulkReply(utils.ToCmdLine("SELECT" , strconv.Itoa(i))).ToBytes() _, err := tmpFile.Write(data) if err != nil { return err } tmpAof.db.ForEach(i, func (key string , entity *database.DataEntity, expiration *time.Time) bool { cmd := EntityToCmd(key, entity) if cmd != nil { _, _ = tmpFile.Write(cmd.ToBytes()) } if expiration != nil { cmd := MakeExpireCmd(key, *expiration) if cmd != nil { _, _ = tmpFile.Write(cmd.ToBytes()) } } return true }) } return nil
在第一步中的 newRewriteHandler 方法:在这个方法中,就调用了 persister.tmpDBMaker 建立一个临时的数据库,加载待重写的 AOF 文件,用于重写操作 。
1 2 3 4 5 6 func (persister *Persister) newRewriteHandler () *Persister { h := &Persister{} h.aofFilename = persister.aofFilename h.db = persister.tmpDBMaker() return h }
FinishRewrite 结束重写 persister.FinishRewrite 方法用于结束重写,这个过程最为复杂。
1 2 func (persister *Persister) FinishRewrite (ctx *RewriteCtx)
其流程如下:
1 2 persister.pausingAof.Lock() defer persister.pausingAof.Unlock()
打开 AOF 文件 ,并 seek 到重写开始的位置 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 tmpFile := ctx.tmpFile src, err := os.Open(persister.aofFilename) if err != nil { logger.Error("open aofFilename failed: " + err.Error()) return } defer func () { _ = src.Close() }() _, err = src.Seek(ctx.fileSize, 0 ) if err != nil { logger.Error("seek failed: " + err.Error()) return }
在临时文件中写入一条 Select 命令 ,使得临时文件切换到重写开始时选中的数据库。
1 2 3 4 5 6 7 data := protocol.MakeMultiBulkReply(utils.ToCmdLine("SELECT" , strconv.Itoa(ctx.dbIdx))).ToBytes() _, err = tmpFile.Write(data) if err != nil { logger.Error("tmp file rewrite failed: " + err.Error()) return }
对齐数据库后,就可以把重写过程中产生的数据复制到临时文件中了 。
1 2 3 4 5 6 _, err = io.Copy(tmpFile, src) if err != nil { logger.Error("copy aof filed failed: " + err.Error()) return }
1 2 3 _ = persister.aofFile.Close() _ = os.Rename(tmpFile.Name(), persister.aofFilename)
重新打开 AOF 文件 ,并重新写入一次 Select 命令 保证 AOF 文件中的数据库与 persister.currentDB 一致。
1 2 3 4 5 6 7 8 9 10 11 12 13 aofFile, err := os.OpenFile(persister.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600 ) if err != nil { panic (err) } persister.aofFile = aofFile data = protocol.MakeMultiBulkReply(utils.ToCmdLine("SELECT" , strconv.Itoa(persister.currentDB))).ToBytes() _, err = persister.aofFile.Write(data) if err != nil { panic (err) }