Dawn's Blogs

分享技术 记录成长

0%

上一版的改进

问题

因为log agent可以部署在多台机器上,每一台机器上需要收集的日志可能不尽相同,所以需要针对不同机器部署不同的配置文件

改进

因为不同的机器上拥有不同的配置文件,所以etcd中每一台机器对应的配置文件的key应该是唯一可标识的,这样可以区分不同的机器。在key中加入IP地址以区别不同的key,故config.ini做出以下更改:

1
2
3
4
# 原来是collect_conf_key = collect_conf
# 改进版:
# 保存日志收集配置文件的key,根据IP地址(%s)得到key
collect_conf_key = collect_%s_conf

相对应的增加一个获取IP地址的函数,用于获取IP地址:

1
2
3
4
5
6
7
8
9
10
11
12
13
// GetOutboundIP 获取本机的IP地址
func GetOutboundIP() (ip string, err error) {
conn, err := net.Dial("udp", "8.8.8.8:80")
if err != nil {
return
}
defer conn.Close()

localAddr := conn.LocalAddr().(*net.UDPAddr)
// fmt.Println(localAddr.String())
ip = strings.Split(localAddr.IP.String(), ":")[0]
return
}

main主函数中,首先获取本机的IP地址:

1
2
3
4
5
6
// 获取本机ip
ip, err := common.GetOutboundIP()
if err != nil {
logrus.Error("get local ip failed, err:", err)
return
}

根据配置文件中的collect_conf_key = collect_%s_conf项格式化字符串,替换配置文件结构体中的相应字段:

1
2
// 用ip替换collect_conf_key中的%s
configObj.EtcdConfig.CollectConfKey = fmt.Sprintf(configObj.EtcdConfig.CollectConfKey, ip)

收集系统信息

目前,已经完成了日志收集log agent的构建。

现在来构建用于收集系统日志的sys info agent,需要收集的系统信息包括CPU、主存、磁盘、网络信息,将收集到的系统信息发送给Kafka。基本思路与log agent相同:

  • 首先根据配置文件获取Kafka的相关配置信息
  • 根据配置信息初始化Kafka
    • 连接Kafka
    • 初始化消息管道,这个消息管道用于接收收集到的系统信息
    • 开启协程,从消息管道中取出消息,发送到Kafka
  • 初始化系统信息收集系统,开启线程用于采集数据,每隔1秒收集一次
阅读全文 »

influxDB

influxDB是时间序列数据库,主键是时间戳。

概念

influxDB名词 传统数据库概念
database 数据库
measurement 数据表
point 数据行

数据格式

1
measurement [,tag_key1=tag_value1...] field_key=field_value[,field_key2=field_value2] [timestamp]
  • measurement:类似于数据表的概念
  • field-key,field-value:用来存储数据支持各种类型,数据存储时不会进行索引。每条数据必须拥有一个field-key,如果使用field-key作为过滤条件则需要遍历所有的数据
  • tag-key,tag-value:与field类似,不过会进行索引,方便查询时用于过滤条件

measurements, tag keys, field keys,tag values 全局存一份;field values 和 timestamps 每条数据存一份

Point

influxDB中的point相当于传统数据库里的一行数据,由时间戳(time)、数据(field)、标签(tag)组成。

Point属性 传统数据库概念
time 每个数据记录时间,是数据库中的主索引
field 各种记录值(没有索引的属性),例如温度、湿度
tags 各种有索引的属性,例如地区、海拔

Series

measurement、tag set、retention policy相同的数据集合,称为一个series。同一个series的数据,在物理上会按照时间顺序排列存储在一起。

series的key为measurement + 所有tags的序列化字符串

Retention Policy

保留策略RP,包括数据保存的时间以及在集群中的副本个数

默认的保留策略为,保存时间无限制、副本个数为1

Shard

shard与RP相关联,每一个存储策略下会存在许多shard,每一个shard存储一个指定时间段内的数据,并且不会重复。

shard保存数据的始阶段计算函数如下:

1
2
3
4
5
6
7
8
func shardGroupDuration(d time.Duration) time.Duration {
if d >= 180*24*time.Hour || d == 0 { // 6 months or 0
return 7 * 24 * time.Hour
} else if d >= 2*24*time.Hour { // 2 days
return 1 * 24 * time.Hour
}
return 1 * time.Hour
}

每一个shard都对应底层的一个TSM存储引擎,这样做的目的就是为了可以通过时间来快速定位到要查询数据的相关资源,加速查询的过程,并且也让之后的批量删除数据的操作变得非常简单且高效。

阅读全文 »

上一版本的问题

首先来看上一版本的配置文件:

1
2
3
4
5
6
7
8
9
10
11
[kafka]
# Kafka地址
address = 127.0.0.1:9092
# 主题
topic = web_log
# 消息管道大小
chan_size = 1000

[collect]
# 日志文件存放路径
logfile_path = F:\gopath\src\logagent\logs\1.log

通过配置文件指定需要收集的日志时,主要有两个问题:

  • 日志文件的存放路径只能有一条,也就是说不能同时启动多个tail.Tail对象,来收集多个日志
  • 无法实现对配置文件的实时监控,根据配置文件的更改来做出及时的变化

通过etcd管理日志配置

上述问题的原因在于ini配置文件无法同时定义多个日志文件路径。解决的方法就是通过etcd管理日志收集配置,在etcd中以json的格式存储日志收集配置信息,如下方:

1
2
3
4
5
6
7
8
9
10
[
{
"path":"F:/gopath/src/logagent/logs/web.log",
"topic":"web_log"
},
{
"path":"F:/gopath/src/logagent/logs/blog.log",
"topic":"blog_log"
}
]

相应的,需要修改config.ini配置文件,配置etcd的相关设置:

1
2
3
4
5
6
7
8
9
10
11
12
[kafka]
# Kafka地址
address = 127.0.0.1:9092
# 主题
topic = web_log
# 消息管道大小
chan_size = 1000

[etcd]
address = 127.0.0.1:2379
# 保存日志收集配置文件的key
collect_conf_key = collect_conf
阅读全文 »

etcd

etcd是使用Go语言开发的一个开源的、高可用的分布式key-value存储系统,可以用于配置共享服务的注册和发现

etcd具有以下特点

  • 完全复制:集群中的每个节点都可以使用完整的存档
  • 高可用性:Etcd可用于避免硬件的单点故障或网络问题
  • 一致性:每次读取都会返回跨多主机的最新写入
  • 简单:包括一个定义良好、面向用户的API(gRPC)
  • 安全:实现了带有可选的客户端证书身份验证的自动化TLS
  • 快速:每秒10000次写入的基准速度
  • 可靠:使用Raft算法实现了强一致、高可用的服务存储目录

应用场景

服务发现

在一个分布式集群中,如何找到提供某项服务的主机并与之建立连接,这就是服务发现所完成的功能:

  1. 服务提供者首先在注册中心注册服务
  2. 服务请求者根据所需要的服务在注册中心请求服务提供者的地址信息
  3. 服务请求者与服务提供者建立连接,完成服务

服务发现

配置中心

将一些配置信息放到 etcd 上进行集中管理。

这类场景的使用方式通常是这样:应用在启动的时候主动从 etcd 获取一次配置信息,同时,在 etcd 节点上注册一个 Watcher 并等待,以后每次配置有更新的时候,etcd 都会实时通知订阅者,以此达到获取最新配置信息的目的。

分布式锁

因为 etcd 使用 Raft 算法保持了数据的强一致性,某次操作存储到集群中的值必然是全局一致的,所以很容易实现分布式锁。锁服务有两种使用方式,一是保持独占,二是控制时序。

  • 保持独占即所有获取锁的用户最终只有一个可以得到。etcd 为此提供了一套实现分布式锁原子操作 CAS(CompareAndSwap)的 API。通过设置prevExist值,可以保证在多个节点同时去创建某个目录时,只有一个成功。而创建成功的用户就可以认为是获得了锁。
  • 控制时序,即所有想要获得锁的用户都会被安排执行,但是获得锁的顺序也是全局唯一的,同时决定了执行顺序。etcd 为此也提供了一套 API(自动创建有序键),对一个目录建值时指定为POST动作,这样 etcd 会自动在目录下生成一个当前最大的值为键,存储这个新的值(客户端编号)。同时还可以使用 API 按顺序列出所有当前目录下的键值。此时这些键的值就是客户端的时序,而这些键中存储的值可以是代表客户端的编号。

etcd架构

etcd主要分为四个部分:

  • HTTP Server:用于处理用户发送的API请求以及其他etcd节点的同步与心跳信息请求。
  • Store:用于处理etcd支持的各类功能的事务,包括数据索引、节点状态变更、监控与反馈、事件的处理与执行等等,是etcd对用户提供的大多数API的具体实现。
  • Raft:强一致性算法
  • WAL:Write Ahead Log(预写式日志),etcd通过WAL对数据进行持久化存储。WAL中,所有的数据提交前都会实现记录日志。Snapshot(快照)是为了方式数据过多而进行的状态快照;Entry表示存储的日志内容。

etcd架构

etcd集群

etcd一般部署集群推荐奇数个节点,推荐的数量为 3、5 或者 7 个节点构成一个集群。

GO语言操作etcd

put和get

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package main

import (
"context"
"fmt"
clientv3 "go.etcd.io/etcd/client/v3"
"time"
)

func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
// handle error!
fmt.Printf("connect to etcd failed, err:%v\n", err)
return
}
fmt.Println("connect to etcd success")

defer cli.Close()
// put
// ctx用于控制超时
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
_, err = cli.Put(ctx, "dawn", "zh")
cancel()
if err != nil {
fmt.Printf("put to etcd failed, err:%v\n", err)
return
}

// get
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, "dawn")
cancel()
if err != nil {
fmt.Printf("get from etcd failed, err:%v\n", err)
return
}
// 遍历取到结果
for _, ev := range resp.Kvs {
fmt.Printf("%s:%s\n", ev.Key, ev.Value)
}
}

watch

用于监控etcd中的key的变化(创建/更改/删除)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
// handle error!
fmt.Printf("connect to etcd failed, err:%v\n", err)
return
}
fmt.Println("connect to etcd success")

// watch,返回一个通道
wChan := cli.Watch(context.Background(), "key")

// 遍历通道,一旦有事件发生,watch会向通道中发送数据
for wresp := range wChan {
for _, evt := range wresp.Events {
fmt.Printf("type:%s key:%s value:%s\n", evt.Type, evt.Kv.Key, evt.Kv.Value)
}
}
}

lease租约

keepAlive

基于etcd实现分布式锁

介绍

go-ini官方网站

ini文件

ini格式的文件是配置文件中常用的格式。

  • 在ini文件中,每个键值对占用一行,中间使用=隔开
  • #开头的为注释
  • ini文件是以分区为组织的,分区以[name]作为标志
1
2
3
4
5
6
7
8
9
[mysql]
ip = 127.0.0.1
port = 3306
user = root
password = 123456
database = test

[kafka]
address = 127.0.0.1:9092

go-ini

go-ini为GO语言提供了读写ini文件的功能。如使用go-ini读取上面的配置文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package main

import (
"fmt"
"github.com/go-ini/ini"
)

func main() {
// 加载配置文件
cfg, err := ini.Load("conf.ini")
if err != nil {
fmt.Println("Load config file failed, err:", err)
}

// 获取配置文件信息

// [mysql]分区
mysqlSection := cfg.Section("mysql")
mysqlIP := mysqlSection.Key("ip").String()
fmt.Println("mysql ip =", mysqlIP)
mysqlPort, err := mysqlSection.Key("port").Int()
if err != nil {
fmt.Println("mysql port parse failed, err:", err)
}
fmt.Println("mysql port =", mysqlPort)

// [kafka]分区
kafkaAddress := cfg.Section("kafka").Key("address").String()
fmt.Println("kafka address=", kafkaAddress)
}

使用go-ini读取配置文件的步骤如下:

  • 调用ini.Load加载配置文件,得到配置对象cfg
  • 调用cfg.Section获取分区对象,使用Key方法得到对应配置项的对象
  • Key对象需根据类型调用对应的方法返回具体类型,可以使用StringIntFloat64等方法。

配置文件中存储的都是字符串,所以类型为字符串的配置项不会出现类型转换失败的,故String方法只返回一个值。但是IntUint等方法可能会转换失败,返回一个值和一个错误。

如果每次取值都需要进行错误判断,那么代码写起来会非常繁琐。go-ini为此提供了对应的MustType方法(Type为Int/Uint/Float64等),这个方法只返回一个值。同时它接收可变参数,如果类型无法转换,取参数中第一个值返回,并且该参数设置为这个配置的值,下次调用返回这个值。

go-ini使用

分区操作

获取信息

在加载配置之后

  • 通过Sections方法获取所有分区
  • SectionStrings()方法获取所有的分区名
  • Section(name)获取名为name的分区,如果该分区不存在,则自动创建一个分区并返回
  • NewSection(name)手动创建一个新分区,如果分区已存在,则返回错误

父子分区

可以在分区名中使用.表示两个或多个分区之间的父子关系。

例如package.sub的父分区为packagepackage的父分区为默认分区。如果某个键在子分区中不存在,则会在它的父分区中再次查找,直到没有父分区为止。

保存配置

可以将生成的配置对象写入到文件中,保存有两种类型的接口,一种直接保存到文件,另一种写入到io.Writer中:

1
2
3
4
5
err = cfg.SaveTo("my.ini")
err = cfg.SaveToIndent("my.ini", "\t") // 有缩进

cfg.WriteTo(writer)
cfg.WriteToIndent(writer, "\t") // 有缩进

*Indent方法会对分区下的键进行缩进。

分区与结构体字段映射

定义结构变量,加载完配置文件后,调用MapTo将配置项赋值到结构变量的对应字段中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
type Config struct {
MySQL MySQLConfig `ini:"mysql"`
Kafka KafkaConfig `ini:"kafka"`
}

type MySQLConfig struct {
IP string `ini:"ip"`
Port int `ini:"port"`
User string `ini:"user"`
Password string `ini:"password"`
Database string `ini:"database"`
}

type KafkaConfig struct {
Address string `ini:"address"`
}

func main() {
cfg, err := ini.Load("conf.ini")
if err != nil {
fmt.Println("Load config file failed, err:", err)
}

// 映射整个配置文件
c := &Config{}
cfg.MapTo(c)
fmt.Println(c)

// 只映射mysql分区
mysql := &MySQLConfig{}
cfg.Section("mysql").MapTo(mysql)
fmt.Println(mysql)
}

也可以通过结构体生成配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func NewIni() {
cfg := ini.Empty()
// 构造配置信息
c := Config{
MySQL: MySQLConfig{
IP: "192.168.1.2",
Port: 3306,
User: "Dawn",
Password: "123456",
Database: "dbname",
},
Kafka: KafkaConfig{
Address: "127.0.0.1:9092",
},
}

// 通过结构体生成配置
err := ini.ReflectFrom(cfg, &c)
if err != nil {
fmt.Println("ReflectFrom failed: ", err)
return
}

// 保存到文件
err = cfg.SaveTo("new-conf.ini")
if err != nil {
fmt.Println("SaveTo failed: ", err)
return
}

}

项目介绍

解析配置文件,进行日志收集

首先,初步编写一个logagent

  1. 从配置文件中读取信息。
  2. 根据从配置文件中得到的Kafka地址,初始化Kafka生产者:
    • 生产者配置
    • 连接Kafka
    • 初始化消息管道,并且起一个协程,用于从消息管道中读取数据并向Kafka推送消息
  3. 根据从配置文件中得到的日志路径,初始化一个tail.Tail对象TailObj
  4. TailObj从日志中读取数据,封装成msg送入消息管道。协程从消息管道中取出数据并发送给Kafka

读取配置文件

配置文件conf/config.ini

1
2
3
4
5
6
7
8
9
10
11
[kafka]
# Kafka地址
address = 127.0.0.1:9092
# 主题
topic = web_log
# 消息管道大小
chan_size = 1000

[collect]
# 日志文件存放路径
logfile_path = F:\gopath\src\logagent\logs\1.log

读取配置文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 读配置文件
cfg, err := ini.Load("conf/config.ini")
if err != nil {
logrus.Error("Load config failed, err:", err)
return
}

var configObj = new(Config)
err = cfg.MapTo(configObj)
if err != nil {
logrus.Error("Map failed, err:", err)
return
}

初始化Kafka生产者

kafka/kafka.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package kafka

import (
"github.com/Shopify/sarama"
"github.com/sirupsen/logrus"
)

// kafka相关操作

var (
Client sarama.SyncProducer
MsgChan chan *sarama.ProducerMessage
)

func Init(address []string, chanSize int) (err error) {
// 生产者配置
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // 等待leader和follow都确认
config.Producer.Partitioner = sarama.NewRandomPartitioner // 分区
config.Producer.Return.Successes = true // 确认

// 连接kafka
Client, err = sarama.NewSyncProducer(address, config)
if err != nil {
logrus.Error("kafka: producer closed, err:", err)
}
// 初始化消息管道
MsgChan = make(chan *sarama.ProducerMessage, chanSize)
// 起一个协程向kafka发送数据
go sendMsg()
return
}

// 从MsgChan中读取msg,发送给kafka
func sendMsg() {
for {
select {
case msg := <-MsgChan:
pid, offser, err := Client.SendMessage(msg)
if err != nil {
logrus.Warn("send msg failed, err:", err)
return
}
logrus.Infof("send msg to kafka success. pid=%v offset=%v", pid, offser)
}
}
}

初始化tail.Tail对象

tailfile.tailfile.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package tailfile

import (
"github.com/hpcloud/tail"
"github.com/sirupsen/logrus"
"os"
)

var (
TailObj *tail.Tail
)

func Init(fileName string) (err error) {
TailObj, err = tail.TailFile(fileName, tail.Config{
Location: &tail.SeekInfo{ // 从文件的哪个地方开始读取
Offset: 0,
Whence: os.SEEK_END,
},
ReOpen: true, // 重新打开文件
MustExist: false, // 文件不存在不报错
Follow: true, // 进行跟随
Poll: true,
})
if err != nil {
logrus.Errorf("tailfile: create tailObj for path %s failed, err: %v\n", fileName, err)
}
return
}

TailObj从日志中读取数据,送入消息管道

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 从TailObj中从log中读取数据,封装成msg放入到通道中
func run() {
for {
// 循环读数据
line, ok := <-tailfile.TailObj.Lines
if !ok {
logrus.Warnf("tail file close reopen, filename: %s\n", tailfile.TailObj.Filename)
}
// 把从tail中读到的日志,包装成kafka的msg类型
fmt.Println(line.Text)
msg := &sarama.ProducerMessage{}
msg.Topic = "web_log"
msg.Value = sarama.StringEncoder(line.Text)
// 放到通道中
kafka.MsgChan <- msg
}
}

总结:

所以main.go中的流程为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package main

import (
"fmt"
"github.com/Shopify/sarama"
"github.com/go-ini/ini"
"github.com/sirupsen/logrus"
"logagent/kafka"
"logagent/tailfile"
)

// 日志收集的客户端
// 收集指定目录下的日志文件,发送到kafka中

type Config struct {
KafkaConfig `ini:"kafka"`
CollectConfig `ini:"collect"`
}

type KafkaConfig struct {
Address string `ini:"address"`
Topic string `ini:"topic"`
ChanSize int `int:"chan_size"`
}
type CollectConfig struct {
LogFilePath string `ini:"logfile_path"`
}

// 真正业务逻辑
// 从TailObj中从log中读取数据,封装成msg放入到通道中
func run() {
for {
// 循环读数据
line, ok := <-tailfile.TailObj.Lines
if !ok {
logrus.Warnf("tail file close reopen, filename: %s\n", tailfile.TailObj.Filename)
}
// 把从tail中读到的日志,包装成kafka的msg类型
fmt.Println(line.Text)
msg := &sarama.ProducerMessage{}
msg.Topic = "web_log"
msg.Value = sarama.StringEncoder(line.Text)
// 放到通道中
kafka.MsgChan <- msg
}
}

func main() {
// 读配置文件
cfg, err := ini.Load("conf/config.ini")
if err != nil {
logrus.Error("Load config failed, err:", err)
return
}

var configObj = new(Config)
err = cfg.MapTo(configObj)
if err != nil {
logrus.Error("Map failed, err:", err)
return
}
fmt.Println(configObj)

// 初始化Kafka
err = kafka.Init([]string{configObj.KafkaConfig.Address}, configObj.KafkaConfig.ChanSize)
if err != nil {
logrus.Error("init kafka failed, err:", err)
return
}
logrus.Info("init kafka success!")

// 根据配置文件中的日志路径使用tail收集日志,初始化tail
err = tailfile.Init(configObj.CollectConfig.LogFilePath)
if err != nil {
logrus.Error("init tail failed, err:", err)
return
}
logrus.Info("init tail success!")

// 把日志发往kafka
// 从TailObj中从log中读取数据,封装成msg放入到通道中
run()
}

概述

tail包用于输出文件的最后几行。假设该档案有更新,tail会自己主动刷新,确保你看到最新的档案内容 ,在日志收集中可以实时的监测日志的变化。

介绍

1
func TailFile(filename string, config Config) (*Tail, error)

tail2.TailFile()函数的参数是文件路径配置文件,会生成一个Tail结构体。在Tail结构体中,最重要的属性是文件名Filename和用于存储文件一行Line的通道Lines

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type Tail struct {
Filename string
Lines chan *Line
Config

file *os.File
reader *bufio.Reader

watcher watch.FileWatcher
changes *watch.FileChanges

tomb.Tomb // provides: Done, Kill, Dying

lk sync.Mutex
}

其中表示文件中一行的结构体为Line,这个结构体用于存储读取的一行信息:

1
2
3
4
5
type Line struct {
Text string
Time time.Time
Err error // Error from tail
}
阅读全文 »

消息队列通信模型

队列模式

消息⽣产者⽣产消息发送到队列queue中,然后消息消费者从queue中取出并且消费消息。 ⼀条消息被消费以后,queue中就没有了,不存在重复消费。

发布/订阅

消息⽣产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点⽅式不同,发布到topic的消息会被所有订阅者消费


发布订阅模式下,当发布者消息量很⼤时,显然单个订阅者的处理能⼒是不⾜的。实际上现实场景中是多个订阅者节点组成⼀个订阅组负载均衡消费topic消息即分组订阅,这样订阅者很容易实现消费能⼒线性扩展。可以看成是⼀个topic下有多个queue,每个queue是点对点的⽅式,queue之间是发布订阅⽅式。

Kafka

Kafka是一种高吞吐量的分布式发布订阅消息系统。

同时Kafka结合队列模型和发布订阅模型

  • 对于同一个消费者组Consumer Group,一个message只有被一个consumer消费
  • 同一个消息可以被广播到不同的Consumer Group中

架构介绍

Kafka架构

  • Producer:Producer即⽣产者,消息的产⽣者,是消息的⼊⼝。
  • kafka cluster:kafka集群,⼀台或多台服务器组成
    • Broker:Broker是指部署了Kafka实例的服务器节点。每个服务器上有⼀个或多个kafka的实例,我们姑且认为每个broker对应⼀台服务器。每个kafka集群内的broker都有⼀个不重复的编号。
    • Topic:消息的主题,可以理解为消息的分类,kafka的数据就保存在topic。在每个broker上都可以创建多个topic。实际应⽤中通常是⼀个业务线建⼀个topic。
    • Partition:Topic的分区,每个topic可以有多个分区,分区的作⽤是做负载,提⾼kafka的吞吐量。同⼀个topic在不同的分区的数据是不重复的,partition的表现形式就是⼀个⼀个的⽂件夹。为了提高可靠性,提出分区的副本,分为Leader和Follower,生产和消费只针对Leader。
    • Replication:每⼀个分区都有多个副本,副本的作⽤是做备份。当主分区(Leader)故障的时候会选择⼀个备胎(Follower)上位,成为Leader。在kafka中默认副本的最⼤数量是10个,且副本的数量不能⼤于Broker的数量,follower和leader绝对是在不同的机器,同⼀机器对同⼀个分区也只可能存放⼀个副本(包括⾃⼰)。
  • Consumer:消费者,即消息的消费⽅,是消息的出⼝。
    • Consumer Group:我们可以将多个消费组组成⼀个消费者组,在kafka的设计中同⼀个分区的数据只能被消费者组中的某⼀个消费者消费同⼀个消费者组的消费者可以消费同⼀个topic的不同分区的数据,这也是为了提⾼kafka的吞吐量!
阅读全文 »

概述

context主要用来在goroutine之间传递上下文信息

Go1.7加入了一个新的标准库context,它定义了Context类型,专门用来简化 对于处理单个请求的多个 goroutine 之间与请求域的数据、取消信号、截止时间等相关操作。

可以使用WithCancelWithDeadlineWithTimeoutWithValue创建的派生上下文。当一个上下文被取消时,它派生的所有上下文也被取消。

为什么需要Context?

如何解决通知goroutine退出?

全局变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
var wg sync.WaitGroup

// 全局变量的方式
var exit bool

func worker() {
for {
fmt.Println("worker")
time.Sleep(time.Second)
if exit { // 查看全局变量,检测是否退出
break
}
}
wg.Done()
}

func main() {
wg.Add(1)
go worker()
// 如何优雅的实现结束子goroutine
time.Sleep(time.Second * 5) // 防止退出太快
exit = true // 修改全局变量
wg.Wait()
fmt.Println("over")
}

channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
var wg sync.WaitGroup

// channel方式通知goroutine退出

func worker(exitChan <-chan struct{}) {
label:
for {
select {
case <-exitChan:
break label
default:
fmt.Println("worker")
time.Sleep(time.Second)
}
}
wg.Done()
}

func main() {

exitChan := make(chan struct{}) // 定义退出管道

wg.Add(1)
go worker(exitChan)
time.Sleep(time.Second * 5) // 防止退出太快
exitChan <- struct{} // 通知协程退出
wg.Wait()
fmt.Println("over")
}

context - 官方方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
var wg sync.WaitGroup

// context方式通知协程退出

func worker(ctx context.Context) {
label:
for {
select {
case <-ctx.Done():
break label
default:
fmt.Println("worker")
time.Sleep(time.Second)
}
}
wg.Done()
}

func main() {

ctx, cancel := context.WithCancel(context.Background())

wg.Add(1)
go worker(ctx)
time.Sleep(time.Second * 5) // 防止退出太快
cancel() // 调用cancel函数,告诉goroutine退出
wg.Wait()
fmt.Println("over")
}

当goroutine又启动一个goroutine时,只需要将ctx传入即可。通过context库,可以控制子孙协程的退出。

阅读全文 »

复制带随机指针的链表

复制带随机指针的链表

解题思路

我们用哈希表记录每一个节点对应新节点的创建情况。

解题代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* Definition for a Node.
* type Node struct {
* Val int
* Next *Node
* Random *Node
* }
*/

func copyRandomList(head *Node) *Node {
cachedNode := map[*Node]*Node{}
var deepCopy func(head *Node) *Node
deepCopy = func(head *Node) *Node {
if head == nil {
return nil
}

if node, ok := cachedNode[head]; ok {
// 已经创建了这个节点
return node
}

// 创建新的节点
newNode := &Node{ Val: head.Val }
// 记录在哈希表中
cachedNode[head] = newNode
newNode.Next = deepCopy(head.Next)
newNode.Random = deepCopy(head.Random)

return newNode
}

return deepCopy(head)
}
阅读全文 »