上一版本的问题 首先来看上一版本的配置文件:
1 2 3 4 5 6 7 8 9 10 11 [kafka] address = 127.0 .0.1 :9092 topic = web_logchan_size = 1000 [collect] logfile_path = F:\gopath\src\logagent\logs\1 .log
通过配置文件指定需要收集的日志时,主要有两个问题:
日志文件的存放路径只能有一条,也就是说不能同时启动多个tail.Tail
对象,来收集多个日志
无法实现对配置文件的实时监控,根据配置文件的更改来做出及时的变化
通过etcd管理日志配置 上述问题的原因在于ini配置文件无法同时定义多个日志文件路径。解决的方法就是通过etcd管理日志收集配置,在etcd中以json的格式存储日志收集配置信息,如下方:
1 2 3 4 5 6 7 8 9 10 [ { "path" :"F:/gopath/src/logagent/logs/web.log" , "topic" :"web_log" }, { "path" :"F:/gopath/src/logagent/logs/blog.log" , "topic" :"blog_log" } ]
相应的,需要修改config.ini
配置文件,配置etcd的相关设置:
1 2 3 4 5 6 7 8 9 10 11 12 [kafka] address = 127.0 .0.1 :9092 topic = web_logchan_size = 1000 [etcd] address = 127.0 .0.1 :2379 collect_conf_key = collect_conf
从etcd中读取日志收集配置 初始化etcd连接,etcd\etcd.go
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 var ( client *clientv3.Client ) func Init (endPoints []string ) (err error) { client, err = clientv3.New(clientv3.Config{ Endpoints: endPoints, DialTimeout: time.Second * 5 , }) if err != nil { logrus.Error("connect to etcd failed, err:" , err) } return }
从etcd中获取关于日志收集的配置,etcd\etcd.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 func GetConf (key string ) (collectEntryList []common.CollectEntry, err error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() resp, err := client.Get(ctx, key) if err != nil { logrus.Errorf("get conf from etcd by key: %v err: %v" , key, err) return } if len (resp.Kvs) == 0 { logrus.Warn("get len=0 conf from etcd by key:" , key) return } ret := resp.Kvs[0 ] err = json.Unmarshal(ret.Value, &collectEntryList) if err != nil { logrus.Error("json unmarshal failed, err:" , err) } return }
其中common.CollectEntry
的结构为:
1 2 3 4 5 6 7 package commontype CollectEntry struct { Path string `json:"path"` Topic string `json:"topic"` }
根据配置启动多个tailTask 将main.go
中的run()
函数作为tailTask的方法。根据得到的配置,创建多个tailTask,对于每一个tailTask,创建一个协程执行run
方法,开始收集日志,发往消息管道中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 type tailTask struct { path string topic string tailObj *tail.Tail } var ( tailCfg tail.Config ) func Init (confList []common.CollectEntry) (err error) { tailCfg = tail.Config{ Location: &tail.SeekInfo{ Offset: 0 , Whence: os.SEEK_END, }, ReOpen: true , MustExist: false , Follow: true , Poll: true , } for _, conf := range confList { tt, err := newTailTask(conf) if err != nil { logrus.Errorf("create tailObj for path: %s failed, err: %s\n" , conf.Path, err) continue } go tt.run() } return } func (t *tailTask) run () { logrus.Infof("collect for path:%s is running..." , t.path) for { line, ok := <-t.tailObj.Lines: if !ok { logrus.Warnf("tail file close reopen, filename: %s\n" , t.tailObj.Filename) } line.Text = strings.Trim(line.Text, "\r" ) if len (line.Text) == 0 { logrus.Info("a empty line, continue!" ) continue } logrus.Infof("read contents:%s from log file:%s\n" , line.Text, t.tailObj.Filename) msg := &sarama.ProducerMessage{} msg.Topic = "web_log" msg.Value = sarama.StringEncoder(line.Text) kafka.PushMsg(msg) } } func newTailTask (conf common.CollectEntry) (tt tailTask, err error) { ctx, cancel := context.WithCancel(context.Background()) tt = tailTask{ path: conf.Path, topic: conf.Topic, ctx: ctx, cancel: cancel, } tt.tailObj, err = tail.TailFile(conf.Path, tailCfg) return }
监控etcd中的新配置 需要新定义一个结构体,用于管理各个tailTask,tailfile\tailTask_mgr.go
:
1 2 3 4 5 6 type tailTaskMgr struct { tailTaskMap map [string ]*tailTask collectEntryList []common.CollectEntry confChan chan []common.CollectEntry }
程序启动后,需要开启一个协程监听etcd中关于日志收集配置的变化。如果日志收集配置有更改,那么会将新的配置发送到配置管道中,etcd\etcd.go
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 func WatchConf (key string ) { wChan := client.Watch(context.Background(), key) for wResp := range wChan { logrus.Info("get new conf from etcd." ) for _, event := range wResp.Events { var newConfList []common.CollectEntry if event.Type == clientv3.EventTypeDelete { logrus.Warning("warning! conf has been deleted!" ) tailfile.PushNewConf(newConfList) continue } err := json.Unmarshal(event.Kv.Value, &newConfList) if err != nil { logrus.Error("json unmarshal new config failed, err:" , err) continue } tailfile.PushNewConf(newConfList) } } }
同时对于tail任务的初始化函数也会初始化tailTaskMgr,并且在开启新的tailTask的同时,也会将tailTask的信息记录到tailTaskMgr中。最后开启一个协程,等待新的配置的到来tailfile\tailTask_mgr.go
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 var ( mgr *tailTaskMgr tailCfg tail.Config ) func Init (confList []common.CollectEntry) (err error) { tailCfg = tail.Config{ Location: &tail.SeekInfo{ Offset: 0 , Whence: os.SEEK_END, }, ReOpen: true , MustExist: false , Follow: true , Poll: true , } mgr = &tailTaskMgr{ tailTaskMap: make (map [string ]*tailTask, 20 ), collectEntryList: confList, confChan: make (chan []common.CollectEntry), } for _, conf := range confList { tt, err := newTailTask(conf) if err != nil { logrus.Errorf("create tailObj for path: %s failed, err: %s\n" , conf.Path, err) continue } mgr.tailTaskMap[tt.path] = &tt go tt.run() } go mgr.watch() return }
其中func (mgr *tailTaskMgr) watch()
方法用于监听新配置的到来:
对于原来已经有的tailTask,不用变化
对于原来没有的,新建一个tailTask
对于原来有,现在新的配置中没有的,要杀死相应的tailTask
对于杀死协程的办法,这里采用context包的WithCancel
方法。同时,需要更改tailTask结构体,向其中添加上下文ctx和取消函数cancel,tailfile\tailTask.go
:
1 2 3 4 5 6 7 8 type tailTask struct { path string topic string tailObj *tail.Tail ctx context.Context cancel context.CancelFunc }
同时,在tailTask日志收集时,需要注意上下文是否执行了cancel函数,若执行了cancel函数,则停止对于该文件监听:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 func (t *tailTask) run () { logrus.Infof("collect for path:%s is running..." , t.path) for { select { case <-t.ctx.Done(): logrus.Info("stop tail for file:" , t.tailObj.Filename) t.tailObj.Cleanup() t.tailObj.Stop() return case line, ok := <-t.tailObj.Lines: if !ok { logrus.Warnf("tail file close reopen, filename: %s\n" , t.tailObj.Filename) } line.Text = strings.Trim(line.Text, "\r" ) if len (line.Text) == 0 { logrus.Info("a empty line, continue!" ) continue } logrus.Infof("read contents:%s from log file:%s\n" , line.Text, t.tailObj.Filename) msg := &sarama.ProducerMessage{} msg.Topic = "web_log" msg.Value = sarama.StringEncoder(line.Text) kafka.PushMsg(msg) } } }
最后,tailfile\tailTask_mgr.go
中对于新配置的监听函数如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 func (mgr *tailTaskMgr) watch () { for { newConf := <-mgr.confChan logrus.Info("get new conf from etcd:" , newConf) for _, conf := range newConf { if _, ok := mgr.tailTaskMap[conf.Path]; ok { continue } tt, err := newTailTask(conf) if err != nil { logrus.Errorf("create tailObj for path: %s failed, err: %s\n" , conf.Path, err) continue } mgr.tailTaskMap[tt.path] = &tt go tt.run() } for path, task := range mgr.tailTaskMap { var found bool for _, conf := range newConf { if path == conf.Path { found = true break } } if !found { delete (mgr.tailTaskMap, path) task.cancel() } } } }
总结 最后进行总结,整体流程如下:
首先读取配置文件,读取etcd和kafka的配置信息
kafka的操作:
初始化连接kafka
初始化一个消息管道用于保存向kafka中发送的消息
起一个协程,监听消息管道,一旦消息管道中有新的消息,就向kafka发送消息
对于etcd的操作:
首先初始化etcd连接
从etcd中根据key拉取日志收集的配置信息
启动一个协程,监控etcd中的变化日志收集配置信息的变化,若配置信息有变化,则将新的配置信息发送到存放配置信息的管道中
收集日志文件:
根据刚开始拉取到的配置文件建立多个tailTask,每个tailTask启动一个协程以实现对多个日志的跟踪
起一个协程,监听配置信息管道,一旦有新的配置,则进行对tailTask进行相应的管理
所以main.go
如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 func main () { cfg, err := ini.Load("conf/config.ini" ) if err != nil { logrus.Error("Load config failed, err:" , err) return } var configObj = new (Config) err = cfg.MapTo(configObj) if err != nil { logrus.Error("Map failed, err:" , err) return } fmt.Println(configObj) err = kafka.Init([]string {configObj.KafkaConfig.Address}, configObj.KafkaConfig.ChanSize) if err != nil { logrus.Error("init kafka failed, err:" , err) return } logrus.Info("init kafka success!" ) err = etcd.Init([]string {configObj.EtcdConfig.Address}) if err != nil { logrus.Error("init etcd failed, err:" , err) } logrus.Info("init etcd success!" ) confList, err := etcd.GetConf(configObj.EtcdConfig.CollectConfKey) if err != nil { logrus.Error("get conf from etcd failed, err:" , err) } logrus.Info("get conf from etcd success!" , confList) go etcd.WatchConf(configObj.EtcdConfig.CollectConfKey) err = tailfile.Init(confList) if err != nil { logrus.Error("init tail failed, err:" , err) return } logrus.Info("init tail success!" ) select {} }