Dawn's Blogs

分享技术 记录成长

0%

GO学习笔记 (6) 实现线程间同步的方法

经过几日的学习后,本人总结出两种可以实现线程间同步的方法:

  • sync.WaitGroup类型
  • 管道channel
    • 关闭管道
    • 带缓冲区的管道
    • 没有缓存区的管道

其中,sync.WaitGroup和带缓冲区的管道适用于多个并行线程简得同步,其他的方法只适用于两个线程的同步

sync.WaitGroup

基本方法

1
2
3
type WaitGroup struct {
// 包含隐藏或非导出字段
}

WaitGroup用于等待一组线程的结束。父线程调用Add方法来设定应等待的线程的数量。每个被等待的线程在结束时应调用Done方法。同时,主线程里可以调用Wait方法阻塞至所有线程结束

1
func (wg *WaitGroup) Add(delta int)

Add方法向内部计数加上deltadelta可以是负数;如果内部计数器变为0,Wait方法阻塞等待的所有线程都会释放,如果计数器小于0,方法panic。

1
func (wg *WaitGroup) Done()

Done方法减少WaitGroup计数器的值,应在线程的最后执行

1
func (wg *WaitGroup) Wait()

Wait方法阻塞直到WaitGroup计数器减为0

总结

WaitGroup 对象内部有一个计数器,最初从0开始,它有三个方法:Add(), Done(), Wait() 用来控制计数器的数量。

  • Add(n) 把计数器设置为n
  • Done() 每次把计数器-1
  • wait() 会阻塞代码的运行,直到计数器地值减为0

管道

基本方法

关闭管道

当线程完成工作后关闭管道

1
close(ch)

主线程不断循环测试ok,在关闭管道并且管道中没有数据时,ok会为false,即可跳出循环

1
2
3
4
5
6
7
8
// 不断循环测试ok
for {
_, ok := <- ch
if !ok {
// 关闭管道并且已经没有数据,跳出循环继续执行
break
}
}

有缓冲区的管道

使用一个有缓冲区的管道作为进程间同步的方式时,

  • 需要阻塞的位置处尝试从管道中读取数据<-ch,若管道中没有数据会暂时阻塞当前线程。
  • 在另一线程工作完成后,向管道中发送数据ch <- 1,由于管道中有了数据就可以唤醒线程继续执行,实现同步

无缓冲区的管道

使用无缓冲区的管道作为进程同步的方式时,

  • 需要阻塞的位置处尝试向管道中写入数据ch <- 1,因为没有其他读线程,所以当前线程会被暂时阻塞
  • 在另一线程工作完成后,尝试从管道中读取数据<-ch,由于有了读线程,被阻塞的线程被唤醒继续执行,实现同步

实例

利用多个协程求1-80000的所有素数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package main

import (
"fmt"
"runtime"
"time"
)

// 向intChan中放入待判断的数据
func putNum(intChan chan int) {
for i := 1; i <= 80000; i++ {
intChan <- i
}

// 关闭管道
close(intChan)
}

// 从intChan中取出待判断的数据
func primeNum(intChan chan int, primeChan chan int, exitChan chan bool) {
for {
num, ok := <-intChan
if !ok {
// intChan中取不到数据了
break
}
if isPrime(num) {
// 是素数,放入素数管道
primeChan <- num
}
}

fmt.Println("有1个求素数的协程已退出....")
// 向退出管道写入数据
exitChan <- true
}

// 判断n是否为素数
func isPrime(n int) bool {
for i := 2; i < n; i++ {
if n%i == 0 {
return false //不是素数
}
}
return true //是素数
}

func main() {

// 开始时间
start := time.Now().Unix()

// 开启判断素数的线程的数量
threadNum := runtime.NumCPU()
// 存放待判断的数据
intChan := make(chan int, 1000)
// 存放结果,即判断出为素数的数据
primeChan := make(chan int, 10000)
// 标识退出的管道
exitChan := make(chan bool, threadNum)

// 开启协程,向intChan中放入待判断的数据
go putNum(intChan)

// 开启多个协程,从intChan中取出待判断的数据
for i := 0; i < threadNum; i++ {
go primeNum(intChan, primeChan, exitChan)
}

// 等待各个协程判断完毕

// 关闭素数管道
go func() {
for i := 0; i < threadNum; i++ {
<-exitChan
}
close(primeChan)
}()

// 遍历素数管道
for {
_, ok := <-primeChan
if !ok {
// 管道已关闭 退出循环
break
}
// fmt.Println(res)
}

// 结束时间
end := time.Now().Unix()

fmt.Printf("总共运行时间=%v\n", end-start)
// 总共运行时间=1
}