Dawn's Blogs

分享技术 记录成长

0%

夜莺监控系统学习笔记 (4) 缓存和idents

缓存

在 n9e 服务器启动时,会将一些数据同步在内存中进行缓存。系统中的 memsto 模块用于缓存数据。

在 n9e 中,主要缓存以下数据:

  • SyncBusiGroups():业务组信息。
  • SyncUsers():每一个用户信息。
  • SyncUserGroups():用户组(团队)信息。
  • SyncAlertMutes():告警屏蔽 Alert Mute 信息。
  • SyncAlertSubscribes():告警订阅 Alert Subscribe 信息。
  • SyncAlertRules():告警规则 Alert Rule 信息。
  • SyncTargets():Target 信息,用于监控目标主机是否存活(target up 指标)。
  • SyncRecordingRules():Recording Rule 信息。
1
2
3
4
5
6
7
8
9
10
func Sync() {
SyncBusiGroups()
SyncUsers()
SyncUserGroups()
SyncAlertMutes()
SyncAlertSubscribes()
SyncAlertRules()
SyncTargets()
SyncRecordingRules()
}

缓存的流程都是类似的:

  • 单独开启一个协程用于某部分数据的缓存同步。

  • 协程中,开启一个循环,每 9000 毫秒同步一次数据

    • 在同步数据的过程中,首先在数据库中查询统计信息,即总数和上一次的修改时间
    1
    2
    3
    4
    type Statistics struct {
    Total int64 `gorm:"total"`
    LastUpdated int64 `gorm:"last_updated"`
    }
    • 若统计信息没有发生变化,就可以认为数据没有发生改变,所以结束此次同步过程。
    • 若统计信息发生变化,则在数据库中查询出数据,将新查询到的数据覆盖到缓存中去。

Alert Rule 缓存

其中 alert_rule_cache 用于缓存 Alert Rule 表中的数据。同步的逻辑主要由 loopSyncAlertRules 函数控制,每 9000 毫秒同步一次数据,使用 syncAlertRules 函数进行同步。

1
2
3
4
5
6
7
8
9
func loopSyncAlertRules() {
duration := time.Duration(9000) * time.Millisecond
for {
time.Sleep(duration)
if err := syncAlertRules(); err != nil {
logger.Warning("failed to sync alert rules:", err)
}
}
}

syncAlertRules

syncAlertRules 函数中,首先在数据库中根据集群名字查询统计信息。统计信息包括总数上一次(最近)修改时间

1
2
3
4
stat, err := models.AlertRuleStatistics("")
if err != nil {
return errors.WithMessage(err, "failed to exec AlertRuleStatistics")
}

如果统计信息没有变化,则结束,说明不需要同步。

若发生了变化,则在数据库中查询所有的 AlertRule 信息,并且以告警规则的 ID 为 key,存储到缓存 AlertRuleCache 中。

1
2
3
4
5
6
7
8
9
10
11
lst, err := models.AlertRuleGetsByCluster("")
if err != nil {
return errors.WithMessage(err, "failed to exec AlertRuleGetsByCluster")
}

m := make(map[int64]*models.AlertRule)
for i := 0; i < len(lst); i++ {
m[lst[i].Id] = lst[i]
}

AlertRuleCache.Set(m, stat.Total, stat.LastUpdated)

idents

n9e server 在初始化时,会调用 idents.Handle 初始化 ident 逻辑(ident 就是被监控的各个节点),会启动两个协程:

  • loopToRedis 会定期的向 Redis 中写入当前时间戳,用于后续判断 ident 是否存活。
  • loopPushMetrics 会定期的判断 ident 是否存活,并向 Prometheus 中推送 target_up 指标
1
2
3
4
func Handle(ctx context.Context) {
go loopToRedis(ctx)
go loopPushMetrics(ctx)
}

ident 是个字典,key 为集群名字,value 为时间戳:

1
2
// ident -> timestamp
var Idents = cmap.New()

loopToRedis

loopToRedis 函数用于定期(四秒)在 Redis 中记录当前时间戳,表明 ident 还存活(这用于 target_up 指标),并且从 Idents 中清理过期的 ident

1
2
3
4
5
6
7
8
9
10
11
func loopToRedis(ctx context.Context) {
duration := time.Duration(4) * time.Second
for {
select {
case <-ctx.Done():
return
case <-time.After(duration):
toRedis()
}
}
}

toRedis

toRedis 首先获取所有的 Idents:

  • 对于每一个 ident,若过期则清除。
  • 若没有过期,则在 Redis 中记录当前时间戳,表明这个 ident 还在存活。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func toRedis() {
items := Idents.Items()
if len(items) == 0 {
return
}

if config.ReaderClients.IsNil(config.C.ClusterName) {
return
}

now := time.Now().Unix()

// clean old idents
for key, at := range items {
if at.(int64) < now-config.C.NoData.Interval {
Idents.Remove(key)
} else {
// use now as timestamp to redis
err := storage.Redis.HSet(context.Background(), redisKey(config.C.ClusterName), key, now).Err()
if err != nil {
logger.Errorf("redis hset idents failed: %v", err)
}
}
}
}

loopPushMetrics

loopPushMetrics 每隔 10秒钟,调用一次 pushMetrics,向Prometheus 中推送 target_up 指标。

pushMetrics

pushMetrics 用于向 Prometheus 中推送 target_up 指标,其流程如下:

  • 首先获取集群名字,判断是否是集群 Leader,若不是直接返回,是 Leader 则进行下一步

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    clusterName := config.C.ClusterName
    isLeader, err := naming.IamLeader(clusterName)
    if err != nil {
    logger.Errorf("handle_idents: %v", err)
    return
    }

    if !isLeader {
    logger.Info("handle_idents: i am not leader")
    return
    }
    • 判断是否是集群 Leader 的方法就是调用 ActiveServers 方法,从数据库中获取集群中所有的存活节点

    • 然后根据名字,对存活节点进行序,排序之后的第一个节点就是 Leader

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      func IamLeader(cluster string) (bool, error) {
      servers, err := ActiveServers(cluster)
      if err != nil {
      logger.Errorf("failed to get active servers: %v", err)
      return false, err
      }

      if len(servers) == 0 {
      logger.Errorf("active servers empty")
      return false, err
      }

      sort.Strings(servers)

      return config.C.Heartbeat.Endpoint == servers[0], nil
      }
  • 接着在 Redis 中获取所有的 target 心跳时间戳(loopToRedis 中记录的,不管存活与否都获取下来)。

    1
    2
    3
    4
    5
    6
    // get all the target heartbeat timestamp
    ret, err := storage.Redis.HGetAll(context.Background(), redisKey(clusterName)).Result()
    if err != nil {
    logger.Errorf("handle_idents: redis hgetall fail: %v", err)
    return
    }
  • 根据时间戳进行判断,获取所有的存活节点,节点存活的时间间隔在配置文件 NoData 中配置;对于非存活的节点,调用 clearDeadIdent 进行清除(就是在 Redis 中删除对应的一项)。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    now := time.Now().Unix()
    dur := config.C.NoData.Interval

    actives := make(map[string]struct{})
    for ident, clockstr := range ret {
    clock, err := strconv.ParseInt(clockstr, 10, 64)
    if err != nil {
    continue
    }

    if now-clock > dur {
    clearDeadIdent(context.Background(), clusterName, ident)
    } else {
    actives[ident] = struct{}{}
    }
    }
  • 对于每一个存活节点,target_up = 1。最后将 target_up = 1 指标 Push 到 Prometheus 中

    • 如果在缓存总找到 Target,就把 Target 的 tags 补充到需要 Push 到 Prometheus 的 label 上。

    • 如果没有找到 Target,就在数据库创建中创建 Target。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    // 有心跳,target_up = 1
    // 如果找到target,就把target的tags补充到series上
    // 如果没有target,就在数据库创建target
    for active := range actives {
    // build metrics
    pt := &prompb.TimeSeries{}
    pt.Samples = append(pt.Samples, prompb.Sample{
    // use ms
    Timestamp: now * 1000,
    Value: 1,
    })

    pt.Labels = append(pt.Labels, &prompb.Label{
    Name: model.MetricNameLabel,
    Value: config.C.NoData.Metric,
    })

    pt.Labels = append(pt.Labels, &prompb.Label{
    Name: "ident",
    Value: active,
    })

    target, has := memsto.TargetCache.Get(active)
    if !has {
    // target not exists
    target = &models.Target{
    Cluster: clusterName,
    Ident: active,
    Tags: "",
    TagsJSON: []string{},
    TagsMap: make(map[string]string),
    UpdateAt: now,
    }

    if err := target.Add(); err != nil {
    logger.Errorf("handle_idents: insert target(%s) fail: %v", active, err)
    }
    } else {
    common.AppendLabels(pt, target)
    }

    writer.Writers.PushSample("target_up", pt)
    }
  • 再把存活的 Target 列表传给缓存,看看除了存活的部分,还有没有其他 Target。有的话返回,这些就是未存活的主机,设置 target_up = 0。最后将 target_up = 0 指标 Push 到 Prometheus 中

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    // 把actives传给TargetCache,看看除了active的部分,还有别的target么?有的话返回,设置target_up = 0
    deads := memsto.TargetCache.GetDeads(actives)
    for ident, dead := range deads {
    if ident == "" {
    continue
    }
    // build metrics
    pt := &prompb.TimeSeries{}
    pt.Samples = append(pt.Samples, prompb.Sample{
    // use ms
    Timestamp: now * 1000,
    Value: 0,
    })

    pt.Labels = append(pt.Labels, &prompb.Label{
    Name: model.MetricNameLabel,
    Value: config.C.NoData.Metric,
    })

    pt.Labels = append(pt.Labels, &prompb.Label{
    Name: "ident",
    Value: ident,
    })

    common.AppendLabels(pt, dead)
    writer.Writers.PushSample("target_up", pt)
    }