Dawn's Blogs

分享技术 记录成长

0%

godis源码阅读 (1) TCP服务器

Godis 介绍

Godis 是一个用 Go 语言实现的 Redis 服务器。

目录结构

Godis 项目的目录结构:

  • 根目录: main 函数,执行入口

  • config: 配置文件解析

  • interface: 一些模块间的接口定义

  • lib: 各种工具,比如logger、同步和通配符

  • tcp: tcp 服务器实现

  • redis: redis 协议解析器

  • datastruct: redis 的各类数据结构实现

    • dict: hash 表
    • list: 链表
    • lock: 用于锁定 key 的锁组件
    • set: 基于hash表的集合
    • sortedset: 基于跳表实现的有序集合
  • database: 存储引擎核心

    • server.go: redis 服务实例, 支持多数据库, 持久化, 主从复制等能力
    • database.go: 单个 database 的数据结构和功能
    • router.go: 将命令路由给响应的处理函数
    • keys.go: del、ttl、expire 等通用命令实现
    • string.go: get、set 等字符串命令实现
    • list.go: lpush、lindex 等列表命令实现
    • hash.go: hget、hset 等哈希表命令实现
    • set.go: sadd 等集合命令实现
    • sortedset.go: zadd 等有序集合命令实现
    • pubsub.go: 发布订阅命令实现
    • geo.go: GEO 相关命令实现
    • sys.go: Auth 等系统功能实现
    • transaction.go: 单机事务实现
  • cluster: 集群

    • cluster.go: 集群入口
    • com.go: 节点间通信
    • del.go: delete 命令原子性实现
    • keys.go: key 相关命令集群中实现
    • mset.go: mset 命令原子性实现
    • multi.go: 集群内事务实现
    • pubsub.go: 发布订阅实现
    • rename.go: rename 命令集群实现
    • tcc.go: tcc 分布式事务底层实现
  • aof: AOF 持久化实现

main 函数

在阅读 TCP 服务器之前,首先来看看 main 函数:

  • 首先,调用 logger.Setup 注册初始化日志模块
  • 接着,调用 config.SetupConfig 加载配置文件
  • 最后,调用 tcp.ListenAndServeWithSignal 开启 TCP 服务器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func main() {
print(banner)
logger.Setup(&logger.Settings{
Path: "logs",
Name: "godis",
Ext: "log",
TimeFormat: "2006-01-02",
})
configFilename := os.Getenv("CONFIG")
if configFilename == "" {
if fileExists("redis.conf") {
config.SetupConfig("redis.conf")
} else {
config.Properties = defaultProperties
}
} else {
config.SetupConfig(configFilename)
}

err := tcp.ListenAndServeWithSignal(&tcp.Config{
Address: fmt.Sprintf("%s:%d", config.Properties.Bind, config.Properties.Port),
}, RedisServer.MakeHandler())
if err != nil {
logger.Error(err)
}
}

TCP 服务器

在 main 函数中,调用 ListenAndServeWithSignal 函数开启了一个 TCP 服务器。ListenAndServeWithSignal 的函数头如下:

1
func ListenAndServeWithSignal(cfg *Config, handler tcp.Handler) error

其中,Config 为 TCP 服务器的配置,包括:绑定地址、最大连接数、超时时间。

1
2
3
4
5
6
// Config stores tcp server properties
type Config struct {
Address string `yaml:"address"`
MaxConnect uint32 `yaml:"max-connect"`
Timeout time.Duration `yaml:"timeout"`
}

tcp.Handler 为 TCP 服务器上层应用接口,定义了两个函数:

  • Handle:为某一个 TCP 连接进行应用层服务。
  • Close:关闭上层应用服务。
1
2
3
4
5
6
7
8
// HandleFunc represents application handler function
type HandleFunc func(ctx context.Context, conn net.Conn)

// Handler represents application server over tcp
type Handler interface {
Handle(ctx context.Context, conn net.Conn)
Close() error
}

ListenAndServeWithSignal

ListenAndServeWithSignal 用于绑定端口,开启一个 TCP 服务器,它的流程如下:

  • 首先开启两个通道,一个用于接收系统信号(系统信号用于关闭),一个用于传递关闭信号
1
2
3
closeChan := make(chan struct{})
sigCh := make(chan os.Signal)
signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
  • 开启一个协程用于接收系统信号,当接收到关闭的系统信号时,向关闭管道中传入数据,表明要关闭服务器。
1
2
3
4
5
6
7
go func() {
sig := <-sigCh
switch sig {
case syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
closeChan <- struct{}{}
}
}()
  • 开始监听 TCP 端口。
1
2
3
4
listener, err := net.Listen("tcp", cfg.Address)
if err != nil {
return err
}
  • 调用 ListenAndServe 开启真正的应用层服务
1
2
3
logger.Info(fmt.Sprintf("bind: %s, start listening...", cfg.Address))
ListenAndServe(listener, handler, closeChan)
return nil

ListenAndServe

ListenAndServe 用于接收 TCP 连接,为每一个 TCP 连接开启应用层服务,其函数头如下:

1
func ListenAndServe(listener net.Listener, handler tcp.Handler, closeChan <-chan struct{})

ListenAndServe 的流程如下:

  • 开启一个线程监听关闭信号,当接收到关闭信号时,关闭TCP 端口监听(listener.Accept() 会立即返回 io.EOF)、关闭所有连接停止应用层服务。
1
2
3
4
5
6
7
// listen signal
go func() {
<-closeChan
logger.Info("shutting down...")
_ = listener.Close() // listener.Accept() will return err immediately
_ = handler.Close() // close connections
}()
  • 为了防止意想不到的的错误发生,defer 中同样关闭TCP 端口监听、关闭 handler。
  • 开启一个循环接收 TCP 连接,对于每一个 TCP 连接都开启一个协程,调用 handler.Handle 为其进行应用层服务。开启每一个协程时,都需要 waitDone.Add(1),目的是等待每一个 TCP 连接完成后再执行 defer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
ctx := context.Background()
var waitDone sync.WaitGroup
for {
conn, err := listener.Accept()
if err != nil {
break
}
// handle
logger.Info("accept link")
waitDone.Add(1)
go func() {
defer func() {
waitDone.Done()
}()
handler.Handle(ctx, conn)
}()
}
waitDone.Wait()

在生产环境下需要保证TCP服务器关闭前完成必要的清理工作,包括将完成正在进行的数据传输,关闭TCP连接等。这种关闭模式称为优雅关闭,可以避免资源泄露以及客户端未收到完整数据导致故障。

TCP 服务器的优雅关闭模式通常为:先关闭 listener 阻止新连接进入,然后遍历所有连接逐个进行关闭