Writer
n9e server writer 用于在 Prometheus 中写入数据,主要的作用是写入 target_up 指标。
Writers
1 | type WritersType struct { |
Writers 是全局变量,用于保存所有的 writer。其类型为 WritersType,其他模块可以调用 PushSample 方法向队列中写入数据,Writers 会定期的将队列中的数据写入到 Prometheus 中。
1 | func (ws *WritersType) PushSample(ident string, v interface{}, clusters ...string) |
WritersType 有三个字段:
globalOpt 用于记录所有 writer 共有的配置信息,如队列数量、队列长度等。
1
2
3
4
5
6type WriterGlobalOpt struct {
QueueCount int
QueueMaxSize int
QueuePopSize int
ShardingKey string
}backends 用于保存集群名字与实际与 Prometheus 通信的写入 API 的映射(WriterType)。
1
2
3
4type WriterType struct {
Opts config.WriterOptions
Client api.Client
}queue 是一个带有最大长度限制的队列。
Init
Init 函数用于初始化 Writers。
首先配置信息,为每一个集群都开启 globalOpt.QueueCount 个队列,并且开启协程 Writers.StartConsumer 开始消费,定期将队列中的数据发送给 Prometheus。
1
2
3
4
5
6
7
8
9
10
11Writers.globalOpt = globalOpt
Writers.queues = make(map[string]map[int]*SafeListLimited)
for _, opt := range opts {
if _, ok := Writers.queues[opt.ClusterName]; !ok {
Writers.queues[opt.ClusterName] = make(map[int]*SafeListLimited)
for i := 0; i < globalOpt.QueueCount; i++ {
Writers.queues[opt.ClusterName][i] = NewSafeListLimited(Writers.globalOpt.QueueMaxSize)
go Writers.StartConsumer(i, Writers.queues[opt.ClusterName][i], opt.ClusterName)
}
}
}其次,开启一个协程,用于向普罗米修斯报告每一个队列的当前长度。
1
go reportChanSize()
最后,根据每一个 wirter 的配置,开启与 Prometheus 连接的客户端(用于写入数据),并且调用 Writers.Put 函数将开启的客户端记录下来(保存在 WritersType.backends 字段)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31for i := 0; i < len(opts); i++ {
cli, err := api.NewClient(api.Config{
Address: opts[i].Url,
RoundTripper: &http.Transport{
// TLSClientConfig: tlsConfig,
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: time.Duration(opts[i].DialTimeout) * time.Millisecond,
KeepAlive: time.Duration(opts[i].KeepAlive) * time.Millisecond,
}).DialContext,
ResponseHeaderTimeout: time.Duration(opts[i].Timeout) * time.Millisecond,
TLSHandshakeTimeout: time.Duration(opts[i].TLSHandshakeTimeout) * time.Millisecond,
ExpectContinueTimeout: time.Duration(opts[i].ExpectContinueTimeout) * time.Millisecond,
MaxConnsPerHost: opts[i].MaxConnsPerHost,
MaxIdleConns: opts[i].MaxIdleConns,
MaxIdleConnsPerHost: opts[i].MaxIdleConnsPerHost,
IdleConnTimeout: time.Duration(opts[i].IdleConnTimeout) * time.Millisecond,
},
})
if err != nil {
return err
}
writer := WriterType{
Opts: opts[i],
Client: cli,
}
Writers.Put(opts[i].Url, writer)
}
Writers.StartConsumer
StartConsumer 每 400 毫秒从队列中取出一串数据,开启一个协程将这些数据写入到 Prometheus 中。
1 | func (ws *WritersType) StartConsumer(index int, ch *SafeListLimited, clusterName string) { |
reportChanSize
reportChanSize 的逻辑很简单,就是每 3 秒钟报告一次队列的当前大小。
1 | func reportChanSize() { |
PushSample
在向 Prometheus 推送指标时,会对 ident 名称做 CRC32 编码后取模,得到指标被推送的队列。