Dawn's Blogs

分享技术 记录成长

0%

Go语言高性能编程 (4) 并发编程——关闭通道

channel 的三种状态和操作结果:

操作 空值(nil) 非空已关闭 非空未关闭
关闭 panic panic 成功关闭
发送数据 永久阻塞 panic 阻塞或成功发送
接收数据 永久阻塞 永不阻塞 阻塞或者成功接收

在 select 中,如果其中的全部通道都被关闭,则一定不会执行 defaul 而是随机选择其中一个被关闭的 case 去执行。

关闭通道

通道关闭原则:

一个常用的使用 Go 通道的原则是不要在数据接收方或者在有多个发送者的情况下关闭通道。换句话说,我们只应该让一个通道唯一的发送者关闭此通道

关闭 channel 的方法有以下几种方法。

粗鲁的方法

如果关闭已经关闭的通道,则会触发 panic,可以用 recover 使得程序回复正常

这种方法违反了通道关闭原则,因为这种方法使得多个协程都可以关闭通道。

1
2
3
4
5
6
7
8
9
10
11
12
func SafeClose(ch chan T) (justClosed bool) {
defer func() {
if recover() != nil {
// 一个函数的返回结果可以在defer调用中修改。
justClosed = false
}
}()

// 假设ch != nil。
close(ch) // 如果 ch 已关闭,将 panic
return true // <=> justClosed = true; return
}

礼貌的方式

使用 sync.Once 或者互斥锁来保证 channel 只被关闭一次。

1
2
3
4
5
6
7
8
9
10
type MyChannel struct {
C chan T
once sync.Once
}

func (mc *MyChannel) SafeClose() {
mc.once.Do(func() {
close(mc.C)
})
}

也可以使用 sync.Mutex 来关闭通道,维护一个布尔值来表示通道是否已经被关闭(需要用 sync.Mutex 保证互斥访问):

1
2
3
4
5
type MyChannel struct {
C chan T
closed bool
mutex sync.Mutex
}

优雅的方式

  • 情形一:M 个接收者和一个发送者,发送者通过关闭用来传输数据的通道来传递发送结束信号。
  • 情形二:一个接收者和 N 个发送者,此唯一接收者通过关闭一个额外的信号通道来通知发送者不要再发送数据了。
  • 情形三:M 个接收者和 N 个发送者,它们中的任何协程都可以让一个中间调解协程帮忙发出停止数据传送的信号。

多个接收者 一个发送者

此时发送者通过关闭用来传输数据的通道来传递发送结束信号。

发送者:

1
2
3
4
5
6
7
8
9
10
11
func sender(dataCh chan int) {
for {
if value := rand.Intn(100); value == 0 {
// 此唯一的发送者可以安全地关闭此数据通道。
close(dataCh)
return
} else {
dataCh <- value
}
}
}

接收者:

1
2
3
4
5
func receiver(dataCh chan int) {
for value := range dataCh {
// ...
}
}

一个接收者 多个发送者

这种情况下,不能让接收者关闭用来传输数据的通道来停止数据传输,因为这样做违反了通道关闭原则。 但是可以让接收者关闭一个额外的信号通道通知发送者不要再发送数据了

发送者:注意有两个 select,在第二个 select 中,在 stopCh 已经关闭时可能迟迟不会选中第一个分支及时结束协程,所以需要第一个 select 尽早的退出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func sender(dataCh chan int, stopCh chan struct{}) {
for {
// 尝试让发送者尽早的退出
select {
case <- stopCh:
return
default:
}

select {
case <- stopCh:
return
case dataCh <- rand.Intn(100):
}
}
}

接收者:直接关闭通道即可。

1
2
3
4
5
6
7
8
9
10
11
12
func receiver(dataCh chan int) {
for value := range dataCh {
if value == 0 {
// 此唯一的接收者同时也是stopCh通道的
// 唯一发送者。尽管它不能安全地关闭dataCh数
// 据通道,但它可以安全地关闭stopCh通道。
close(stopCh)
return
}
// ...
}
}

数据通道 dataCh 并没有被关闭。是的,不必关闭它。 当一个通道不再被任何协程所使用后,它将逐渐被垃圾回收掉,无论它是否已经被关闭。

多个接收者 多个发送者

我们不能让接收者和发送者中的任何一个关闭用来传输数据的通道,我们也不能让多个接收者之一关闭一个额外的信号通道。 这两种做法都违反了通道关闭原则。 然而,可以引入一个中间调解者角色让其关闭额外的信号通道来通知所有的接收者和发送者结束工作

管道还可以限制协程并发数量,用带缓冲区的管道实现:

  • 缓冲区的大小就是最大的并发协程数量
  • 新创建协程之前,先尝试在管道中写入一个数据。在结束协程时,从管道中取出一个数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func main() {
wg sync.WaitGroup
ch := make(chan struct{}, 3) // 这里为最大并发数量

for i := 0; i < 10; i++ {
ch <- struct{}
wg.Add(1)

go func() {
defer wg.Done()
// ....
time.Sleep(time.Second)
<-ch
}
}

wg.Wait()
}

很多第三方库实现了协程池,可以很方便地用来控制协程的并发数量: