运行 n9e server Run 首先调用 Run
函数启动一个 n9e server ,其流程如下。
1 2 sc := make (chan os.Signal, 1 ) signal.Notify(sc, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
调用 initialize
函数对 server 进行初始化 :
1 cleanFunc, err := server.initialize()
接着等待系统信号:
若检测到 syscall.SIGHUP
信号,则调用 reload()
重新加载模板 文件。
检测到其他信号就退出 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 EXIT: for { sig := <-sc fmt.Println("received signal:" , sig.String()) switch sig { case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT: code = 0 break EXIT case syscall.SIGHUP: reload() default : break EXIT } } cleanFunc() fmt.Println("server exited" ) os.Exit(code)
reload 重新加载模板文件 在 reload
函数中,调用了 engine.Reload()
重新加载模板文件:
1 2 3 4 5 func reload () { logger.Info("start reload configs" ) engine.Reload() logger.Info("reload configs finished" ) }
engine.Reload()
函数如下,其中 reloadTpls()
用于重新加载模板文件 :
1 2 3 4 5 6 func Reload () { err := reloadTpls() if err != nil { logger.Error("engine reload err:" , err) } }
初始化 initialize 首先解析 Config 文件、初始化 i18n(i18n是一个代码国际化组件)、logger:
1 2 3 4 5 6 7 8 9 10 11 12 13 config.MustLoad(s.ConfigFile) i18n.Init() loggerClean, err := logx.Init(config.C.Log) if err != nil { return fns.Ret(), err } else { fns.Add(loggerClean) }
初始化数据库、redis:
1 2 3 4 5 6 7 8 9 10 11 12 if err = storage.InitDB(config.C.DB); err != nil { return fns.Ret(), err } redisClean, err := storage.InitRedis(config.C.Redis) if err != nil { return fns.Ret(), err } else { fns.Add(redisClean) }
初始化 prometheus remote writers、prometheus remote reader、memsto 缓存:
1 2 3 4 5 6 7 8 9 10 11 12 if err = writer.Init(config.C.Writers, config.C.WriterOpt); err != nil { return fns.Ret(), err } if err = config.InitReader(); err != nil { return fns.Ret(), err } memsto.Sync()
初始化 hertbeat 心跳机制,在 redis 中记录心跳,定时清楚没有心跳的主机(可以检测主机的存活)。如果主机有变化,则重新构造一致性哈希环。
1 2 3 4 if err = naming.Heartbeat(ctx); err != nil { return fns.Ret(), err }
启动 judge 引擎,judge 用于消费告警信息,并且通知告警信息。
1 2 3 4 if err = engine.Start(ctx); err != nil { return fns.Ret(), err }
Server 心跳机制 server 中会向 Redis 中(在后续版本中,心跳机制已经改为向 DB 中发送心跳)持续发送心跳,以表明自己存活。
1 2 3 4 5 6 7 8 9 func loopHeartbeat (ctx context.Context) { interval := time.Duration(config.C.Heartbeat.Interval) * time.Millisecond for { time.Sleep(interval) if err := heartbeat(ctx); err != nil { logger.Warning(err) } } }
heartbeat heartbeat
函数用于检测存活 n9e server,若发生变更则重新构建一致性哈希环 ,其逻辑如下。
首先,在 Redis 中写入当前时间,表明已经发送了心跳信息。
1 2 3 now := time.Now().Unix() key := redisKey(config.C.ClusterName) err := storage.Redis.HSet(ctx, key, config.C.Heartbeat.Endpoint, now).Err()
然后,利用 ActiveServers
获取当前存活的主机。
1 servers, err := ActiveServers(ctx, config.C.ClusterName)
对所有的主机进行排序,若和之前的主机列表不一致则重新构建一致性哈希环。
1 2 3 4 5 6 sort.Strings(servers) newss := strings.Join(servers, " " ) if newss != localss { RebuildConsistentHashRing(servers) localss = newss }
新版 heartbeat
在 DB 中发送心跳,一个好处就是可以聚合跨集群的 n9e server,跨集群可以统一处理。
在新版 heartbeat 中,心跳机制已经改为向 DB 中发送心跳 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 if config.C.ReaderFrom == "config" { for i := 0 ; i < len (config.C.Readers); i++ { clusters = append (clusters, config.C.Readers[i].ClusterName) err := models.AlertingEngineHeartbeatWithCluster(config.C.Heartbeat.Endpoint, config.C.Readers[i].ClusterName) if err != nil { logger.Warningf("heartbeat with cluster %s err:%v" , config.C.Readers[i].ClusterName, err) continue } } } else { clusters, err = models.AlertingEngineGetClusters(config.C.Heartbeat.Endpoint) if err != nil { return err } if len (clusters) == 0 { for i := 0 ; i < len (config.C.Readers); i++ { err := models.AlertingEngineHeartbeatWithCluster(config.C.Heartbeat.Endpoint, config.C.Readers[i].ClusterName) if err != nil { logger.Warningf("heartbeat with cluster %s err:%v" , config.C.Readers[i].ClusterName, err) continue } } } err := models.AlertingEngineHeartbeat(config.C.Heartbeat.Endpoint) if err != nil { return err } }
接着,检查所有的集群,利用 ActiveServers
获取集群中存活的主机 。
1 servers, err := ActiveServers(clusters[i])
最后,对所有的主机进行排序 ,若和之前的主机列表不一致 则重新构建一致性哈希环 。
1 2 3 4 5 6 sort.Strings(servers) newss := strings.Join(servers, " " ) if newss != localss { RebuildConsistentHashRing(servers) localss = newss }
ActiveServers 检查集群中存活的主机 ActiveServers
函数主要在数据库中查询 后返回存活的主机列表,30秒内有心跳,就认为是活的。
1 2 3 4 5 6 7 8 func ActiveServers (cluster string ) ([]string , error) { if cluster == "" { return nil , fmt.Errorf("cluster is empty" ) } return models.AlertingEngineGetsInstances("cluster = ? and clock > ?" , cluster, time.Now().Unix()-30 ) }