Dawn's Blogs

分享技术 记录成长

0%

从零开始的日志收集项目 (3) 收集系统信息

上一版的改进

问题

因为log agent可以部署在多台机器上,每一台机器上需要收集的日志可能不尽相同,所以需要针对不同机器部署不同的配置文件

改进

因为不同的机器上拥有不同的配置文件,所以etcd中每一台机器对应的配置文件的key应该是唯一可标识的,这样可以区分不同的机器。在key中加入IP地址以区别不同的key,故config.ini做出以下更改:

1
2
3
4
# 原来是collect_conf_key = collect_conf
# 改进版:
# 保存日志收集配置文件的key,根据IP地址(%s)得到key
collect_conf_key = collect_%s_conf

相对应的增加一个获取IP地址的函数,用于获取IP地址:

1
2
3
4
5
6
7
8
9
10
11
12
13
// GetOutboundIP 获取本机的IP地址
func GetOutboundIP() (ip string, err error) {
conn, err := net.Dial("udp", "8.8.8.8:80")
if err != nil {
return
}
defer conn.Close()

localAddr := conn.LocalAddr().(*net.UDPAddr)
// fmt.Println(localAddr.String())
ip = strings.Split(localAddr.IP.String(), ":")[0]
return
}

main主函数中,首先获取本机的IP地址:

1
2
3
4
5
6
// 获取本机ip
ip, err := common.GetOutboundIP()
if err != nil {
logrus.Error("get local ip failed, err:", err)
return
}

根据配置文件中的collect_conf_key = collect_%s_conf项格式化字符串,替换配置文件结构体中的相应字段:

1
2
// 用ip替换collect_conf_key中的%s
configObj.EtcdConfig.CollectConfKey = fmt.Sprintf(configObj.EtcdConfig.CollectConfKey, ip)

收集系统信息

目前,已经完成了日志收集log agent的构建。

现在来构建用于收集系统日志的sys info agent,需要收集的系统信息包括CPU、主存、磁盘、网络信息,将收集到的系统信息发送给Kafka。基本思路与log agent相同:

  • 首先根据配置文件获取Kafka的相关配置信息
  • 根据配置信息初始化Kafka
    • 连接Kafka
    • 初始化消息管道,这个消息管道用于接收收集到的系统信息
    • 开启协程,从消息管道中取出消息,发送到Kafka
  • 初始化系统信息收集系统,开启线程用于采集数据,每隔1秒收集一次

配置文件

因为需要收集不同种类的信息,每一类系统信息下的属性不一样,所以需要区别不同种类的系统信息。为此,不同的系统信息可以对应Kafka的不同主题Topic,故在配置文件中需要指明各类Topic的名称:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 收集系统信息的配置文件

[kafka]
# Kafka地址
address = 127.0.0.1:9092
# 收集cpu信息的主题
cpu_topic = cpu_info
# 收集主存信息的主题
mem_topic = mem_info
# 收集磁盘信息的主题
disk_topic = disk_info
# 收集网络信息的主题
net_topic = net_info
# 消息管道大小
chan_size = 1000

对应于配置文件的结构体:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 记录配置信息的结构体
type Config struct {
KafkaConfig
}

type KafkaConfig struct {
Address string `ini:"address"`
CpuTopic string `ini:"cpu_topic"`
MemTopic string `ini:"mem_topic"`
DiskTopic string `ini:"disk_topic"`
NetTopic string `ini:"net_topic"`
ChanSize int `int:"chan_size"`
}

收集系统信息

收集系统信息的流程都是大致相同的:

  • 获取相应的系统信息
  • 将系统信息json序列化
  • 封装为Kafka消息,发送到消息管道中

初始化系统信息收集模块

因为不同类别的系统信息对应不同的Topic,所以在初始化时需要对这类全局变量根据配置文件进行赋值。

同时,需要启动一个线程,每隔1秒钟进行一次系统信息收集。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
var (
cpuTopic string
memTopic string
diskTopic string
netTopic string
)

// Init 做系统信息收集的初始化工作
func Init(configObj *conf.Config) {
// 初始化全局变量
cpuTopic = configObj.KafkaConfig.CpuTopic
memTopic = configObj.KafkaConfig.MemTopic
diskTopic = configObj.KafkaConfig.DiskTopic
netTopic = configObj.KafkaConfig.NetTopic
// 独立开启一个协程,用于采集数据
go run()

}

func run() {
// 每隔1s采集一次数据
ticker := time.Tick(time.Second)
for _ = range ticker {
getCPUInfo()
getMemInfo()
getDiskInfo()
getNetInfo()
}
}

获取CPU信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
type CpuInfo struct {
Percent float64 `json:"cpu_percent"` // cpu使用率
}

// 获取CPU信息,发送到消息管道中
func getCPUInfo() {
// 获取CPU使用率
percent, err := cpu.Percent(time.Second, false)
if err != nil {
logrus.Error("get cpu use percent failed, err:", err)
return
}
// 构造CpuInfo结构体
cpuInfo := CpuInfo{
Percent: percent[0],
}
// 将结构体JSON序列化
b, err := json.Marshal(cpuInfo)
if err != nil {
logrus.Error("can not parse CpuInfo struct to json, err:", err)
}
// 将CPU信息封装为Kafka消息
msg := &sarama.ProducerMessage{}
msg.Topic = cpuTopic
msg.Value = sarama.ByteEncoder(b)
// 放到通道中
kafka.PushMsg(msg)
}

获取主存信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
type MemInfo struct {
// 总容量
Total int64 `json:"total"`
// 可用空间
Available int64 `json:"available"`
// 已用空间
Used int64 `json:"used"`
// 已用空间的比例
UsedPercent float64 `json:"used_percent"`
}

func getMemInfo() {
// 获取内存信息
info, err := mem.VirtualMemory()
if err != nil {
logrus.Error("get memory info failed, err:", err)
return
}
// 封装到MemInfo结构体中
memInfo := MemInfo{
Total: int64(info.Total),
Available: int64(info.Available),
Used: int64(info.Used),
UsedPercent: info.UsedPercent,
}
// 将结构体json序列化
b, err := json.Marshal(memInfo)
if err != nil {
logrus.Error("can not parse MemInfo struct to json, err:", err)
}
// 封装为Kafka消息
msg := &sarama.ProducerMessage{}
msg.Topic = memTopic
msg.Value = sarama.ByteEncoder(b)
// 推送至消息管道中
kafka.PushMsg(msg)
}

获取磁盘信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
type DiskInfo struct {
// 盘符
Path string `json:"path"`
// 总容量
Total string `json:"total"`
// 空闲容量
Free string `json:"free"`
// 已使用容量
Used string `json:"used"`
// 已使用比例
UsedPercent string `json:"used_percent"`
}

func getDiskInfo() {
// 取出各个分区
partitions, err := disk.Partitions(true)
if err != nil {
logrus.Error("get disk partitions failed, err:", err)
}
// 遍历各个分区,分别取出分区信息
for _, partition := range partitions {
// 开启一个协程独立获取分区信息并将信息送入消息管道
go func(partition *disk.PartitionStat) {
// 获取分区的信息
info, err := disk.Usage(partition.Mountpoint)
if err != nil {
logrus.Errorf("get partition %v info failed, err: %v\n", partition.Mountpoint, err)
return
}
// 构建DiskInfo结构体
diskInfo := DiskInfo{
Path: info.Path,
Total: strconv.FormatUint(info.Total, 10),
Used: strconv.FormatUint(info.Used, 10),
Free: strconv.FormatUint(info.Free, 10),
UsedPercent: strconv.FormatFloat(info.UsedPercent, 'f', 5, 64),
}
// 将结构体序列化为json格式
b, err := json.Marshal(diskInfo)
if err != nil {
logrus.Error("can not parse DiskInfo struct to json, err:", err)
return
}
// 封装为Kafka消息
msg := &sarama.ProducerMessage{}
msg.Topic = diskTopic
msg.Value = sarama.ByteEncoder(b)
// 推送到消息管道中
kafka.PushMsg(msg)
}(&partition)
}
}

获取网络信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
// 记录即将向Kafka发送的网络信息
type NetInfo struct {
// 网卡名 interface name
Name string `json:"name"`
// 发送速率 字节/s
BytesSentRate float64 `json:"bytes_sent_rate"`
// 接收速率 字节/s
BytesRecvRate float64 `json:"bytes_recv_rate"`
// 包发送速率 包数/s
PacketsSentRate float64 `json:"packets_sent_rate"`
// 包接收速率 包数/s
PacketsRecvRate float64 `json:"packets_recv_rate"`
}

// 网络状态信息
type NetStat struct {
Name string
BytesSent int64
BytesRecv int64
PacketsSent int64
PacketsRecv int64
TimeStamp int64
}

var (
// 对于lastNetStatMap的互斥锁
mu sync.Mutex
// 记录上一次获取到的网络状态,key为网卡名称
lastNetStatMap map[string]*NetStat = map[string]*NetStat{}
)

func getNetInfo() {
// 获取各个网卡信息
infos, err := net.IOCounters(true)
// 当前时间戳
curentTimeStamp := time.Now().Unix()
if err != nil {
logrus.Error("get net info failed, err:", err)
return
}
// 获取各个网卡的信息
for _, info := range infos {
// 为每一个网卡起一个协程,放入到消息管道中
go func(info *net.IOCountersStat) {
mu.Lock()
defer mu.Unlock()
netStat := &NetStat{
Name: info.Name,
BytesSent: int64(info.BytesSent),
BytesRecv: int64(info.BytesRecv),
PacketsSent: int64(info.PacketsSent),
PacketsRecv: int64(info.PacketsRecv),
TimeStamp: curentTimeStamp,
}
if _, ok := lastNetStatMap[info.Name]; !ok {
// 没有上一次的网络状态信息,记录状态,返回
lastNetStatMap[info.Name] = netStat
return
}
// 根据网络状态计算速率
lastNetStat := lastNetStatMap[info.Name]
bytesSentRate := float64(int64(info.BytesSent)-lastNetStat.BytesSent) / float64(curentTimeStamp-lastNetStat.TimeStamp)
bytesRecvRate := float64(int64(info.BytesRecv)-lastNetStat.BytesRecv) / float64(curentTimeStamp-lastNetStat.TimeStamp)
packetsSentRate := float64(int64(info.PacketsSent)-lastNetStat.PacketsSent) / float64(curentTimeStamp-lastNetStat.TimeStamp)
packetsRecvRate := float64(int64(info.PacketsRecv)-lastNetStat.PacketsRecv) / float64(curentTimeStamp-lastNetStat.TimeStamp)
// 构建NetInfo结构体
netInfo := NetInfo{
Name: info.Name,
BytesSentRate: bytesSentRate,
BytesRecvRate: bytesRecvRate,
PacketsSentRate: packetsSentRate,
PacketsRecvRate: packetsRecvRate,
}
// 序列化为json格式
b, err := json.Marshal(netInfo)
if err != nil {
logrus.Error("can not parse DiskInfo struct to json, err:", err)
return
}
// 包装为kafka消息
msg := &sarama.ProducerMessage{}
msg.Topic = netTopic
msg.Value = sarama.ByteEncoder(b)
// 推送到消息管道中
kafka.PushMsg(msg)
// 记录状态
lastNetStatMap[info.Name] = netStat
}(&info)
}
}

整体流程

sys info agent的主函数流程为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func main() {
// 读配置文件
cfg, err := ini.Load("conf/config.ini")
if err != nil {
logrus.Error("Load config failed, err:", err)
return
}

var configObj = new(conf.Config)
err = cfg.MapTo(configObj)
if err != nil {
logrus.Error("Map failed, err:", err)
return
}
fmt.Println(configObj)

// 初始化Kafka
err = kafka.Init([]string{configObj.KafkaConfig.Address}, configObj.KafkaConfig.ChanSize)
if err != nil {
logrus.Error("init kafka failed, err:", err)
return
}
logrus.Info("init kafka success!")

// 初始化信息采集工作
sysInfo.Init(configObj)
logrus.Info("start collect system info...")

// 阻塞主进程,防止主进程退出
select {}
}