缓存
在 n9e 服务器启动时,会将一些数据同步在内存中进行缓存。系统中的 memsto 模块用于缓存数据。
在 n9e 中,主要缓存以下数据:
SyncBusiGroups()
:业务组信息。SyncUsers()
:每一个用户信息。SyncUserGroups()
:用户组(团队)信息。SyncAlertMutes()
:告警屏蔽 Alert Mute 信息。SyncAlertSubscribes()
:告警订阅 Alert Subscribe 信息。SyncAlertRules()
:告警规则 Alert Rule 信息。SyncTargets()
:Target 信息,用于监控目标主机是否存活(target up 指标)。SyncRecordingRules()
:Recording Rule 信息。
1 | func Sync() { |
缓存的流程都是类似的:
单独开启一个协程用于某部分数据的缓存同步。
协程中,开启一个循环,每 9000 毫秒同步一次数据。
- 在同步数据的过程中,首先在数据库中查询统计信息,即总数和上一次的修改时间。
1
2
3
4type Statistics struct {
Total int64 `gorm:"total"`
LastUpdated int64 `gorm:"last_updated"`
}- 若统计信息没有发生变化,就可以认为数据没有发生改变,所以结束此次同步过程。
- 若统计信息发生变化,则在数据库中查询出数据,将新查询到的数据覆盖到缓存中去。
Alert Rule 缓存
其中 alert_rule_cache 用于缓存 Alert Rule 表中的数据。同步的逻辑主要由 loopSyncAlertRules
函数控制,每 9000 毫秒同步一次数据,使用 syncAlertRules
函数进行同步。
1 | func loopSyncAlertRules() { |
syncAlertRules
在 syncAlertRules
函数中,首先在数据库中根据集群名字查询统计信息。统计信息包括总数、上一次(最近)修改时间。
1 | stat, err := models.AlertRuleStatistics("") |
如果统计信息没有变化,则结束,说明不需要同步。
若发生了变化,则在数据库中查询所有的 AlertRule 信息,并且以告警规则的 ID 为 key,存储到缓存 AlertRuleCache 中。
1 | lst, err := models.AlertRuleGetsByCluster("") |
idents
n9e server 在初始化时,会调用 idents.Handle
初始化 ident 逻辑(ident 就是被监控的各个节点),会启动两个协程:
- loopToRedis 会定期的向 Redis 中写入当前时间戳,用于后续判断 ident 是否存活。
- loopPushMetrics 会定期的判断 ident 是否存活,并向 Prometheus 中推送 target_up 指标。
1 | func Handle(ctx context.Context) { |
ident 是个字典,key 为集群名字,value 为时间戳:
1 | // ident -> timestamp |
loopToRedis
loopToRedis 函数用于定期(四秒)在 Redis 中记录当前时间戳,表明 ident 还存活(这用于 target_up 指标),并且从 Idents 中清理过期的 ident。
1 | func loopToRedis(ctx context.Context) { |
toRedis
toRedis 首先获取所有的 Idents:
- 对于每一个 ident,若过期则清除。
- 若没有过期,则在 Redis 中记录当前时间戳,表明这个 ident 还在存活。
1 | func toRedis() { |
loopPushMetrics
loopPushMetrics 每隔 10秒钟,调用一次 pushMetrics,向Prometheus 中推送 target_up 指标。
pushMetrics
pushMetrics 用于向 Prometheus 中推送 target_up 指标,其流程如下:
首先获取集群名字,判断是否是集群 Leader,若不是直接返回,是 Leader 则进行下一步。
1
2
3
4
5
6
7
8
9
10
11clusterName := config.C.ClusterName
isLeader, err := naming.IamLeader(clusterName)
if err != nil {
logger.Errorf("handle_idents: %v", err)
return
}
if !isLeader {
logger.Info("handle_idents: i am not leader")
return
}判断是否是集群 Leader 的方法就是调用 ActiveServers 方法,从数据库中获取集群中所有的存活节点。
然后根据名字,对存活节点进行排序,排序之后的第一个节点就是 Leader。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16func IamLeader(cluster string) (bool, error) {
servers, err := ActiveServers(cluster)
if err != nil {
logger.Errorf("failed to get active servers: %v", err)
return false, err
}
if len(servers) == 0 {
logger.Errorf("active servers empty")
return false, err
}
sort.Strings(servers)
return config.C.Heartbeat.Endpoint == servers[0], nil
}
接着在 Redis 中获取所有的 target 心跳时间戳(loopToRedis 中记录的,不管存活与否都获取下来)。
1
2
3
4
5
6// get all the target heartbeat timestamp
ret, err := storage.Redis.HGetAll(context.Background(), redisKey(clusterName)).Result()
if err != nil {
logger.Errorf("handle_idents: redis hgetall fail: %v", err)
return
}根据时间戳进行判断,获取所有的存活节点,节点存活的时间间隔在配置文件 NoData 中配置;对于非存活的节点,调用 clearDeadIdent 进行清除(就是在 Redis 中删除对应的一项)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16now := time.Now().Unix()
dur := config.C.NoData.Interval
actives := make(map[string]struct{})
for ident, clockstr := range ret {
clock, err := strconv.ParseInt(clockstr, 10, 64)
if err != nil {
continue
}
if now-clock > dur {
clearDeadIdent(context.Background(), clusterName, ident)
} else {
actives[ident] = struct{}{}
}
}对于每一个存活节点,target_up = 1。最后将 target_up = 1 指标 Push 到 Prometheus 中。
如果在缓存总找到 Target,就把 Target 的 tags 补充到需要 Push 到 Prometheus 的 label 上。
如果没有找到 Target,就在数据库创建中创建 Target。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43// 有心跳,target_up = 1
// 如果找到target,就把target的tags补充到series上
// 如果没有target,就在数据库创建target
for active := range actives {
// build metrics
pt := &prompb.TimeSeries{}
pt.Samples = append(pt.Samples, prompb.Sample{
// use ms
Timestamp: now * 1000,
Value: 1,
})
pt.Labels = append(pt.Labels, &prompb.Label{
Name: model.MetricNameLabel,
Value: config.C.NoData.Metric,
})
pt.Labels = append(pt.Labels, &prompb.Label{
Name: "ident",
Value: active,
})
target, has := memsto.TargetCache.Get(active)
if !has {
// target not exists
target = &models.Target{
Cluster: clusterName,
Ident: active,
Tags: "",
TagsJSON: []string{},
TagsMap: make(map[string]string),
UpdateAt: now,
}
if err := target.Add(); err != nil {
logger.Errorf("handle_idents: insert target(%s) fail: %v", active, err)
}
} else {
common.AppendLabels(pt, target)
}
writer.Writers.PushSample("target_up", pt)
}再把存活的 Target 列表传给缓存,看看除了存活的部分,还有没有其他 Target。有的话返回,这些就是未存活的主机,设置 target_up = 0。最后将 target_up = 0 指标 Push 到 Prometheus 中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27// 把actives传给TargetCache,看看除了active的部分,还有别的target么?有的话返回,设置target_up = 0
deads := memsto.TargetCache.GetDeads(actives)
for ident, dead := range deads {
if ident == "" {
continue
}
// build metrics
pt := &prompb.TimeSeries{}
pt.Samples = append(pt.Samples, prompb.Sample{
// use ms
Timestamp: now * 1000,
Value: 0,
})
pt.Labels = append(pt.Labels, &prompb.Label{
Name: model.MetricNameLabel,
Value: config.C.NoData.Metric,
})
pt.Labels = append(pt.Labels, &prompb.Label{
Name: "ident",
Value: ident,
})
common.AppendLabels(pt, dead)
writer.Writers.PushSample("target_up", pt)
}