上一版的改进
问题
因为log agent可以部署在多台机器上,每一台机器上需要收集的日志可能不尽相同,所以需要针对不同机器部署不同的配置文件。
改进
因为不同的机器上拥有不同的配置文件,所以etcd中每一台机器对应的配置文件的key应该是唯一可标识的,这样可以区分不同的机器。在key中加入IP地址以区别不同的key,故config.ini
做出以下更改:
1 2 3 4
|
collect_conf_key = collect_%s_conf
|
相对应的增加一个获取IP地址的函数,用于获取IP地址:
1 2 3 4 5 6 7 8 9 10 11 12 13
| func GetOutboundIP() (ip string, err error) { conn, err := net.Dial("udp", "8.8.8.8:80") if err != nil { return } defer conn.Close()
localAddr := conn.LocalAddr().(*net.UDPAddr) ip = strings.Split(localAddr.IP.String(), ":")[0] return }
|
在main
主函数中,首先获取本机的IP地址:
1 2 3 4 5 6
| ip, err := common.GetOutboundIP() if err != nil { logrus.Error("get local ip failed, err:", err) return }
|
根据配置文件中的collect_conf_key = collect_%s_conf
项格式化字符串,替换配置文件结构体中的相应字段:
1 2
| configObj.EtcdConfig.CollectConfKey = fmt.Sprintf(configObj.EtcdConfig.CollectConfKey, ip)
|
收集系统信息
目前,已经完成了日志收集log agent的构建。
现在来构建用于收集系统日志的sys info agent,需要收集的系统信息包括CPU、主存、磁盘、网络信息,将收集到的系统信息发送给Kafka。基本思路与log agent相同:
- 首先根据配置文件获取Kafka的相关配置信息
- 根据配置信息初始化Kafka
- 连接Kafka
- 初始化消息管道,这个消息管道用于接收收集到的系统信息
- 开启协程,从消息管道中取出消息,发送到Kafka
- 初始化系统信息收集系统,开启线程用于采集数据,每隔1秒收集一次
配置文件
因为需要收集不同种类的信息,每一类系统信息下的属性不一样,所以需要区别不同种类的系统信息。为此,不同的系统信息可以对应Kafka的不同主题Topic,故在配置文件中需要指明各类Topic的名称:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
[kafka]
address = 127.0.0.1:9092
cpu_topic = cpu_info
mem_topic = mem_info
disk_topic = disk_info
net_topic = net_info
chan_size = 1000
|
对应于配置文件的结构体:
1 2 3 4 5 6 7 8 9 10 11 12 13
| type Config struct { KafkaConfig }
type KafkaConfig struct { Address string `ini:"address"` CpuTopic string `ini:"cpu_topic"` MemTopic string `ini:"mem_topic"` DiskTopic string `ini:"disk_topic"` NetTopic string `ini:"net_topic"` ChanSize int `int:"chan_size"` }
|
收集系统信息
收集系统信息的流程都是大致相同的:
- 获取相应的系统信息
- 将系统信息json序列化
- 封装为Kafka消息,发送到消息管道中
初始化系统信息收集模块
因为不同类别的系统信息对应不同的Topic,所以在初始化时需要对这类全局变量根据配置文件进行赋值。
同时,需要启动一个线程,每隔1秒钟进行一次系统信息收集。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| var ( cpuTopic string memTopic string diskTopic string netTopic string )
func Init(configObj *conf.Config) { cpuTopic = configObj.KafkaConfig.CpuTopic memTopic = configObj.KafkaConfig.MemTopic diskTopic = configObj.KafkaConfig.DiskTopic netTopic = configObj.KafkaConfig.NetTopic go run()
}
func run() { ticker := time.Tick(time.Second) for _ = range ticker { getCPUInfo() getMemInfo() getDiskInfo() getNetInfo() } }
|
获取CPU信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| type CpuInfo struct { Percent float64 `json:"cpu_percent"` }
func getCPUInfo() { percent, err := cpu.Percent(time.Second, false) if err != nil { logrus.Error("get cpu use percent failed, err:", err) return } cpuInfo := CpuInfo{ Percent: percent[0], } b, err := json.Marshal(cpuInfo) if err != nil { logrus.Error("can not parse CpuInfo struct to json, err:", err) } msg := &sarama.ProducerMessage{} msg.Topic = cpuTopic msg.Value = sarama.ByteEncoder(b) kafka.PushMsg(msg) }
|
获取主存信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| type MemInfo struct { Total int64 `json:"total"` Available int64 `json:"available"` Used int64 `json:"used"` UsedPercent float64 `json:"used_percent"` }
func getMemInfo() { info, err := mem.VirtualMemory() if err != nil { logrus.Error("get memory info failed, err:", err) return } memInfo := MemInfo{ Total: int64(info.Total), Available: int64(info.Available), Used: int64(info.Used), UsedPercent: info.UsedPercent, } b, err := json.Marshal(memInfo) if err != nil { logrus.Error("can not parse MemInfo struct to json, err:", err) } msg := &sarama.ProducerMessage{} msg.Topic = memTopic msg.Value = sarama.ByteEncoder(b) kafka.PushMsg(msg) }
|
获取磁盘信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| type DiskInfo struct { Path string `json:"path"` Total string `json:"total"` Free string `json:"free"` Used string `json:"used"` UsedPercent string `json:"used_percent"` }
func getDiskInfo() { partitions, err := disk.Partitions(true) if err != nil { logrus.Error("get disk partitions failed, err:", err) } for _, partition := range partitions { go func(partition *disk.PartitionStat) { info, err := disk.Usage(partition.Mountpoint) if err != nil { logrus.Errorf("get partition %v info failed, err: %v\n", partition.Mountpoint, err) return } diskInfo := DiskInfo{ Path: info.Path, Total: strconv.FormatUint(info.Total, 10), Used: strconv.FormatUint(info.Used, 10), Free: strconv.FormatUint(info.Free, 10), UsedPercent: strconv.FormatFloat(info.UsedPercent, 'f', 5, 64), } b, err := json.Marshal(diskInfo) if err != nil { logrus.Error("can not parse DiskInfo struct to json, err:", err) return } msg := &sarama.ProducerMessage{} msg.Topic = diskTopic msg.Value = sarama.ByteEncoder(b) kafka.PushMsg(msg) }(&partition) } }
|
获取网络信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
| type NetInfo struct { Name string `json:"name"` BytesSentRate float64 `json:"bytes_sent_rate"` BytesRecvRate float64 `json:"bytes_recv_rate"` PacketsSentRate float64 `json:"packets_sent_rate"` PacketsRecvRate float64 `json:"packets_recv_rate"` }
type NetStat struct { Name string BytesSent int64 BytesRecv int64 PacketsSent int64 PacketsRecv int64 TimeStamp int64 }
var ( mu sync.Mutex lastNetStatMap map[string]*NetStat = map[string]*NetStat{} )
func getNetInfo() { infos, err := net.IOCounters(true) curentTimeStamp := time.Now().Unix() if err != nil { logrus.Error("get net info failed, err:", err) return } for _, info := range infos { go func(info *net.IOCountersStat) { mu.Lock() defer mu.Unlock() netStat := &NetStat{ Name: info.Name, BytesSent: int64(info.BytesSent), BytesRecv: int64(info.BytesRecv), PacketsSent: int64(info.PacketsSent), PacketsRecv: int64(info.PacketsRecv), TimeStamp: curentTimeStamp, } if _, ok := lastNetStatMap[info.Name]; !ok { lastNetStatMap[info.Name] = netStat return } lastNetStat := lastNetStatMap[info.Name] bytesSentRate := float64(int64(info.BytesSent)-lastNetStat.BytesSent) / float64(curentTimeStamp-lastNetStat.TimeStamp) bytesRecvRate := float64(int64(info.BytesRecv)-lastNetStat.BytesRecv) / float64(curentTimeStamp-lastNetStat.TimeStamp) packetsSentRate := float64(int64(info.PacketsSent)-lastNetStat.PacketsSent) / float64(curentTimeStamp-lastNetStat.TimeStamp) packetsRecvRate := float64(int64(info.PacketsRecv)-lastNetStat.PacketsRecv) / float64(curentTimeStamp-lastNetStat.TimeStamp) netInfo := NetInfo{ Name: info.Name, BytesSentRate: bytesSentRate, BytesRecvRate: bytesRecvRate, PacketsSentRate: packetsSentRate, PacketsRecvRate: packetsRecvRate, } b, err := json.Marshal(netInfo) if err != nil { logrus.Error("can not parse DiskInfo struct to json, err:", err) return } msg := &sarama.ProducerMessage{} msg.Topic = netTopic msg.Value = sarama.ByteEncoder(b) kafka.PushMsg(msg) lastNetStatMap[info.Name] = netStat }(&info) } }
|
整体流程
sys info agent的主函数流程为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| func main() { cfg, err := ini.Load("conf/config.ini") if err != nil { logrus.Error("Load config failed, err:", err) return }
var configObj = new(conf.Config) err = cfg.MapTo(configObj) if err != nil { logrus.Error("Map failed, err:", err) return } fmt.Println(configObj)
err = kafka.Init([]string{configObj.KafkaConfig.Address}, configObj.KafkaConfig.ChanSize) if err != nil { logrus.Error("init kafka failed, err:", err) return } logrus.Info("init kafka success!")
sysInfo.Init(configObj) logrus.Info("start collect system info...")
select {} }
|