n9e server judge引擎主要用于产生告警信息。
Server judge引擎
启动 judge 引擎时,主要有启动了 4 个 goroutine,代表了 n9e 告警引擎的四个职责:
- loopConsume 用于消费告警事件(包括告警事件和已经恢复的告警事件),填写历史告警信息,发送提醒。
- loopFilterRules 首先从所有的告警规则中,通过集群+对规则 ID 应用一致性哈希的方式过滤出需要本节点监控的告警规则。然后对每一个自己负责的规则启动 worker,作为生产者生成告警事件。
- reportQueueSize 向时序数据库推送数据,报告告警队列长度。
- sender.StartEmailSender 定期消费邮件提醒,用于在一个长连接内,批量发送邮件,减少与邮件服务器建立连接的次数。
1 | func Start(ctx context.Context) error { |
新版本中,Start
函数有了一些微调,主要的变化在于:
- 在开始 judge 引擎时,首先调用
reloadTpls
函数加载告警模板文件。 - 启动了一个 Reporter 协程,用于批量向管理员发送 n9e 的错误信息
notifyToMaintainer(title, msg)
。
1 | func Start(ctx context.Context) error { |
reporter 结构体、初始化函数如下。其中 reporter.em 用于批量的保存错误信息,key 为错误类型,value 为出现错误的次数。
1
2
3
4
5
6
7
8
9
10
11
12 type reporter struct {
sync.Mutex
em map[ErrorType]uint64
cb func(em map[ErrorType]uint64)
}
var rp reporter
func initReporter(cb func(em map[ErrorType]uint64)) {
rp = reporter{cb: cb, em: make(map[ErrorType]uint64)}
rp.Start()
}在
rp.Start()
函数中,每过一分钟检查一次是否需要向管理员报告错误信息,若需要则调用r.cb()
进行报告。 告警分为两种:
- 内置 builtin 告警:如 dingding、飞书、企业微信、邮件。
- 插件 plugin 告警:插件需要实现特定的接口(包括向管理员通知、普通的通知方法),通过 plugin 包加载 so 镜像后,lookup 插件内部的变量,通过类型断言转为接口类型,进而实现插件的告警调用(go plugin 包不支持 Windows)。
1
2
3
4
5
6
7
8
9
10
11 func (r *reporter) Start() {
for {
select {
case <-time.After(time.Minute):
cur := r.reset()
if cur != nil {
r.cb(cur)
}
}
}
}
loopConsume
loopConsume主要用于定期消费 EventQueue 中的事件(通过信号量机制,底层是一个 channel,用来限制并发消费告警事件的数量),包括消费告警事件和已经恢复的告警事件。
主要逻辑在 consumeOne
函数中:
- 首先调用
event.ParseRuleNote
函数来解析 RuleNote 字段。
字段的解析使用 go-template 实现。
在二次开发中,新增了对 rule name 字段的解析。
persist
函数用于持久化。将当前告警加入到历史告警记录中,然后将当前事件加入到 AlertCurEvent 表(当前告警)中(根据告警信息的 hash 来判断是否已经在表中,如果在当前告警表中已经存在则先删除,如果已经恢复则不插入到当前告警表中)。被恢复的告警事件到此为止结束消费,下面是告警事件的后需消费流程。
fileUsers
函数用于在 event 中填写需要通知告警的 user (和需要被电话通知的 user,这部分是 imon 中的功能)。- 事件中记录了需要被提醒的用户组 ID,在内存缓冲中的用户组信息。
- 用户组信息中记录了用户 ID 和 oncall ID,根据 oncall ID 在缓存中查询 oncall 信息,判断用户是否应该被 oncall。
- 根据 用户 ID 在缓存中查询用户信息,填写需要被通知的用户和需要被 oncall 的用户。
callback
函数用于发送url回调(post方式)。notify
函数就是用于通知告警信息的。
1 | func consumeOne(event *models.AlertCurEvent) { |
notify 发送告警信息
notify
中首先调用 RedisPub 和 Webhook 方式进行告警,然后调用 handleNotice(notice, stdinBytes)
运行其他的方式。
1 | alertingRedisPub(stdinBytes) |
handleNotice
函数中,首先执行 CallScript 和 CallPlugin 的方式,接着执行内置的发送逻辑:
1 | alertingCallScript(bs) |
loopFilterRules
启动/停止相应的 Worker(一个 Worker 对应一个告警规则,启动一个 goroutine),用于过滤规则将告警信息推入消费队列。
- filterRules 用于根据规则 ID 在 n9e server 集群组成的一致性哈希环上选择某个 server,如果是当前的 server 则 build worker 用于监控是否发生了报警信息,如果发现则将告警事件推送至事件队列进行消费。
1 | func loopFilterRules(ctx context.Context) { |
这里以 filterRules
为例进行说明。
filterRules
函数中,最重要的就是 Build Workers:
1 | func filterRules() { |
workers.Build
在 workers.Build
函数中的流程为:
首先根据 ID,在 memsto 缓存中读取全部的 AlertRule:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16rules := make(map[string]*models.AlertRule)
for i := 0; i < len(rids); i++ {
rule := memsto.AlertRuleCache.Get(rids[i])
if rule == nil {
continue
}
hash := str.MD5(fmt.Sprintf("%d_%d_%s",
rule.Id,
rule.PromEvalInterval,
rule.PromQl,
))
rules[hash] = rule
}然后结束不需要的 worker:
1
2
3
4
5
6
7// stop old
for hash := range Workers.rules {
if _, has := rules[hash]; !has {
Workers.rules[hash].Stop()
delete(Workers.rules, hash)
}
}最后使用
go re.Start()
开启所需要的新的 worker:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22// start new
for hash := range rules {
if _, has := Workers.rules[hash]; has {
// already exists
continue
}
elst, err := models.AlertCurEventGetByRule(rules[hash].Id)
// ...
// 开启新的 worker
re := RuleEval{
rule: rules[hash],
quit: make(chan struct{}),
fires: firemap,
pendings: make(map[string]*models.AlertCurEvent),
}
go re.Start()
Workers.rules[hash] = re
}
judge
Worker 开启后,就会定时的从 Prometheus 中查询告警规则中定义的信息(如 promQL)。
在扩展时,这里可以添加多个不同的告警监控数据源。
查询出数据后,利用 judge 判断是否需要告警,是否需要恢复。
MakeNewEvent
函数用于判断是否需要告警。recoverRule
函数用于告警事件恢复。
1 | func (r *RuleEval) Judge(clusterName string, vectors []conv.Vector) { |
MakeNewEvent
首先从 memsto 中查询 AlertRule:
1 | curRule := memsto.AlertRuleCache.Get(r.rule.Id) |
接着就是创建 AlertCurEvent,并且根据从内存中查询到的数据填写 AlertCurEvent:
1 | event := &models.AlertCurEvent{ |
最后调用 r.handleNewEvent(event)
去决定这个 AlertCurEvent 是否要推入队列中被消费(告警),根据告警间隔、最大告警次数信息来判断。
recoverRule
在调用 r.MakeNewEvent 后,会返回所有正在告警的 key:
1 | alertingKeys, ruleExists := r.MakeNewEvent("inner", now, clusterName, vectors) |
根据正在告警的 key 来处理是否有恢复的告警,如果有就将恢复的告警信息推入消费队列中。
1 | func (r *RuleEval) recoverRule(alertingKeys map[string]struct{}, now int64) { |
reportQueueSize
在 Prometheus 中记录 event 队列的长度。
1 | func reportQueueSize() { |
sender.StartEmailSender
启动 SMTP 协议发送告警信息的发送器。
为了减少与邮件服务器建立连接的次数,如果需要发送邮件,则会与邮件服务器建立 30 秒的长连接,在这段长连接中可以持续的批量发送邮件。每 30 秒或者超出批量发送的数量时,刷新一次连接,若没有邮件需要发送则不与邮件服务器建立连接。
1 | for { |
告警配置
webapi.conf
NotifyChannels 是个数组,可以写多个,告警规则配置页面,展示的通知媒介,就是读取的这个配置文件的内容。
ContactKeys 也是个数组,用于控制用户的联系方式的配置,在个人中心编辑用户信息的时候,除了手机号、邮箱,还可以为用户配置多种联系方式,多种联系方式也是可以自定义的,就是通过上面的配置来控制。
1 | [[NotifyChannels]] |
server.conf
- TemplatesDir 指定模板文件的目录,这个目录下有多个模板文件,遵从 Go Template 语法,可以控制告警发送的消息的格式。
- NotifyConcurrency 表示并发度,可以维持默认,处理不过来了,有事件堆积了再调大。
- NotifyBuiltinChannels 是配置 Go 代码内置的通知媒介,默认 4 个通知媒介都让 Go 代码来做,如果某些通知媒介想做一些自定义,可以从这个数组中删除对应的通知媒介,Go 代码就不处理那个通知媒介了。
1 | [Alerting] |
CallScript
如果内置的发送逻辑搞不定了,比如想支持短信、电话等通知方式,就可以启用 CallScript,夜莺发现这里的 Enable=true
且指定了一个脚本,就会去执行这个脚本,把告警事件的内容发给这个脚本,由这个脚本做后续处理。
系统会把告警事件的内容 encode 成 json,然后通过 stdin 的方式传给 notify.py。notify.py 从 stdin 拿到内容, json.load
之后就得到了告警的内容,然后就可以进行后续的处理了。
1 | [Alerting.CallScript] |
CallPlugin
CallPlugin 是动态链接库的方式加载外部逻辑,
1 | [Alerting.CallPlugin] |
实现以上接口的 struct
实例即为合法 plugin
,编译后生成一个notify.so
链接文件,放到 n9e 对应项目位置即可。
RedisPub
这个配置如果开启,n9e-server 会把生成的告警事件 publish 给 redis。
1 | [Alerting.RedisPub] |
Webhook
如果下面的配置启用,n9e-server 生成告警事件之后,就会回调这个 Url。告警事件的内容会 encode 成 json,放到 HTTP request body 中,POST 给这个 Url,也可以自定义 Header,即 Headers 配置,Headers 是个数组,必须是偶数个,Key1, Value1, Key2, Value2 这个写法。
1 | [Alerting.Webhook] |