Dawn's Blogs

分享技术 记录成长

0%

夜莺监控系统学习笔记 (3) server judge引擎

n9e server judge引擎主要用于产生告警信息

Server judge引擎

启动 judge 引擎时,主要有启动了 4 个 goroutine,代表了 n9e 告警引擎的四个职责:

  • loopConsume 用于消费告警事件(包括告警事件和已经恢复的告警事件),填写历史告警信息,发送提醒。
  • loopFilterRules 首先从所有的告警规则中,通过集群+对规则 ID 应用一致性哈希的方式过滤出需要本节点监控的告警规则。然后对每一个自己负责的规则启动 worker,作为生产者生成告警事件
  • reportQueueSize 向时序数据库推送数据,报告告警队列长度。
  • sender.StartEmailSender 定期消费邮件提醒,用于在一个长连接内,批量发送邮件,减少与邮件服务器建立连接的次数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func Start(ctx context.Context) error {
err := initTpls()
if err != nil {
return err
}

// start loop consumer
go loopConsume(ctx)

// filter my rules and start worker
go loopFilterRules(ctx)

go reportQueueSize()

go sender.StartEmailSender()

return nil
}

新版本中,Start 函数有了一些微调,主要的变化在于:

  • 在开始 judge 引擎时,首先调用 reloadTpls 函数加载告警模板文件。
  • 启动了一个 Reporter 协程,用于批量管理员发送 n9e 的错误信息 notifyToMaintainer(title, msg)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func Start(ctx context.Context) error {
err := reloadTpls()
if err != nil {
return err
}

// start loop consumer
go loopConsume(ctx)

// filter my rules and start worker
go loopFilterRules(ctx)

go reportQueueSize()

go sender.StartEmailSender()

go initReporter(func(em map[ErrorType]uint64) {
if len(em) == 0 {
return
}
title := fmt.Sprintf("server %s has some errors, please check server logs for detail", config.C.Heartbeat.IP)
msg := ""
for k, v := range em {
msg += fmt.Sprintf("error: %s, count: %d\n", k, v)
}
notifyToMaintainer(title, msg)
})

return nil
}

reporter 结构体、初始化函数如下。其中 reporter.em 用于批量的保存错误信息,key 为错误类型,value 为出现错误的次数。

1
2
3
4
5
6
7
8
9
10
11
12
type reporter struct {
sync.Mutex
em map[ErrorType]uint64
cb func(em map[ErrorType]uint64)
}

var rp reporter

func initReporter(cb func(em map[ErrorType]uint64)) {
rp = reporter{cb: cb, em: make(map[ErrorType]uint64)}
rp.Start()
}

rp.Start() 函数中,每过一分钟检查一次是否需要向管理员报告错误信息,若需要则调用 r.cb() 进行报告。 告警分为两种:

  • 内置 builtin 告警:如 dingding、飞书、企业微信、邮件。
  • 插件 plugin 告警:插件需要实现特定的接口(包括向管理员通知、普通的通知方法),通过 plugin 包加载 so 镜像后,lookup 插件内部的变量,通过类型断言转为接口类型,进而实现插件的告警调用(go plugin 包不支持 Windows)。
1
2
3
4
5
6
7
8
9
10
11
func (r *reporter) Start() {
for {
select {
case <-time.After(time.Minute):
cur := r.reset()
if cur != nil {
r.cb(cur)
}
}
}
}

loopConsume

loopConsume主要用于定期消费 EventQueue 中的事件(通过信号量机制,底层是一个 channel,用来限制并发消费告警事件的数量),包括消费告警事件和已经恢复的告警事件。

主要逻辑在 consumeOne 函数中:

  • 首先调用 event.ParseRuleNote 函数来解析 RuleNote 字段。

字段的解析使用 go-template 实现。

在二次开发中,新增了对 rule name 字段的解析。

  • persist 函数用于持久化。将当前告警加入到历史告警记录中,然后将当前事件加入到 AlertCurEvent 表(当前告警)中(根据告警信息的 hash 来判断是否已经在表中,如果在当前告警表中已经存在则先删除,如果已经恢复则不插入到当前告警表中)。

  • 被恢复的告警事件到此为止结束消费,下面是告警事件的后需消费流程。

  • fileUsers 函数用于在 event 中填写需要通知告警的 user (和需要被电话通知的 user,这部分是 imon 中的功能)。

    • 事件中记录了需要被提醒的用户组 ID,在内存缓冲中的用户组信息。
    • 用户组信息中记录了用户 ID 和 oncall ID,根据 oncall ID 在缓存中查询 oncall 信息,判断用户是否应该被 oncall。
    • 根据 用户 ID 在缓存中查询用户信息,填写需要被通知的用户和需要被 oncall 的用户。
  • callback 函数用于发送url回调(post方式)。

  • notify 函数就是用于通知告警信息的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func consumeOne(event *models.AlertCurEvent) {
LogEvent(event, "consume")

// 解析规则
if err := event.ParseRuleNote(); err != nil {
event.RuleNote = fmt.Sprintf("failed to parse rule note: %v", err)
}

// 持久化,插入到历史告警表,可能插入当前告警表
persist(event)

if event.IsRecovered && event.NotifyRecovered == 0 {
return
}

fillUsers(event)
callback(event)
notify(event)
}

notify 发送告警信息

notify 中首先调用 RedisPub 和 Webhook 方式进行告警,然后调用 handleNotice(notice, stdinBytes) 运行其他的方式。

1
2
3
4
5
6
alertingRedisPub(stdinBytes)
alertingWebhook(event)

handleNotice(notice, stdinBytes)

...

handleNotice函数中,首先执行 CallScript 和 CallPlugin 的方式,接着执行内置的发送逻辑:

1
2
3
4
5
alertingCallScript(bs)

alertingCallPlugin(bs)

...

loopFilterRules

启动/停止相应的 Worker(一个 Worker 对应一个告警规则,启动一个 goroutine),用于过滤规则将告警信息推入消费队列。

  • filterRules 用于根据规则 ID 在 n9e server 集群组成的一致性哈希环上选择某个 server,如果是当前的 server 则 build worker 用于监控是否发生了报警信息,如果发现则将告警事件推送至事件队列进行消费。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func loopFilterRules(ctx context.Context) {
// wait for samples
time.Sleep(time.Duration(config.C.EngineDelay) * time.Second)

duration := time.Duration(9000) * time.Millisecond
for {
select {
case <-ctx.Done():
return
case <-time.After(duration):
filterRules()
filterRecordingRules()
}
}
}

这里以 filterRules 为例进行说明。

filterRules 函数中,最重要的就是 Build Workers:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func filterRules() {
// 获取所有的规则id
ids := memsto.AlertRuleCache.GetRuleIds()
logger.Debugf("AlertRuleCache.GetRuleIds success,ids.len: %d", len(ids))

count := len(ids)
mines := make([]int64, 0, count)

for i := 0; i < count; i++ {
// 在一致性哈希环上选择server节点
node, err := naming.HashRing.GetNode(fmt.Sprint(ids[i]))
if err != nil {
logger.Warning("failed to get node from hashring:", err)
continue
}

// 加入当前节点的规则
if node == config.C.Heartbeat.Endpoint {
mines = append(mines, ids[i])
}
}
// build workers用于监控告警信息
Workers.Build(mines)
}

workers.Build

workers.Build 函数中的流程为:

  • 首先根据 ID,在 memsto 缓存中读取全部的 AlertRule

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    rules := make(map[string]*models.AlertRule)

    for i := 0; i < len(rids); i++ {
    rule := memsto.AlertRuleCache.Get(rids[i])
    if rule == nil {
    continue
    }

    hash := str.MD5(fmt.Sprintf("%d_%d_%s",
    rule.Id,
    rule.PromEvalInterval,
    rule.PromQl,
    ))

    rules[hash] = rule
    }
  • 然后结束不需要的 worker

    1
    2
    3
    4
    5
    6
    7
    // stop old
    for hash := range Workers.rules {
    if _, has := rules[hash]; !has {
    Workers.rules[hash].Stop()
    delete(Workers.rules, hash)
    }
    }
  • 最后使用 go re.Start() 开启所需要的新的 worker

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    // start new
    for hash := range rules {
    if _, has := Workers.rules[hash]; has {
    // already exists
    continue
    }

    elst, err := models.AlertCurEventGetByRule(rules[hash].Id)

    // ...

    // 开启新的 worker
    re := RuleEval{
    rule: rules[hash],
    quit: make(chan struct{}),
    fires: firemap,
    pendings: make(map[string]*models.AlertCurEvent),
    }

    go re.Start()
    Workers.rules[hash] = re
    }

judge

Worker 开启后,就会定时的从 Prometheus 中查询告警规则中定义的信息(如 promQL)。

在扩展时,这里可以添加多个不同的告警监控数据源。

查询出数据后,利用 judge 判断是否需要告警,是否需要恢复。

  • MakeNewEvent 函数用于判断是否需要告警。
  • recoverRule 函数用于告警事件恢复。
1
2
3
4
5
6
7
8
9
10
11
func (r *RuleEval) Judge(clusterName string, vectors []conv.Vector) {
now := time.Now().Unix()

alertingKeys, ruleExists := r.MakeNewEvent("inner", now, clusterName, vectors)
if !ruleExists {
return
}

// handle recovered events
r.recoverRule(alertingKeys, now)
}

MakeNewEvent

首先从 memsto 中查询 AlertRule:

1
2
3
4
5
6
curRule := memsto.AlertRuleCache.Get(r.rule.Id)
if curRule == nil {
return
}

r.rule = curRule

接着就是创建 AlertCurEvent,并且根据从内存中查询到的数据填写 AlertCurEvent:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
event := &models.AlertCurEvent{
TriggerTime: vectors[i].Timestamp,
TagsMap: tagsMap,
GroupId: r.rule.GroupId,
RuleName: r.rule.Name,
}

// ...
event.Cluster = r.rule.Cluster
event.Hash = hash
event.RuleId = r.rule.Id
// ...
event.IsRecovered = false
event.LastEvalTime = now

最后调用 r.handleNewEvent(event) 去决定这个 AlertCurEvent 是否要推入队列中被消费(告警),根据告警间隔、最大告警次数信息来判断。

recoverRule

在调用 r.MakeNewEvent 后,会返回所有正在告警的 key:

1
2
3
4
5
6
7
alertingKeys, ruleExists := r.MakeNewEvent("inner", now, clusterName, vectors)
if !ruleExists {
return
}

// handle recovered events
r.recoverRule(alertingKeys, now)

根据正在告警的 key 来处理是否有恢复的告警,如果有就将恢复的告警信息推入消费队列中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (r *RuleEval) recoverRule(alertingKeys map[string]struct{}, now int64) {
for _, hash := range r.pendings.Keys() {
if _, has := alertingKeys[hash]; has {
continue
}
r.pendings.Delete(hash)
}

for hash, event := range r.fires.GetAll() {
if _, has := alertingKeys[hash]; has {
continue
}

r.recoverEvent(hash, event, now)
}
}

reportQueueSize

在 Prometheus 中记录 event 队列的长度。

1
2
3
4
5
6
7
func reportQueueSize() {
for {
time.Sleep(time.Second)

promstat.GaugeAlertQueueSize.Set(float64(EventQueue.Len()))
}
}

sender.StartEmailSender

启动 SMTP 协议发送告警信息的发送器。

为了减少与邮件服务器建立连接的次数,如果需要发送邮件,则会与邮件服务器建立 30 秒的长连接,在这段长连接中可以持续的批量发送邮件每 30 秒或者超出批量发送的数量时,刷新一次连接,若没有邮件需要发送则不与邮件服务器建立连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
for {
select {
case m, ok := <-mailch:
if !ok {
return
}

if !open {
s = dialSmtp(d)
open = true
}

if err := gomail.Send(s, m); err != nil {
logger.Errorf("email_sender: failed to send: %s", err)

// close and retry
if err := s.Close(); err != nil {
logger.Warningf("email_sender: failed to close smtp connection: %s", err)
}

s = dialSmtp(d)
open = true

if err := gomail.Send(s, m); err != nil {
logger.Errorf("email_sender: failed to retry send: %s", err)
}
} else {
logger.Infof("email_sender: result=succ subject=%v to=%v", m.GetHeader("Subject"), m.GetHeader("To"))
}

size++

if size >= conf.Batch {
if err := s.Close(); err != nil {
logger.Warningf("email_sender: failed to close smtp connection: %s", err)
}
open = false
size = 0
}

// Close the connection to the SMTP server if no email was sent in
// the last 30 seconds.
case <-time.After(30 * time.Second):
if open {
if err := s.Close(); err != nil {
logger.Warningf("email_sender: failed to close smtp connection: %s", err)
}
open = false
}
}
}

告警配置

webapi.conf

NotifyChannels 是个数组,可以写多个,告警规则配置页面,展示的通知媒介,就是读取的这个配置文件的内容。

ContactKeys 也是个数组,用于控制用户的联系方式的配置,在个人中心编辑用户信息的时候,除了手机号、邮箱,还可以为用户配置多种联系方式,多种联系方式也是可以自定义的,就是通过上面的配置来控制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
[[NotifyChannels]]
Label = "邮箱"
# do not change Key
Key = "email"

[[NotifyChannels]]
Label = "电话"
# do not change Key
Key = "phone"

[[NotifyChannels]]
Label = "企微机器人"
# do not change Key
Key = "wecom"

[[NotifyChannels]]
Label = "飞书机器人"
# do not change Key
Key = "feishu"

[[ContactKeys]]
Label = "Wecom Robot Token"
# do not change Key
Key = "wecom_robot_token"

[[ContactKeys]]
Label = "Dingtalk Robot Token"
# do not change Key
Key = "dingtalk_robot_token"

[[ContactKeys]]
Label = "Feishu Robot Token"
# do not change Key
Key = "feishu_robot_token"

server.conf

  • TemplatesDir 指定模板文件的目录,这个目录下有多个模板文件,遵从 Go Template 语法,可以控制告警发送的消息的格式。
  • NotifyConcurrency 表示并发度,可以维持默认,处理不过来了,有事件堆积了再调大。
  • NotifyBuiltinChannels 是配置 Go 代码内置的通知媒介,默认 4 个通知媒介都让 Go 代码来做,如果某些通知媒介想做一些自定义,可以从这个数组中删除对应的通知媒介,Go 代码就不处理那个通知媒介了。
1
2
3
4
5
[Alerting]
TemplatesDir = "./etc/template"
NotifyConcurrency = 10
# use builtin go code notify
NotifyBuiltinChannels = ["email", "phone", "wecom", "feishu"]

CallScript

如果内置的发送逻辑搞不定了,比如想支持短信、电话等通知方式,就可以启用 CallScript,夜莺发现这里的 Enable=true 且指定了一个脚本,就会去执行这个脚本,把告警事件的内容发给这个脚本,由这个脚本做后续处理。

系统会把告警事件的内容 encode 成 json,然后通过 stdin 的方式传给 notify.py。notify.py 从 stdin 拿到内容, json.load 之后就得到了告警的内容,然后就可以进行后续的处理了。

1
2
3
4
5
[Alerting.CallScript]
# built in sending capability in go code
# so, no need enable script sender
Enable = false
ScriptPath = "./etc/script/notify.py"

CallPlugin

CallPlugin 是动态链接库的方式加载外部逻辑,

1
2
3
4
5
6
7
8
9
10
11
12
[Alerting.CallPlugin]
Enable = false
# use a plugin via `go build -buildmode=plugin -o notify.so`
PluginPath = "./etc/script/notify.so"
Caller = "n9eCaller"
package main
type inter interface {
Descript() string
Notify([]byte)
}
// 0、Descript 可用于该插件在 server 中的描述
// 1、在 Notify 方法中实现要处理的自定义逻辑

实现以上接口的 struct 实例即为合法 plugin,编译后生成一个notify.so 链接文件,放到 n9e 对应项目位置即可。

RedisPub

这个配置如果开启,n9e-server 会把生成的告警事件 publish 给 redis。

1
2
3
4
[Alerting.RedisPub]
Enable = false
# complete redis key: ${ChannelPrefix} + ${Cluster}
ChannelPrefix = "/alerts/"

Webhook

如果下面的配置启用,n9e-server 生成告警事件之后,就会回调这个 Url。告警事件的内容会 encode 成 json,放到 HTTP request body 中,POST 给这个 Url,也可以自定义 Header,即 Headers 配置,Headers 是个数组,必须是偶数个,Key1, Value1, Key2, Value2 这个写法。

1
2
3
4
5
6
7
[Alerting.Webhook]
Enable = false
Url = "http://a.com/n9e/callback"
BasicAuthUser = ""
BasicAuthPass = ""
Timeout = "5s"
Headers = ["Content-Type", "application/json", "X-From", "N9E"]