1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
| package main
import ( "fmt" "github.com/Shopify/sarama" "github.com/go-ini/ini" "github.com/sirupsen/logrus" "logagent/kafka" "logagent/tailfile" )
type Config struct { KafkaConfig `ini:"kafka"` CollectConfig `ini:"collect"` }
type KafkaConfig struct { Address string `ini:"address"` Topic string `ini:"topic"` ChanSize int `int:"chan_size"` } type CollectConfig struct { LogFilePath string `ini:"logfile_path"` }
func run() { for { line, ok := <-tailfile.TailObj.Lines if !ok { logrus.Warnf("tail file close reopen, filename: %s\n", tailfile.TailObj.Filename) } fmt.Println(line.Text) msg := &sarama.ProducerMessage{} msg.Topic = "web_log" msg.Value = sarama.StringEncoder(line.Text) kafka.MsgChan <- msg } }
func main() { cfg, err := ini.Load("conf/config.ini") if err != nil { logrus.Error("Load config failed, err:", err) return }
var configObj = new(Config) err = cfg.MapTo(configObj) if err != nil { logrus.Error("Map failed, err:", err) return } fmt.Println(configObj)
err = kafka.Init([]string{configObj.KafkaConfig.Address}, configObj.KafkaConfig.ChanSize) if err != nil { logrus.Error("init kafka failed, err:", err) return } logrus.Info("init kafka success!")
err = tailfile.Init(configObj.CollectConfig.LogFilePath) if err != nil { logrus.Error("init tail failed, err:", err) return } logrus.Info("init tail success!")
run() }
|