Dawn's Blogs

分享技术 记录成长

0%

从零开始的日志收集项目 (2) 通过etcd管理日志配置

上一版本的问题

首先来看上一版本的配置文件:

1
2
3
4
5
6
7
8
9
10
11
[kafka]
# Kafka地址
address = 127.0.0.1:9092
# 主题
topic = web_log
# 消息管道大小
chan_size = 1000

[collect]
# 日志文件存放路径
logfile_path = F:\gopath\src\logagent\logs\1.log

通过配置文件指定需要收集的日志时,主要有两个问题:

  • 日志文件的存放路径只能有一条,也就是说不能同时启动多个tail.Tail对象,来收集多个日志
  • 无法实现对配置文件的实时监控,根据配置文件的更改来做出及时的变化

通过etcd管理日志配置

上述问题的原因在于ini配置文件无法同时定义多个日志文件路径。解决的方法就是通过etcd管理日志收集配置,在etcd中以json的格式存储日志收集配置信息,如下方:

1
2
3
4
5
6
7
8
9
10
[
{
"path":"F:/gopath/src/logagent/logs/web.log",
"topic":"web_log"
},
{
"path":"F:/gopath/src/logagent/logs/blog.log",
"topic":"blog_log"
}
]

相应的,需要修改config.ini配置文件,配置etcd的相关设置:

1
2
3
4
5
6
7
8
9
10
11
12
[kafka]
# Kafka地址
address = 127.0.0.1:9092
# 主题
topic = web_log
# 消息管道大小
chan_size = 1000

[etcd]
address = 127.0.0.1:2379
# 保存日志收集配置文件的key
collect_conf_key = collect_conf

从etcd中读取日志收集配置

初始化etcd连接,etcd\etcd.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
var (
client *clientv3.Client
)

// Init 初始化etcd连接
func Init(endPoints []string) (err error) {
client, err = clientv3.New(clientv3.Config{
Endpoints: endPoints,
DialTimeout: time.Second * 5,
})
if err != nil {
logrus.Error("connect to etcd failed, err:", err)
}
return
}

从etcd中获取关于日志收集的配置,etcd\etcd.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// GetConf 获取关于收集日志的配置文件
func GetConf(key string) (collectEntryList []common.CollectEntry, err error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

resp, err := client.Get(ctx, key)
if err != nil {
logrus.Errorf("get conf from etcd by key: %v err: %v", key, err)
return
}
if len(resp.Kvs) == 0 {
logrus.Warn("get len=0 conf from etcd by key:", key)
return
}

// 解析json格式的字符串
ret := resp.Kvs[0]
err = json.Unmarshal(ret.Value, &collectEntryList)
if err != nil {
logrus.Error("json unmarshal failed, err:", err)
}
return
}

其中common.CollectEntry的结构为:

1
2
3
4
5
6
7
package common

// CollectEntry 要收集日志的配置条目
type CollectEntry struct {
Path string `json:"path"` // 日志文件路径
Topic string `json:"topic"` // 主题
}

根据配置启动多个tailTask

main.go中的run()函数作为tailTask的方法。根据得到的配置,创建多个tailTask,对于每一个tailTask,创建一个协程执行run方法,开始收集日志,发往消息管道中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
type tailTask struct {
path string
topic string
tailObj *tail.Tail
}

var (
tailCfg tail.Config
)

func Init(confList []common.CollectEntry) (err error) {
// confList中保存了多个日志收集项
// 针对每一个日志创建一个tailTask

// 初始化Tail的配置
tailCfg = tail.Config{
Location: &tail.SeekInfo{ // 从文件的哪个地方开始读取
Offset: 0,
Whence: os.SEEK_END,
},
ReOpen: true, // 重新打开文件
MustExist: false, // 文件不存在不报错
Follow: true, // 进行跟随
Poll: true,
}

for _, conf := range confList {
tt, err := newTailTask(conf)
if err != nil {
logrus.Errorf("create tailObj for path: %s failed, err: %s\n", conf.Path, err)
continue
}
// 开始收集日志,发往消息管道中
go tt.run()
}

return
}

// 开始收集日志,发往消息管道中
func (t *tailTask) run() {
logrus.Infof("collect for path:%s is running...", t.path)
for {
line, ok := <-t.tailObj.Lines:
// 循环读数据
if !ok {
logrus.Warnf("tail file close reopen, filename: %s\n", t.tailObj.Filename)
}
line.Text = strings.Trim(line.Text, "\r")
if len(line.Text) == 0 {
// 如果是空行,直接略过
logrus.Info("a empty line, continue!")
continue
}
// 把从tail中读到的日志,包装成kafka的msg类型
logrus.Infof("read contents:%s from log file:%s\n", line.Text, t.tailObj.Filename)
msg := &sarama.ProducerMessage{}
msg.Topic = "web_log"
msg.Value = sarama.StringEncoder(line.Text)
// 放到通道中
kafka.PushMsg(msg)
}
}

// 根据配置文件生成并初始化tailTask
func newTailTask(conf common.CollectEntry) (tt tailTask, err error) {
ctx, cancel := context.WithCancel(context.Background())
tt = tailTask{
path: conf.Path,
topic: conf.Topic,
ctx: ctx,
cancel: cancel,
}
tt.tailObj, err = tail.TailFile(conf.Path, tailCfg)
return
}

监控etcd中的新配置

需要新定义一个结构体,用于管理各个tailTask,tailfile\tailTask_mgr.go

1
2
3
4
5
6
// tailTask 的管理者
type tailTaskMgr struct {
tailTaskMap map[string]*tailTask // 以tailTask.Path为键保存tailTask
collectEntryList []common.CollectEntry // 所有的配置项
confChan chan []common.CollectEntry // 等待配置的通道
}

程序启动后,需要开启一个协程监听etcd中关于日志收集配置的变化。如果日志收集配置有更改,那么会将新的配置发送到配置管道中,etcd\etcd.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// WatchConf 监控etcd中日志收集配置变化的函数
func WatchConf(key string) {
wChan := client.Watch(context.Background(), key)

for wResp := range wChan {
logrus.Info("get new conf from etcd.")
for _, event := range wResp.Events {
var newConfList []common.CollectEntry
if event.Type == clientv3.EventTypeDelete {
// 如果是删除
logrus.Warning("warning! conf has been deleted!")
tailfile.PushNewConf(newConfList)
continue
}
err := json.Unmarshal(event.Kv.Value, &newConfList)
if err != nil {
logrus.Error("json unmarshal new config failed, err:", err)
continue
}
// 将新的配置送入管道
tailfile.PushNewConf(newConfList)
}
}
}

同时对于tail任务的初始化函数也会初始化tailTaskMgr,并且在开启新的tailTask的同时,也会将tailTask的信息记录到tailTaskMgr中。最后开启一个协程,等待新的配置的到来tailfile\tailTask_mgr.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
var (
mgr *tailTaskMgr
tailCfg tail.Config
)

func Init(confList []common.CollectEntry) (err error) {
// confList中保存了多个日志收集项
// 针对每一个日志创建一个tailTask

// 初始化Tail的配置
tailCfg = tail.Config{
Location: &tail.SeekInfo{ // 从文件的哪个地方开始读取
Offset: 0,
Whence: os.SEEK_END,
},
ReOpen: true, // 重新打开文件
MustExist: false, // 文件不存在不报错
Follow: true, // 进行跟随
Poll: true,
}
// 初始化tailTaskMgr
mgr = &tailTaskMgr{
tailTaskMap: make(map[string]*tailTask, 20),
collectEntryList: confList,
confChan: make(chan []common.CollectEntry),
}

for _, conf := range confList {
tt, err := newTailTask(conf)
if err != nil {
logrus.Errorf("create tailObj for path: %s failed, err: %s\n", conf.Path, err)
continue
}
// 将tailTask记录在tailTaskMgr中
mgr.tailTaskMap[tt.path] = &tt
// 开始收集日志,发往消息管道中
go tt.run()
}

// 等待新配置到来
go mgr.watch()

return
}

其中func (mgr *tailTaskMgr) watch()方法用于监听新配置的到来:

  • 对于原来已经有的tailTask,不用变化
  • 对于原来没有的,新建一个tailTask
  • 对于原来有,现在新的配置中没有的,要杀死相应的tailTask

对于杀死协程的办法,这里采用context包的WithCancel方法。同时,需要更改tailTask结构体,向其中添加上下文ctx和取消函数cancel,tailfile\tailTask.go

1
2
3
4
5
6
7
8
type tailTask struct {
path string
topic string
tailObj *tail.Tail

ctx context.Context
cancel context.CancelFunc
}

同时,在tailTask日志收集时,需要注意上下文是否执行了cancel函数,若执行了cancel函数,则停止对于该文件监听:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 开始收集日志,发往消息管道中
func (t *tailTask) run() {
logrus.Infof("collect for path:%s is running...", t.path)
for {
select {
case <-t.ctx.Done():
// 执行了cancel函数,结束tailTask结束运行
// tailObj停止监听文件
logrus.Info("stop tail for file:", t.tailObj.Filename)
t.tailObj.Cleanup()
t.tailObj.Stop()
// 协程返回
return

case line, ok := <-t.tailObj.Lines:
// 循环读数据
if !ok {
logrus.Warnf("tail file close reopen, filename: %s\n", t.tailObj.Filename)
}
line.Text = strings.Trim(line.Text, "\r")
if len(line.Text) == 0 {
// 如果是空行,直接略过
logrus.Info("a empty line, continue!")
continue
}
// 把从tail中读到的日志,包装成kafka的msg类型
logrus.Infof("read contents:%s from log file:%s\n", line.Text, t.tailObj.Filename)
msg := &sarama.ProducerMessage{}
msg.Topic = "web_log"
msg.Value = sarama.StringEncoder(line.Text)
// 放到通道中
kafka.PushMsg(msg)
}

}
}

最后,tailfile\tailTask_mgr.go中对于新配置的监听函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func (mgr *tailTaskMgr) watch() {
for {
// 等待新配置到来
newConf := <-mgr.confChan
logrus.Info("get new conf from etcd:", newConf)
//新配置到来后管理之前启动的tailTask
for _, conf := range newConf {
// 原来已经有的tailTask,不用变化
if _, ok := mgr.tailTaskMap[conf.Path]; ok {
continue
}

// 原来没有的新建一个tailTask
tt, err := newTailTask(conf)
if err != nil {
logrus.Errorf("create tailObj for path: %s failed, err: %s\n", conf.Path, err)
continue
}
// 将tailTask记录在tailTaskMgr中
mgr.tailTaskMap[tt.path] = &tt
// 开始收集日志,发往消息管道中
go tt.run()
}
// 原来有,现在没有的要杀死tailTask
// 找出tailTaskMap中存在,但是newConf中不存在的tailTask,把他们都停掉
for path, task := range mgr.tailTaskMap {
var found bool
for _, conf := range newConf {
if path == conf.Path {
// 在newConf中找到
found = true
break
}
}
if !found {
// 若在newConf中没有找到,杀死tailTask
delete(mgr.tailTaskMap, path)
task.cancel()
}
}
}
}

总结

最后进行总结,整体流程如下:

  • 首先读取配置文件,读取etcd和kafka的配置信息
  • kafka的操作:
    • 初始化连接kafka
    • 初始化一个消息管道用于保存向kafka中发送的消息
    • 起一个协程,监听消息管道,一旦消息管道中有新的消息,就向kafka发送消息
  • 对于etcd的操作:
    • 首先初始化etcd连接
    • 从etcd中根据key拉取日志收集的配置信息
    • 启动一个协程,监控etcd中的变化日志收集配置信息的变化,若配置信息有变化,则将新的配置信息发送到存放配置信息的管道中
  • 收集日志文件:
    • 根据刚开始拉取到的配置文件建立多个tailTask,每个tailTask启动一个协程以实现对多个日志的跟踪
    • 起一个协程,监听配置信息管道,一旦有新的配置,则进行对tailTask进行相应的管理

所以main.go如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
func main() {
// 读配置文件
cfg, err := ini.Load("conf/config.ini")
if err != nil {
logrus.Error("Load config failed, err:", err)
return
}

var configObj = new(Config)
err = cfg.MapTo(configObj)
if err != nil {
logrus.Error("Map failed, err:", err)
return
}
fmt.Println(configObj)

// 初始化Kafka
err = kafka.Init([]string{configObj.KafkaConfig.Address}, configObj.KafkaConfig.ChanSize)
if err != nil {
logrus.Error("init kafka failed, err:", err)
return
}
logrus.Info("init kafka success!")

// 初始化etcd
err = etcd.Init([]string{configObj.EtcdConfig.Address})
if err != nil {
logrus.Error("init etcd failed, err:", err)
}
logrus.Info("init etcd success!")

// 从etcd中拉取要收集的日志
confList, err := etcd.GetConf(configObj.EtcdConfig.CollectConfKey)
if err != nil {
logrus.Error("get conf from etcd failed, err:", err)
}
logrus.Info("get conf from etcd success!", confList)

// 监控etcd中的configObj.EtcdConfig.CollectConfKey对应值得变化
go etcd.WatchConf(configObj.EtcdConfig.CollectConfKey)

// 根据配置文件中的日志路径使用tail收集日志,初始化tail
err = tailfile.Init(confList)
if err != nil {
logrus.Error("init tail failed, err:", err)
return
}
logrus.Info("init tail success!")

select {} // 阻塞,防止主进程退出
}