Dawn's Blogs

分享技术 记录成长

0%

GO语言杂谈 (9) 操作ZooKeeper

基本操作

连接zookeeper

1
2
3
4
5
6
// 连接zookeeper服务器
func initZk() (*zk.Conn, error) {
servers := []string{"127.0.0.1:2181"}
conn, _, err := zk.Connect(servers, time.Second*5)
return conn, err
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/*
conn zookeeper连接
path 节点
data 节点数据
flags有4种取值:
0:永久,除非手动删除
zk.FlagEphemeral = 1:短暂,session断开则改节点也被删除
zk.FlagSequence = 2:会自动在节点后面添加序号
3:Ephemeral和Sequence,即,短暂且自动添加序号
*/
func add(conn *zk.Conn, path string, data []byte, flags int32) error {
acls := zk.WorldACL(zk.PermAll)
s, err := conn.Create(path, data, flags, acls)
if err != nil {
log.Println("create failed, err:", err)
} else {
log.Println("create success:", s)
}
return err
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 删除
func delete(conn *zk.Conn, path string) error {
// 获取版本
_, stat, err := conn.Get(path)
if err != nil {
log.Println("get version failed, err:", err)
return err
}
// 删除
err = conn.Delete(path, stat.Version)
if err != nil {
log.Printf("delete failed for path %s, err: %v\n", path, err)
}
return err
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 更新节点
func set(conn *zk.Conn, path string, data []byte) error {
// 获取版本
_, stat, err := conn.Get(path)
if err != nil {
log.Println("get version failed, err:", err)
return err
}
// 更新
stat, err = conn.Set(path, data, stat.Version)
if err != nil {
log.Printf("set failed for path %s, err: %v\n", path, err)
} else {
log.Printf("set success ,stat: %v\n", stat)
}
return err
}

1
2
3
4
5
6
7
8
9
10
// 查询
func get(conn *zk.Conn, path string) ([]byte, error) {
b, stat, err := conn.Get(path)
if err != nil {
log.Println("get failed, err:", err)
} else {
log.Printf("info of node for %v: %v\n", path, stat)
}
return b, err
}

Watch机制

只监听一次

调用zk.WithEventCallback(callback)设置回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package main

import (
"fmt"
"github.com/samuel/go-zookeeper/zk"
"log"
"time"
)

// zk watch 回调函数
func callback(event zk.Event) {
// zk.EventNodeCreated
// zk.EventNodeDeleted
fmt.Println("###########################")
fmt.Println("path: ", event.Path)
fmt.Println("type: ", event.Type.String())
fmt.Println("state: ", event.State.String())
fmt.Println("---------------------------")
}

func initZk() (*zk.Conn, error) {
eventCallbackOption := zk.WithEventCallback(callback)
servers := []string{"127.0.0.1:2181"}
conn, _, err := zk.Connect(servers, time.Second*5, eventCallbackOption)
return conn, err
}

func add(conn *zk.Conn, path string, data []byte, flags int32) error {
acls := zk.WorldACL(zk.PermAll)
_, err := conn.Create(path, data, flags, acls)

return err
}

// 删除
func delete(conn *zk.Conn, path string) error {
// 获取版本
_, stat, err := conn.Get(path)
if err != nil {
return err
}
// 删除
err = conn.Delete(path, stat.Version)
return err
}

func listenOne(conn *zk.Conn, path string) error {
//调用conn.ExistsW(path) 或GetW(path)为对应节点设置监听,该监听只生效一次
_, _, _, err := conn.ExistsW(path)
return err
}

func main() {
conn, err := initZk()
defer conn.Close()
if err != nil {
fmt.Println(err)
return
}

path := "/dawn111"
data := []byte("hello world")

// 开始监听path,只监听一次
err = listenOne(conn, path)
if err != nil {
log.Println(err)
}
// 触发创建数据操作
err = add(conn, path, data, 0)
if err != nil {
log.Println("create failed, err:", err)
} else {
log.Println("创建数据成功!")
}

//再次监听path
err = listenOne(conn, path)
if err != nil {
log.Println(err)
}
// 触发删除数据操作
err = delete(conn, path)
if err != nil {
log.Printf("delete failed for path %s, err: %v\n", path, err)
} else {
log.Println("删除数据成功!")
}

}

开启一个channel处理

可以开一一个协程来处理chanel中传来的event事件,可以持续监听

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package main

import (
"fmt"
"github.com/samuel/go-zookeeper/zk"
"log"
"time"
)

func initZk() (*zk.Conn, error) {
servers := []string{"127.0.0.1:2181"}
conn, _, err := zk.Connect(servers, time.Second*5)
return conn, err
}

func add(conn *zk.Conn, path string, data []byte, flags int32) error {
acls := zk.WorldACL(zk.PermAll)
_, err := conn.Create(path, data, flags, acls)

return err
}

// 删除
func delete(conn *zk.Conn, path string) error {
// 获取版本
_, stat, err := conn.Get(path)
if err != nil {
return err
}
// 删除
err = conn.Delete(path, stat.Version)
return err
}

func listenOneChannel(conn *zk.Conn, path string) error {
_, _, ch, err := conn.ExistsW(path)

go func() {
event := <-ch
fmt.Println("*******************")
fmt.Println("path:", event.Path)
fmt.Println("type:", event.Type.String())
fmt.Println("state:", event.State.String())
fmt.Println("-------------------")
}()

return err
}

func main() {
conn, err := initZk()
defer conn.Close()
if err != nil {
fmt.Println(err)
return
}

path := "/dawn111"
data := []byte("hello world")

// 开启一个管道持续监听
err = listenOneChannel(conn, path)
if err != nil {
log.Println(err)
}
// 创建数据触发事件
err = add(conn, path, data, 0)
if err != nil {
log.Println("create failed, err:", err)
} else {
log.Println("创建数据成功!")
}
// 删除数据触发事件
err = delete(conn, path)
if err != nil {
log.Printf("delete failed for path %s, err: %v\n", path, err)
} else {
log.Println("删除数据成功!")
}

}