Dawn's Blogs

分享技术 记录成长

0%

从零开始的日志收集项目 (1) 解析配置文件进行日志收集

项目介绍

解析配置文件,进行日志收集

首先,初步编写一个logagent

  1. 从配置文件中读取信息。
  2. 根据从配置文件中得到的Kafka地址,初始化Kafka生产者:
    • 生产者配置
    • 连接Kafka
    • 初始化消息管道,并且起一个协程,用于从消息管道中读取数据并向Kafka推送消息
  3. 根据从配置文件中得到的日志路径,初始化一个tail.Tail对象TailObj
  4. TailObj从日志中读取数据,封装成msg送入消息管道。协程从消息管道中取出数据并发送给Kafka

读取配置文件

配置文件conf/config.ini

1
2
3
4
5
6
7
8
9
10
11
[kafka]
# Kafka地址
address = 127.0.0.1:9092
# 主题
topic = web_log
# 消息管道大小
chan_size = 1000

[collect]
# 日志文件存放路径
logfile_path = F:\gopath\src\logagent\logs\1.log

读取配置文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 读配置文件
cfg, err := ini.Load("conf/config.ini")
if err != nil {
logrus.Error("Load config failed, err:", err)
return
}

var configObj = new(Config)
err = cfg.MapTo(configObj)
if err != nil {
logrus.Error("Map failed, err:", err)
return
}

初始化Kafka生产者

kafka/kafka.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package kafka

import (
"github.com/Shopify/sarama"
"github.com/sirupsen/logrus"
)

// kafka相关操作

var (
Client sarama.SyncProducer
MsgChan chan *sarama.ProducerMessage
)

func Init(address []string, chanSize int) (err error) {
// 生产者配置
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // 等待leader和follow都确认
config.Producer.Partitioner = sarama.NewRandomPartitioner // 分区
config.Producer.Return.Successes = true // 确认

// 连接kafka
Client, err = sarama.NewSyncProducer(address, config)
if err != nil {
logrus.Error("kafka: producer closed, err:", err)
}
// 初始化消息管道
MsgChan = make(chan *sarama.ProducerMessage, chanSize)
// 起一个协程向kafka发送数据
go sendMsg()
return
}

// 从MsgChan中读取msg,发送给kafka
func sendMsg() {
for {
select {
case msg := <-MsgChan:
pid, offser, err := Client.SendMessage(msg)
if err != nil {
logrus.Warn("send msg failed, err:", err)
return
}
logrus.Infof("send msg to kafka success. pid=%v offset=%v", pid, offser)
}
}
}

初始化tail.Tail对象

tailfile.tailfile.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package tailfile

import (
"github.com/hpcloud/tail"
"github.com/sirupsen/logrus"
"os"
)

var (
TailObj *tail.Tail
)

func Init(fileName string) (err error) {
TailObj, err = tail.TailFile(fileName, tail.Config{
Location: &tail.SeekInfo{ // 从文件的哪个地方开始读取
Offset: 0,
Whence: os.SEEK_END,
},
ReOpen: true, // 重新打开文件
MustExist: false, // 文件不存在不报错
Follow: true, // 进行跟随
Poll: true,
})
if err != nil {
logrus.Errorf("tailfile: create tailObj for path %s failed, err: %v\n", fileName, err)
}
return
}

TailObj从日志中读取数据,送入消息管道

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 从TailObj中从log中读取数据,封装成msg放入到通道中
func run() {
for {
// 循环读数据
line, ok := <-tailfile.TailObj.Lines
if !ok {
logrus.Warnf("tail file close reopen, filename: %s\n", tailfile.TailObj.Filename)
}
// 把从tail中读到的日志,包装成kafka的msg类型
fmt.Println(line.Text)
msg := &sarama.ProducerMessage{}
msg.Topic = "web_log"
msg.Value = sarama.StringEncoder(line.Text)
// 放到通道中
kafka.MsgChan <- msg
}
}

总结:

所以main.go中的流程为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package main

import (
"fmt"
"github.com/Shopify/sarama"
"github.com/go-ini/ini"
"github.com/sirupsen/logrus"
"logagent/kafka"
"logagent/tailfile"
)

// 日志收集的客户端
// 收集指定目录下的日志文件,发送到kafka中

type Config struct {
KafkaConfig `ini:"kafka"`
CollectConfig `ini:"collect"`
}

type KafkaConfig struct {
Address string `ini:"address"`
Topic string `ini:"topic"`
ChanSize int `int:"chan_size"`
}
type CollectConfig struct {
LogFilePath string `ini:"logfile_path"`
}

// 真正业务逻辑
// 从TailObj中从log中读取数据,封装成msg放入到通道中
func run() {
for {
// 循环读数据
line, ok := <-tailfile.TailObj.Lines
if !ok {
logrus.Warnf("tail file close reopen, filename: %s\n", tailfile.TailObj.Filename)
}
// 把从tail中读到的日志,包装成kafka的msg类型
fmt.Println(line.Text)
msg := &sarama.ProducerMessage{}
msg.Topic = "web_log"
msg.Value = sarama.StringEncoder(line.Text)
// 放到通道中
kafka.MsgChan <- msg
}
}

func main() {
// 读配置文件
cfg, err := ini.Load("conf/config.ini")
if err != nil {
logrus.Error("Load config failed, err:", err)
return
}

var configObj = new(Config)
err = cfg.MapTo(configObj)
if err != nil {
logrus.Error("Map failed, err:", err)
return
}
fmt.Println(configObj)

// 初始化Kafka
err = kafka.Init([]string{configObj.KafkaConfig.Address}, configObj.KafkaConfig.ChanSize)
if err != nil {
logrus.Error("init kafka failed, err:", err)
return
}
logrus.Info("init kafka success!")

// 根据配置文件中的日志路径使用tail收集日志,初始化tail
err = tailfile.Init(configObj.CollectConfig.LogFilePath)
if err != nil {
logrus.Error("init tail failed, err:", err)
return
}
logrus.Info("init tail success!")

// 把日志发往kafka
// 从TailObj中从log中读取数据,封装成msg放入到通道中
run()
}