Dawn's Blogs

分享技术 记录成长

0%

GO语言杂谈 (8) 操作influxDB

influxDB

influxDB是时间序列数据库,主键是时间戳。

概念

influxDB名词 传统数据库概念
database 数据库
measurement 数据表
point 数据行

数据格式

1
measurement [,tag_key1=tag_value1...] field_key=field_value[,field_key2=field_value2] [timestamp]
  • measurement:类似于数据表的概念
  • field-key,field-value:用来存储数据支持各种类型,数据存储时不会进行索引。每条数据必须拥有一个field-key,如果使用field-key作为过滤条件则需要遍历所有的数据
  • tag-key,tag-value:与field类似,不过会进行索引,方便查询时用于过滤条件

measurements, tag keys, field keys,tag values 全局存一份;field values 和 timestamps 每条数据存一份

Point

influxDB中的point相当于传统数据库里的一行数据,由时间戳(time)、数据(field)、标签(tag)组成。

Point属性 传统数据库概念
time 每个数据记录时间,是数据库中的主索引
field 各种记录值(没有索引的属性),例如温度、湿度
tags 各种有索引的属性,例如地区、海拔

Series

measurement、tag set、retention policy相同的数据集合,称为一个series。同一个series的数据,在物理上会按照时间顺序排列存储在一起。

series的key为measurement + 所有tags的序列化字符串

Retention Policy

保留策略RP,包括数据保存的时间以及在集群中的副本个数

默认的保留策略为,保存时间无限制、副本个数为1

Shard

shard与RP相关联,每一个存储策略下会存在许多shard,每一个shard存储一个指定时间段内的数据,并且不会重复。

shard保存数据的始阶段计算函数如下:

1
2
3
4
5
6
7
8
func shardGroupDuration(d time.Duration) time.Duration {
if d >= 180*24*time.Hour || d == 0 { // 6 months or 0
return 7 * 24 * time.Hour
} else if d >= 2*24*time.Hour { // 2 days
return 1 * 24 * time.Hour
}
return 1 * time.Hour
}

每一个shard都对应底层的一个TSM存储引擎,这样做的目的就是为了可以通过时间来快速定位到要查询数据的相关资源,加速查询的过程,并且也让之后的批量删除数据的操作变得非常简单且高效。

GO语言操作influxDB

连接数据库

1
2
3
4
5
6
7
8
9
10
11
12
// 连接influxDB
func connInflux() client.Client {
cli, err := client.NewHTTPClient(client.HTTPConfig{
Addr: "http://127.0.0.1:8086",
Username: "admin",
Password: "",
})
if err != nil {
log.Fatalln("connect influxDB failed, err:", err)
}
return cli
}

查询数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 依据command语句查询
func queryDB(cli client.Client, command string) (res []client.Result, err error) {
q := client.Query{
Command: command,
Database: "test",
}

if response, err := cli.Query(q); err == nil {
if response.Error() != nil {
err = response.Error()
} else {
res = response.Results
}
}
return
}

插入数据

插入数据示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func writesPoints(cli client.Client) {
bp, err := client.NewBatchPoints(client.BatchPointsConfig{
Database: "test",
Precision: "s", //精度,默认ns
})
if err != nil {
log.Fatal(err)
}
tags := map[string]string{"cpu": "ih-cpu"}
fields := map[string]interface{}{
"idle": 201.1,
"system": 43.3,
"user": 86.6,
}

pt, err := client.NewPoint("cpu_usage", tags, fields, time.Now())
if err != nil {
log.Fatal(err)
}
bp.AddPoint(pt)
err = cli.Write(bp)
if err != nil {
log.Fatal(err)
}
log.Println("insert success")
}