Dawn's Blogs

分享技术 记录成长

0%

基本操作

连接zookeeper

1
2
3
4
5
6
// 连接zookeeper服务器
func initZk() (*zk.Conn, error) {
servers := []string{"127.0.0.1:2181"}
conn, _, err := zk.Connect(servers, time.Second*5)
return conn, err
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/*
conn zookeeper连接
path 节点
data 节点数据
flags有4种取值:
0:永久,除非手动删除
zk.FlagEphemeral = 1:短暂,session断开则改节点也被删除
zk.FlagSequence = 2:会自动在节点后面添加序号
3:Ephemeral和Sequence,即,短暂且自动添加序号
*/
func add(conn *zk.Conn, path string, data []byte, flags int32) error {
acls := zk.WorldACL(zk.PermAll)
s, err := conn.Create(path, data, flags, acls)
if err != nil {
log.Println("create failed, err:", err)
} else {
log.Println("create success:", s)
}
return err
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 删除
func delete(conn *zk.Conn, path string) error {
// 获取版本
_, stat, err := conn.Get(path)
if err != nil {
log.Println("get version failed, err:", err)
return err
}
// 删除
err = conn.Delete(path, stat.Version)
if err != nil {
log.Printf("delete failed for path %s, err: %v\n", path, err)
}
return err
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 更新节点
func set(conn *zk.Conn, path string, data []byte) error {
// 获取版本
_, stat, err := conn.Get(path)
if err != nil {
log.Println("get version failed, err:", err)
return err
}
// 更新
stat, err = conn.Set(path, data, stat.Version)
if err != nil {
log.Printf("set failed for path %s, err: %v\n", path, err)
} else {
log.Printf("set success ,stat: %v\n", stat)
}
return err
}

1
2
3
4
5
6
7
8
9
10
// 查询
func get(conn *zk.Conn, path string) ([]byte, error) {
b, stat, err := conn.Get(path)
if err != nil {
log.Println("get failed, err:", err)
} else {
log.Printf("info of node for %v: %v\n", path, stat)
}
return b, err
}

Watch机制

只监听一次

调用zk.WithEventCallback(callback)设置回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package main

import (
"fmt"
"github.com/samuel/go-zookeeper/zk"
"log"
"time"
)

// zk watch 回调函数
func callback(event zk.Event) {
// zk.EventNodeCreated
// zk.EventNodeDeleted
fmt.Println("###########################")
fmt.Println("path: ", event.Path)
fmt.Println("type: ", event.Type.String())
fmt.Println("state: ", event.State.String())
fmt.Println("---------------------------")
}

func initZk() (*zk.Conn, error) {
eventCallbackOption := zk.WithEventCallback(callback)
servers := []string{"127.0.0.1:2181"}
conn, _, err := zk.Connect(servers, time.Second*5, eventCallbackOption)
return conn, err
}

func add(conn *zk.Conn, path string, data []byte, flags int32) error {
acls := zk.WorldACL(zk.PermAll)
_, err := conn.Create(path, data, flags, acls)

return err
}

// 删除
func delete(conn *zk.Conn, path string) error {
// 获取版本
_, stat, err := conn.Get(path)
if err != nil {
return err
}
// 删除
err = conn.Delete(path, stat.Version)
return err
}

func listenOne(conn *zk.Conn, path string) error {
//调用conn.ExistsW(path) 或GetW(path)为对应节点设置监听,该监听只生效一次
_, _, _, err := conn.ExistsW(path)
return err
}

func main() {
conn, err := initZk()
defer conn.Close()
if err != nil {
fmt.Println(err)
return
}

path := "/dawn111"
data := []byte("hello world")

// 开始监听path,只监听一次
err = listenOne(conn, path)
if err != nil {
log.Println(err)
}
// 触发创建数据操作
err = add(conn, path, data, 0)
if err != nil {
log.Println("create failed, err:", err)
} else {
log.Println("创建数据成功!")
}

//再次监听path
err = listenOne(conn, path)
if err != nil {
log.Println(err)
}
// 触发删除数据操作
err = delete(conn, path)
if err != nil {
log.Printf("delete failed for path %s, err: %v\n", path, err)
} else {
log.Println("删除数据成功!")
}

}

开启一个channel处理

可以开一一个协程来处理chanel中传来的event事件,可以持续监听

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package main

import (
"fmt"
"github.com/samuel/go-zookeeper/zk"
"log"
"time"
)

func initZk() (*zk.Conn, error) {
servers := []string{"127.0.0.1:2181"}
conn, _, err := zk.Connect(servers, time.Second*5)
return conn, err
}

func add(conn *zk.Conn, path string, data []byte, flags int32) error {
acls := zk.WorldACL(zk.PermAll)
_, err := conn.Create(path, data, flags, acls)

return err
}

// 删除
func delete(conn *zk.Conn, path string) error {
// 获取版本
_, stat, err := conn.Get(path)
if err != nil {
return err
}
// 删除
err = conn.Delete(path, stat.Version)
return err
}

func listenOneChannel(conn *zk.Conn, path string) error {
_, _, ch, err := conn.ExistsW(path)

go func() {
event := <-ch
fmt.Println("*******************")
fmt.Println("path:", event.Path)
fmt.Println("type:", event.Type.String())
fmt.Println("state:", event.State.String())
fmt.Println("-------------------")
}()

return err
}

func main() {
conn, err := initZk()
defer conn.Close()
if err != nil {
fmt.Println(err)
return
}

path := "/dawn111"
data := []byte("hello world")

// 开启一个管道持续监听
err = listenOneChannel(conn, path)
if err != nil {
log.Println(err)
}
// 创建数据触发事件
err = add(conn, path, data, 0)
if err != nil {
log.Println("create failed, err:", err)
} else {
log.Println("创建数据成功!")
}
// 删除数据触发事件
err = delete(conn, path)
if err != nil {
log.Printf("delete failed for path %s, err: %v\n", path, err)
} else {
log.Println("删除数据成功!")
}

}

ZooKeeper实现分布式锁

zookeeper中可以实现两种锁:

  • 读锁(共享锁):可以一起读,但是与写锁互斥。
  • 写锁(互斥锁):与读锁和写锁都互斥。

实现读锁

  • 创建⼀个临时序号节点,节点的数据是read,表示是读锁
  • 获取当前zookeeper中序号比自己小的所有节点。若这些节点有写锁,则加锁不成功。

读锁

实现写锁

  • 创建⼀个临时序号节点,节点的数据是write,表示是写锁
  • 获取zookeeper中所有的子节点
  • 判断自己是否是最小的节点:
    • 如果是,则上锁成功
    • 如果不是,说明前面还有锁,上锁失败,为最小节点设置监听。阻塞等待,watch机制会当最小节点发生变化时通知当前节点,再执行第二步

写锁

羊群效应

羊群效应:如果采用上述的上锁方式,只要有节点发生变化,就会触发其他节点的监听事件,这样对于zookeeper的压力非常大。

可以调整成链式监听:

链式监听

阅读全文 »

ZooKeeper介绍

ZooKeeper是一种分布式协调服务,用于管理大型分布式集群。在分布式环境中协调和管理服务是一个复杂的过程,但是ZooKeeper通过其简单的架构和API解决了这个问题。

应用场景

分布式协调组件

在分布式系统中,需要zookeeper作为分布式协调组件,协调分布式系统中的状态。如下图,用户更改了一台服务器A-2上的状态,但是另一台主机A-1上的flag没有做出及时的更改,这就需要zookeeper进行协调,更改A-1的状态,以协调分布式系统中所有服务器的状态。

zk作为分布式协调组件

分布式锁

zookeeper在实现分布式锁上,可以做到强⼀致性

分布式锁就是将锁放在zookeeper上,当需要上锁时到zookeeper中获取锁。

无状态化的实现

对于分布式系统,将Session或者Cookie等状态信息保存在各个主机上是不现实的。可以利用zookeeper实现分布式系统的无状态化,将登录信息统一保存在zookeeper中,各个分布式主机在zookeeper中获取状态信息。

zk作为无状态化的实现

阅读全文 »

上一版的改进

问题

因为log agent可以部署在多台机器上,每一台机器上需要收集的日志可能不尽相同,所以需要针对不同机器部署不同的配置文件

改进

因为不同的机器上拥有不同的配置文件,所以etcd中每一台机器对应的配置文件的key应该是唯一可标识的,这样可以区分不同的机器。在key中加入IP地址以区别不同的key,故config.ini做出以下更改:

1
2
3
4
# 原来是collect_conf_key = collect_conf
# 改进版:
# 保存日志收集配置文件的key,根据IP地址(%s)得到key
collect_conf_key = collect_%s_conf

相对应的增加一个获取IP地址的函数,用于获取IP地址:

1
2
3
4
5
6
7
8
9
10
11
12
13
// GetOutboundIP 获取本机的IP地址
func GetOutboundIP() (ip string, err error) {
conn, err := net.Dial("udp", "8.8.8.8:80")
if err != nil {
return
}
defer conn.Close()

localAddr := conn.LocalAddr().(*net.UDPAddr)
// fmt.Println(localAddr.String())
ip = strings.Split(localAddr.IP.String(), ":")[0]
return
}

main主函数中,首先获取本机的IP地址:

1
2
3
4
5
6
// 获取本机ip
ip, err := common.GetOutboundIP()
if err != nil {
logrus.Error("get local ip failed, err:", err)
return
}

根据配置文件中的collect_conf_key = collect_%s_conf项格式化字符串,替换配置文件结构体中的相应字段:

1
2
// 用ip替换collect_conf_key中的%s
configObj.EtcdConfig.CollectConfKey = fmt.Sprintf(configObj.EtcdConfig.CollectConfKey, ip)

收集系统信息

目前,已经完成了日志收集log agent的构建。

现在来构建用于收集系统日志的sys info agent,需要收集的系统信息包括CPU、主存、磁盘、网络信息,将收集到的系统信息发送给Kafka。基本思路与log agent相同:

  • 首先根据配置文件获取Kafka的相关配置信息
  • 根据配置信息初始化Kafka
    • 连接Kafka
    • 初始化消息管道,这个消息管道用于接收收集到的系统信息
    • 开启协程,从消息管道中取出消息,发送到Kafka
  • 初始化系统信息收集系统,开启线程用于采集数据,每隔1秒收集一次
阅读全文 »

influxDB

influxDB是时间序列数据库,主键是时间戳。

概念

influxDB名词 传统数据库概念
database 数据库
measurement 数据表
point 数据行

数据格式

1
measurement [,tag_key1=tag_value1...] field_key=field_value[,field_key2=field_value2] [timestamp]
  • measurement:类似于数据表的概念
  • field-key,field-value:用来存储数据支持各种类型,数据存储时不会进行索引。每条数据必须拥有一个field-key,如果使用field-key作为过滤条件则需要遍历所有的数据
  • tag-key,tag-value:与field类似,不过会进行索引,方便查询时用于过滤条件

measurements, tag keys, field keys,tag values 全局存一份;field values 和 timestamps 每条数据存一份

Point

influxDB中的point相当于传统数据库里的一行数据,由时间戳(time)、数据(field)、标签(tag)组成。

Point属性 传统数据库概念
time 每个数据记录时间,是数据库中的主索引
field 各种记录值(没有索引的属性),例如温度、湿度
tags 各种有索引的属性,例如地区、海拔

Series

measurement、tag set、retention policy相同的数据集合,称为一个series。同一个series的数据,在物理上会按照时间顺序排列存储在一起。

series的key为measurement + 所有tags的序列化字符串

Retention Policy

保留策略RP,包括数据保存的时间以及在集群中的副本个数

默认的保留策略为,保存时间无限制、副本个数为1

Shard

shard与RP相关联,每一个存储策略下会存在许多shard,每一个shard存储一个指定时间段内的数据,并且不会重复。

shard保存数据的始阶段计算函数如下:

1
2
3
4
5
6
7
8
func shardGroupDuration(d time.Duration) time.Duration {
if d >= 180*24*time.Hour || d == 0 { // 6 months or 0
return 7 * 24 * time.Hour
} else if d >= 2*24*time.Hour { // 2 days
return 1 * 24 * time.Hour
}
return 1 * time.Hour
}

每一个shard都对应底层的一个TSM存储引擎,这样做的目的就是为了可以通过时间来快速定位到要查询数据的相关资源,加速查询的过程,并且也让之后的批量删除数据的操作变得非常简单且高效。

阅读全文 »

上一版本的问题

首先来看上一版本的配置文件:

1
2
3
4
5
6
7
8
9
10
11
[kafka]
# Kafka地址
address = 127.0.0.1:9092
# 主题
topic = web_log
# 消息管道大小
chan_size = 1000

[collect]
# 日志文件存放路径
logfile_path = F:\gopath\src\logagent\logs\1.log

通过配置文件指定需要收集的日志时,主要有两个问题:

  • 日志文件的存放路径只能有一条,也就是说不能同时启动多个tail.Tail对象,来收集多个日志
  • 无法实现对配置文件的实时监控,根据配置文件的更改来做出及时的变化

通过etcd管理日志配置

上述问题的原因在于ini配置文件无法同时定义多个日志文件路径。解决的方法就是通过etcd管理日志收集配置,在etcd中以json的格式存储日志收集配置信息,如下方:

1
2
3
4
5
6
7
8
9
10
[
{
"path":"F:/gopath/src/logagent/logs/web.log",
"topic":"web_log"
},
{
"path":"F:/gopath/src/logagent/logs/blog.log",
"topic":"blog_log"
}
]

相应的,需要修改config.ini配置文件,配置etcd的相关设置:

1
2
3
4
5
6
7
8
9
10
11
12
[kafka]
# Kafka地址
address = 127.0.0.1:9092
# 主题
topic = web_log
# 消息管道大小
chan_size = 1000

[etcd]
address = 127.0.0.1:2379
# 保存日志收集配置文件的key
collect_conf_key = collect_conf
阅读全文 »

etcd

etcd是使用Go语言开发的一个开源的、高可用的分布式key-value存储系统,可以用于配置共享服务的注册和发现

etcd具有以下特点

  • 完全复制:集群中的每个节点都可以使用完整的存档
  • 高可用性:Etcd可用于避免硬件的单点故障或网络问题
  • 一致性:每次读取都会返回跨多主机的最新写入
  • 简单:包括一个定义良好、面向用户的API(gRPC)
  • 安全:实现了带有可选的客户端证书身份验证的自动化TLS
  • 快速:每秒10000次写入的基准速度
  • 可靠:使用Raft算法实现了强一致、高可用的服务存储目录

应用场景

服务发现

在一个分布式集群中,如何找到提供某项服务的主机并与之建立连接,这就是服务发现所完成的功能:

  1. 服务提供者首先在注册中心注册服务
  2. 服务请求者根据所需要的服务在注册中心请求服务提供者的地址信息
  3. 服务请求者与服务提供者建立连接,完成服务

服务发现

配置中心

将一些配置信息放到 etcd 上进行集中管理。

这类场景的使用方式通常是这样:应用在启动的时候主动从 etcd 获取一次配置信息,同时,在 etcd 节点上注册一个 Watcher 并等待,以后每次配置有更新的时候,etcd 都会实时通知订阅者,以此达到获取最新配置信息的目的。

分布式锁

因为 etcd 使用 Raft 算法保持了数据的强一致性,某次操作存储到集群中的值必然是全局一致的,所以很容易实现分布式锁。锁服务有两种使用方式,一是保持独占,二是控制时序。

  • 保持独占即所有获取锁的用户最终只有一个可以得到。etcd 为此提供了一套实现分布式锁原子操作 CAS(CompareAndSwap)的 API。通过设置prevExist值,可以保证在多个节点同时去创建某个目录时,只有一个成功。而创建成功的用户就可以认为是获得了锁。
  • 控制时序,即所有想要获得锁的用户都会被安排执行,但是获得锁的顺序也是全局唯一的,同时决定了执行顺序。etcd 为此也提供了一套 API(自动创建有序键),对一个目录建值时指定为POST动作,这样 etcd 会自动在目录下生成一个当前最大的值为键,存储这个新的值(客户端编号)。同时还可以使用 API 按顺序列出所有当前目录下的键值。此时这些键的值就是客户端的时序,而这些键中存储的值可以是代表客户端的编号。

etcd架构

etcd主要分为四个部分:

  • HTTP Server:用于处理用户发送的API请求以及其他etcd节点的同步与心跳信息请求。
  • Store:用于处理etcd支持的各类功能的事务,包括数据索引、节点状态变更、监控与反馈、事件的处理与执行等等,是etcd对用户提供的大多数API的具体实现。
  • Raft:强一致性算法
  • WAL:Write Ahead Log(预写式日志),etcd通过WAL对数据进行持久化存储。WAL中,所有的数据提交前都会实现记录日志。Snapshot(快照)是为了方式数据过多而进行的状态快照;Entry表示存储的日志内容。

etcd架构

etcd集群

etcd一般部署集群推荐奇数个节点,推荐的数量为 3、5 或者 7 个节点构成一个集群。

GO语言操作etcd

put和get

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package main

import (
"context"
"fmt"
clientv3 "go.etcd.io/etcd/client/v3"
"time"
)

func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
// handle error!
fmt.Printf("connect to etcd failed, err:%v\n", err)
return
}
fmt.Println("connect to etcd success")

defer cli.Close()
// put
// ctx用于控制超时
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
_, err = cli.Put(ctx, "dawn", "zh")
cancel()
if err != nil {
fmt.Printf("put to etcd failed, err:%v\n", err)
return
}

// get
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, "dawn")
cancel()
if err != nil {
fmt.Printf("get from etcd failed, err:%v\n", err)
return
}
// 遍历取到结果
for _, ev := range resp.Kvs {
fmt.Printf("%s:%s\n", ev.Key, ev.Value)
}
}

watch

用于监控etcd中的key的变化(创建/更改/删除)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
// handle error!
fmt.Printf("connect to etcd failed, err:%v\n", err)
return
}
fmt.Println("connect to etcd success")

// watch,返回一个通道
wChan := cli.Watch(context.Background(), "key")

// 遍历通道,一旦有事件发生,watch会向通道中发送数据
for wresp := range wChan {
for _, evt := range wresp.Events {
fmt.Printf("type:%s key:%s value:%s\n", evt.Type, evt.Kv.Key, evt.Kv.Value)
}
}
}

lease租约

keepAlive

基于etcd实现分布式锁

介绍

go-ini官方网站

ini文件

ini格式的文件是配置文件中常用的格式。

  • 在ini文件中,每个键值对占用一行,中间使用=隔开
  • #开头的为注释
  • ini文件是以分区为组织的,分区以[name]作为标志
1
2
3
4
5
6
7
8
9
[mysql]
ip = 127.0.0.1
port = 3306
user = root
password = 123456
database = test

[kafka]
address = 127.0.0.1:9092

go-ini

go-ini为GO语言提供了读写ini文件的功能。如使用go-ini读取上面的配置文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package main

import (
"fmt"
"github.com/go-ini/ini"
)

func main() {
// 加载配置文件
cfg, err := ini.Load("conf.ini")
if err != nil {
fmt.Println("Load config file failed, err:", err)
}

// 获取配置文件信息

// [mysql]分区
mysqlSection := cfg.Section("mysql")
mysqlIP := mysqlSection.Key("ip").String()
fmt.Println("mysql ip =", mysqlIP)
mysqlPort, err := mysqlSection.Key("port").Int()
if err != nil {
fmt.Println("mysql port parse failed, err:", err)
}
fmt.Println("mysql port =", mysqlPort)

// [kafka]分区
kafkaAddress := cfg.Section("kafka").Key("address").String()
fmt.Println("kafka address=", kafkaAddress)
}

使用go-ini读取配置文件的步骤如下:

  • 调用ini.Load加载配置文件,得到配置对象cfg
  • 调用cfg.Section获取分区对象,使用Key方法得到对应配置项的对象
  • Key对象需根据类型调用对应的方法返回具体类型,可以使用StringIntFloat64等方法。

配置文件中存储的都是字符串,所以类型为字符串的配置项不会出现类型转换失败的,故String方法只返回一个值。但是IntUint等方法可能会转换失败,返回一个值和一个错误。

如果每次取值都需要进行错误判断,那么代码写起来会非常繁琐。go-ini为此提供了对应的MustType方法(Type为Int/Uint/Float64等),这个方法只返回一个值。同时它接收可变参数,如果类型无法转换,取参数中第一个值返回,并且该参数设置为这个配置的值,下次调用返回这个值。

go-ini使用

分区操作

获取信息

在加载配置之后

  • 通过Sections方法获取所有分区
  • SectionStrings()方法获取所有的分区名
  • Section(name)获取名为name的分区,如果该分区不存在,则自动创建一个分区并返回
  • NewSection(name)手动创建一个新分区,如果分区已存在,则返回错误

父子分区

可以在分区名中使用.表示两个或多个分区之间的父子关系。

例如package.sub的父分区为packagepackage的父分区为默认分区。如果某个键在子分区中不存在,则会在它的父分区中再次查找,直到没有父分区为止。

保存配置

可以将生成的配置对象写入到文件中,保存有两种类型的接口,一种直接保存到文件,另一种写入到io.Writer中:

1
2
3
4
5
err = cfg.SaveTo("my.ini")
err = cfg.SaveToIndent("my.ini", "\t") // 有缩进

cfg.WriteTo(writer)
cfg.WriteToIndent(writer, "\t") // 有缩进

*Indent方法会对分区下的键进行缩进。

分区与结构体字段映射

定义结构变量,加载完配置文件后,调用MapTo将配置项赋值到结构变量的对应字段中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
type Config struct {
MySQL MySQLConfig `ini:"mysql"`
Kafka KafkaConfig `ini:"kafka"`
}

type MySQLConfig struct {
IP string `ini:"ip"`
Port int `ini:"port"`
User string `ini:"user"`
Password string `ini:"password"`
Database string `ini:"database"`
}

type KafkaConfig struct {
Address string `ini:"address"`
}

func main() {
cfg, err := ini.Load("conf.ini")
if err != nil {
fmt.Println("Load config file failed, err:", err)
}

// 映射整个配置文件
c := &Config{}
cfg.MapTo(c)
fmt.Println(c)

// 只映射mysql分区
mysql := &MySQLConfig{}
cfg.Section("mysql").MapTo(mysql)
fmt.Println(mysql)
}

也可以通过结构体生成配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func NewIni() {
cfg := ini.Empty()
// 构造配置信息
c := Config{
MySQL: MySQLConfig{
IP: "192.168.1.2",
Port: 3306,
User: "Dawn",
Password: "123456",
Database: "dbname",
},
Kafka: KafkaConfig{
Address: "127.0.0.1:9092",
},
}

// 通过结构体生成配置
err := ini.ReflectFrom(cfg, &c)
if err != nil {
fmt.Println("ReflectFrom failed: ", err)
return
}

// 保存到文件
err = cfg.SaveTo("new-conf.ini")
if err != nil {
fmt.Println("SaveTo failed: ", err)
return
}

}

项目介绍

解析配置文件,进行日志收集

首先,初步编写一个logagent

  1. 从配置文件中读取信息。
  2. 根据从配置文件中得到的Kafka地址,初始化Kafka生产者:
    • 生产者配置
    • 连接Kafka
    • 初始化消息管道,并且起一个协程,用于从消息管道中读取数据并向Kafka推送消息
  3. 根据从配置文件中得到的日志路径,初始化一个tail.Tail对象TailObj
  4. TailObj从日志中读取数据,封装成msg送入消息管道。协程从消息管道中取出数据并发送给Kafka

读取配置文件

配置文件conf/config.ini

1
2
3
4
5
6
7
8
9
10
11
[kafka]
# Kafka地址
address = 127.0.0.1:9092
# 主题
topic = web_log
# 消息管道大小
chan_size = 1000

[collect]
# 日志文件存放路径
logfile_path = F:\gopath\src\logagent\logs\1.log

读取配置文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 读配置文件
cfg, err := ini.Load("conf/config.ini")
if err != nil {
logrus.Error("Load config failed, err:", err)
return
}

var configObj = new(Config)
err = cfg.MapTo(configObj)
if err != nil {
logrus.Error("Map failed, err:", err)
return
}

初始化Kafka生产者

kafka/kafka.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package kafka

import (
"github.com/Shopify/sarama"
"github.com/sirupsen/logrus"
)

// kafka相关操作

var (
Client sarama.SyncProducer
MsgChan chan *sarama.ProducerMessage
)

func Init(address []string, chanSize int) (err error) {
// 生产者配置
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // 等待leader和follow都确认
config.Producer.Partitioner = sarama.NewRandomPartitioner // 分区
config.Producer.Return.Successes = true // 确认

// 连接kafka
Client, err = sarama.NewSyncProducer(address, config)
if err != nil {
logrus.Error("kafka: producer closed, err:", err)
}
// 初始化消息管道
MsgChan = make(chan *sarama.ProducerMessage, chanSize)
// 起一个协程向kafka发送数据
go sendMsg()
return
}

// 从MsgChan中读取msg,发送给kafka
func sendMsg() {
for {
select {
case msg := <-MsgChan:
pid, offser, err := Client.SendMessage(msg)
if err != nil {
logrus.Warn("send msg failed, err:", err)
return
}
logrus.Infof("send msg to kafka success. pid=%v offset=%v", pid, offser)
}
}
}

初始化tail.Tail对象

tailfile.tailfile.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package tailfile

import (
"github.com/hpcloud/tail"
"github.com/sirupsen/logrus"
"os"
)

var (
TailObj *tail.Tail
)

func Init(fileName string) (err error) {
TailObj, err = tail.TailFile(fileName, tail.Config{
Location: &tail.SeekInfo{ // 从文件的哪个地方开始读取
Offset: 0,
Whence: os.SEEK_END,
},
ReOpen: true, // 重新打开文件
MustExist: false, // 文件不存在不报错
Follow: true, // 进行跟随
Poll: true,
})
if err != nil {
logrus.Errorf("tailfile: create tailObj for path %s failed, err: %v\n", fileName, err)
}
return
}

TailObj从日志中读取数据,送入消息管道

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 从TailObj中从log中读取数据,封装成msg放入到通道中
func run() {
for {
// 循环读数据
line, ok := <-tailfile.TailObj.Lines
if !ok {
logrus.Warnf("tail file close reopen, filename: %s\n", tailfile.TailObj.Filename)
}
// 把从tail中读到的日志,包装成kafka的msg类型
fmt.Println(line.Text)
msg := &sarama.ProducerMessage{}
msg.Topic = "web_log"
msg.Value = sarama.StringEncoder(line.Text)
// 放到通道中
kafka.MsgChan <- msg
}
}

总结:

所以main.go中的流程为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package main

import (
"fmt"
"github.com/Shopify/sarama"
"github.com/go-ini/ini"
"github.com/sirupsen/logrus"
"logagent/kafka"
"logagent/tailfile"
)

// 日志收集的客户端
// 收集指定目录下的日志文件,发送到kafka中

type Config struct {
KafkaConfig `ini:"kafka"`
CollectConfig `ini:"collect"`
}

type KafkaConfig struct {
Address string `ini:"address"`
Topic string `ini:"topic"`
ChanSize int `int:"chan_size"`
}
type CollectConfig struct {
LogFilePath string `ini:"logfile_path"`
}

// 真正业务逻辑
// 从TailObj中从log中读取数据,封装成msg放入到通道中
func run() {
for {
// 循环读数据
line, ok := <-tailfile.TailObj.Lines
if !ok {
logrus.Warnf("tail file close reopen, filename: %s\n", tailfile.TailObj.Filename)
}
// 把从tail中读到的日志,包装成kafka的msg类型
fmt.Println(line.Text)
msg := &sarama.ProducerMessage{}
msg.Topic = "web_log"
msg.Value = sarama.StringEncoder(line.Text)
// 放到通道中
kafka.MsgChan <- msg
}
}

func main() {
// 读配置文件
cfg, err := ini.Load("conf/config.ini")
if err != nil {
logrus.Error("Load config failed, err:", err)
return
}

var configObj = new(Config)
err = cfg.MapTo(configObj)
if err != nil {
logrus.Error("Map failed, err:", err)
return
}
fmt.Println(configObj)

// 初始化Kafka
err = kafka.Init([]string{configObj.KafkaConfig.Address}, configObj.KafkaConfig.ChanSize)
if err != nil {
logrus.Error("init kafka failed, err:", err)
return
}
logrus.Info("init kafka success!")

// 根据配置文件中的日志路径使用tail收集日志,初始化tail
err = tailfile.Init(configObj.CollectConfig.LogFilePath)
if err != nil {
logrus.Error("init tail failed, err:", err)
return
}
logrus.Info("init tail success!")

// 把日志发往kafka
// 从TailObj中从log中读取数据,封装成msg放入到通道中
run()
}

概述

tail包用于输出文件的最后几行。假设该档案有更新,tail会自己主动刷新,确保你看到最新的档案内容 ,在日志收集中可以实时的监测日志的变化。

介绍

1
func TailFile(filename string, config Config) (*Tail, error)

tail2.TailFile()函数的参数是文件路径配置文件,会生成一个Tail结构体。在Tail结构体中,最重要的属性是文件名Filename和用于存储文件一行Line的通道Lines

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type Tail struct {
Filename string
Lines chan *Line
Config

file *os.File
reader *bufio.Reader

watcher watch.FileWatcher
changes *watch.FileChanges

tomb.Tomb // provides: Done, Kill, Dying

lk sync.Mutex
}

其中表示文件中一行的结构体为Line,这个结构体用于存储读取的一行信息:

1
2
3
4
5
type Line struct {
Text string
Time time.Time
Err error // Error from tail
}
阅读全文 »