Dawn's Blogs

分享技术 记录成长

0%

夜莺监控系统学习笔记 (5) Writer

Writer

n9e server writer 用于在 Prometheus 中写入数据,主要的作用是写入 target_up 指标

Writers

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type WritersType struct {
globalOpt config.WriterGlobalOpt
backends map[string]WriterType
queues map[string]map[int]*SafeListLimited
}


func NewWriters() WritersType {
return WritersType{
backends: make(map[string]WriterType),
}
}

var Writers = NewWriters()

Writers 是全局变量,用于保存所有的 writer。其类型为 WritersType,其他模块可以调用 PushSample 方法向队列中写入数据,Writers 会定期的将队列中的数据写入到 Prometheus 中

1
func (ws *WritersType) PushSample(ident string, v interface{}, clusters ...string)

WritersType 有三个字段:

  • globalOpt 用于记录所有 writer 共有的配置信息,如队列数量、队列长度等。

    1
    2
    3
    4
    5
    6
    type WriterGlobalOpt struct {
    QueueCount int
    QueueMaxSize int
    QueuePopSize int
    ShardingKey string
    }
  • backends 用于保存集群名字与实际与 Prometheus 通信的写入 API 的映射(WriterType)。

    1
    2
    3
    4
    type WriterType struct {
    Opts config.WriterOptions
    Client api.Client
    }
  • queue 是一个带有最大长度限制的队列。

Init

Init 函数用于初始化 Writers。

  • 首先配置信息,为每一个集群都开启 globalOpt.QueueCount 个队列,并且开启协程 Writers.StartConsumer 开始消费,定期将队列中的数据发送给 Prometheus。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    Writers.globalOpt = globalOpt
    Writers.queues = make(map[string]map[int]*SafeListLimited)
    for _, opt := range opts {
    if _, ok := Writers.queues[opt.ClusterName]; !ok {
    Writers.queues[opt.ClusterName] = make(map[int]*SafeListLimited)
    for i := 0; i < globalOpt.QueueCount; i++ {
    Writers.queues[opt.ClusterName][i] = NewSafeListLimited(Writers.globalOpt.QueueMaxSize)
    go Writers.StartConsumer(i, Writers.queues[opt.ClusterName][i], opt.ClusterName)
    }
    }
    }
  • 其次,开启一个协程,用于向普罗米修斯报告每一个队列的当前长度

    1
    go reportChanSize()
  • 最后,根据每一个 wirter 的配置,开启与 Prometheus 连接的客户端(用于写入数据),并且调用 Writers.Put 函数将开启的客户端记录下来(保存在 WritersType.backends 字段)。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    for i := 0; i < len(opts); i++ {
    cli, err := api.NewClient(api.Config{
    Address: opts[i].Url,
    RoundTripper: &http.Transport{
    // TLSClientConfig: tlsConfig,
    Proxy: http.ProxyFromEnvironment,
    DialContext: (&net.Dialer{
    Timeout: time.Duration(opts[i].DialTimeout) * time.Millisecond,
    KeepAlive: time.Duration(opts[i].KeepAlive) * time.Millisecond,
    }).DialContext,
    ResponseHeaderTimeout: time.Duration(opts[i].Timeout) * time.Millisecond,
    TLSHandshakeTimeout: time.Duration(opts[i].TLSHandshakeTimeout) * time.Millisecond,
    ExpectContinueTimeout: time.Duration(opts[i].ExpectContinueTimeout) * time.Millisecond,
    MaxConnsPerHost: opts[i].MaxConnsPerHost,
    MaxIdleConns: opts[i].MaxIdleConns,
    MaxIdleConnsPerHost: opts[i].MaxIdleConnsPerHost,
    IdleConnTimeout: time.Duration(opts[i].IdleConnTimeout) * time.Millisecond,
    },
    })

    if err != nil {
    return err
    }

    writer := WriterType{
    Opts: opts[i],
    Client: cli,
    }

    Writers.Put(opts[i].Url, writer)
    }

Writers.StartConsumer

StartConsumer 每 400 毫秒从队列中取出一串数据,开启一个协程将这些数据写入到 Prometheus 中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (ws *WritersType) StartConsumer(index int, ch *SafeListLimited, clusterName string) {
for {
series := ch.PopBack(ws.globalOpt.QueuePopSize)
if len(series) == 0 {
time.Sleep(time.Millisecond * 400)
continue
}

for key := range ws.backends {
if ws.backends[key].Opts.ClusterName != clusterName {
continue
}
go ws.backends[key].Write(clusterName, index, series)
}
}
}

reportChanSize

reportChanSize 的逻辑很简单,就是每 3 秒钟报告一次队列的当前大小

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func reportChanSize() {
clusterName := config.C.ClusterName
if clusterName == "" {
return
}

for {
time.Sleep(time.Second * 3)
for cluster, m := range Writers.queues {
for i, c := range m {
size := c.Len()
promstat.GaugeSampleQueueSize.WithLabelValues(cluster, fmt.Sprint(i)).Set(float64(size))
}
}
}
}

PushSample

在向 Prometheus 推送指标时,会对 ident 名称做 CRC32 编码后取模,得到指标被推送的队列。