Dawn's Blogs

分享技术 记录成长

0%

夜莺监控系统学习笔记 (2) server初始化和心跳机制

运行 n9e server

Run

首先调用 Run 函数启动一个 n9e server,其流程如下。

  • 首先初始化一个 channel,监听系统信号
1
2
sc := make(chan os.Signal, 1)
signal.Notify(sc, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
  • 调用 initialize 函数对 server 进行初始化
1
cleanFunc, err := server.initialize()
  • 接着等待系统信号:
    • 若检测到 syscall.SIGHUP 信号,则调用 reload() 重新加载模板文件。
    • 检测到其他信号就退出
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
EXIT:
for {
sig := <-sc
fmt.Println("received signal:", sig.String())
switch sig {
case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
code = 0
break EXIT
case syscall.SIGHUP:
// reload configuration?
reload()
default:
break EXIT
}
}

cleanFunc()
fmt.Println("server exited")
os.Exit(code)

reload 重新加载模板文件

reload 函数中,调用了 engine.Reload() 重新加载模板文件:

1
2
3
4
5
func reload() {
logger.Info("start reload configs")
engine.Reload()
logger.Info("reload configs finished")
}

engine.Reload() 函数如下,其中 reloadTpls() 用于重新加载模板文件

1
2
3
4
5
6
func Reload() {
err := reloadTpls()
if err != nil {
logger.Error("engine reload err:", err)
}
}

初始化 initialize

首先解析 Config 文件、初始化 i18n(i18n是一个代码国际化组件)、logger:

1
2
3
4
5
6
7
8
9
10
11
12
13
// parse config file
config.MustLoad(s.ConfigFile)

// init i18n
i18n.Init()

// init logger
loggerClean, err := logx.Init(config.C.Log)
if err != nil {
return fns.Ret(), err
} else {
fns.Add(loggerClean)
}

初始化数据库、redis:

1
2
3
4
5
6
7
8
9
10
11
12
// init database
if err = storage.InitDB(config.C.DB); err != nil {
return fns.Ret(), err
}

// init redis
redisClean, err := storage.InitRedis(config.C.Redis)
if err != nil {
return fns.Ret(), err
} else {
fns.Add(redisClean)
}

初始化 prometheus remote writers、prometheus remote reader、memsto 缓存:

1
2
3
4
5
6
7
8
9
10
11
12
// init prometheus remote writers
if err = writer.Init(config.C.Writers, config.C.WriterOpt); err != nil {
return fns.Ret(), err
}

// init prometheus remote reader
if err = config.InitReader(); err != nil {
return fns.Ret(), err
}

// sync rules/users/mutes/targets to memory cache
memsto.Sync()

初始化 hertbeat 心跳机制,在 redis 中记录心跳,定时清楚没有心跳的主机(可以检测主机的存活)。如果主机有变化,则重新构造一致性哈希环。

1
2
3
4
// start heartbeat
if err = naming.Heartbeat(ctx); err != nil {
return fns.Ret(), err
}

启动 judge 引擎,judge 用于消费告警信息,并且通知告警信息。

1
2
3
4
// start judge engine
if err = engine.Start(ctx); err != nil {
return fns.Ret(), err
}

Server 心跳机制

server 中会向 Redis 中(在后续版本中,心跳机制已经改为向 DB 中发送心跳)持续发送心跳,以表明自己存活。

1
2
3
4
5
6
7
8
9
func loopHeartbeat(ctx context.Context) {
interval := time.Duration(config.C.Heartbeat.Interval) * time.Millisecond
for {
time.Sleep(interval)
if err := heartbeat(ctx); err != nil {
logger.Warning(err)
}
}
}

heartbeat

heartbeat 函数用于检测存活 n9e server,若发生变更则重新构建一致性哈希环,其逻辑如下。

首先,在 Redis 中写入当前时间,表明已经发送了心跳信息。

1
2
3
now := time.Now().Unix()
key := redisKey(config.C.ClusterName)
err := storage.Redis.HSet(ctx, key, config.C.Heartbeat.Endpoint, now).Err()

然后,利用 ActiveServers 获取当前存活的主机。

1
servers, err := ActiveServers(ctx, config.C.ClusterName)

对所有的主机进行排序,若和之前的主机列表不一致则重新构建一致性哈希环。

1
2
3
4
5
6
sort.Strings(servers)
newss := strings.Join(servers, " ")
if newss != localss {
RebuildConsistentHashRing(servers)
localss = newss // localss is "lacal server"
}

新版 heartbeat

在 DB 中发送心跳,一个好处就是可以聚合跨集群的 n9e server,跨集群可以统一处理。

在新版 heartbeat 中,心跳机制已经改为向 DB 中发送心跳

  • 首先检查配置文件中,如何维护实例和集群的对应关系

    • 若在配置文件维护实例和集群(这个集群指的是 n9e server 集群)的对应关系,则直接在 DB 中根据实例(config.C.Heartbeat.Endpoint)和集群(config.C.Readers[i].ClusterName)发送心跳。向数据库中发送心跳,实际上就是找到对应的一行数据,更新 clock 字段。数据库对应的表结构如下:
    1
    2
    3
    4
    5
    6
    type AlertingEngines struct {
    Id int64 `json:"id" gorm:"primaryKey"`
    Instance string `json:"instance"`
    Cluster string `json:"cluster"` // reader cluster
    Clock int64 `json:"clock"`
    }
    • 若在页面上维护实例和集群的对应关系:
      • 则首先在数据库中查询,根据当前节点的实例名(config.C.Heartbeat.Endpoint),得到对应的集群
      • 在数据库中查询到对应的集群为空,则说明实例刚刚部署,还没有在页面配置 cluster 的情况,先使用配置文件中的 cluster 上报心跳
      • 在数据中上报当前实例节点的心跳 err := models.AlertingEngineHeartbeat(config.C.Heartbeat.Endpoint)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
if config.C.ReaderFrom == "config" {
// 在配置文件维护实例和集群的对应关系
for i := 0; i < len(config.C.Readers); i++ {
clusters = append(clusters, config.C.Readers[i].ClusterName)
err := models.AlertingEngineHeartbeatWithCluster(config.C.Heartbeat.Endpoint, config.C.Readers[i].ClusterName)
if err != nil {
logger.Warningf("heartbeat with cluster %s err:%v", config.C.Readers[i].ClusterName, err)
continue
}
}
} else {
// 在页面上维护实例和集群的对应关系
clusters, err = models.AlertingEngineGetClusters(config.C.Heartbeat.Endpoint)
if err != nil {
return err
}
if len(clusters) == 0 {
// 实例刚刚部署,还没有在页面配置 cluster 的情况,先使用配置文件中的 cluster 上报心跳
for i := 0; i < len(config.C.Readers); i++ {
err := models.AlertingEngineHeartbeatWithCluster(config.C.Heartbeat.Endpoint, config.C.Readers[i].ClusterName)
if err != nil {
logger.Warningf("heartbeat with cluster %s err:%v", config.C.Readers[i].ClusterName, err)
continue
}
}
}

err := models.AlertingEngineHeartbeat(config.C.Heartbeat.Endpoint)
if err != nil {
return err
}
}
  • 接着,检查所有的集群,利用 ActiveServers 获取集群中存活的主机
1
servers, err := ActiveServers(clusters[i])
  • 最后,对所有的主机进行排序,若和之前的主机列表不一致重新构建一致性哈希环
1
2
3
4
5
6
sort.Strings(servers)
newss := strings.Join(servers, " ")
if newss != localss {
RebuildConsistentHashRing(servers)
localss = newss // localss is "lacal server"
}

ActiveServers 检查集群中存活的主机

ActiveServers 函数主要在数据库中查询后返回存活的主机列表,30秒内有心跳,就认为是活的。

1
2
3
4
5
6
7
8
func ActiveServers(cluster string) ([]string, error) {
if cluster == "" {
return nil, fmt.Errorf("cluster is empty")
}

// 30秒内有心跳,就认为是活的
return models.AlertingEngineGetsInstances("cluster = ? and clock > ?", cluster, time.Now().Unix()-30)
}