channel 的三种状态和操作结果:
操作 | 空值(nil) | 非空已关闭 | 非空未关闭 |
---|---|---|---|
关闭 | panic | panic | 成功关闭 |
发送数据 | 永久阻塞 | panic | 阻塞或成功发送 |
接收数据 | 永久阻塞 | 永不阻塞 | 阻塞或者成功接收 |
在 select 中,如果其中的全部通道都被关闭,则一定不会执行 defaul 而是随机选择其中一个被关闭的 case 去执行。
关闭通道
通道关闭原则:
一个常用的使用 Go 通道的原则是不要在数据接收方或者在有多个发送者的情况下关闭通道。换句话说,我们只应该让一个通道唯一的发送者关闭此通道。
关闭 channel 的方法有以下几种方法。
粗鲁的方法
如果关闭已经关闭的通道,则会触发 panic,可以用 recover 使得程序回复正常。
这种方法违反了通道关闭原则,因为这种方法使得多个协程都可以关闭通道。
1 | func SafeClose(ch chan T) (justClosed bool) { |
礼貌的方式
使用 sync.Once 或者互斥锁来保证 channel 只被关闭一次。
1 | type MyChannel struct { |
也可以使用 sync.Mutex 来关闭通道,维护一个布尔值来表示通道是否已经被关闭(需要用 sync.Mutex 保证互斥访问):
1 | type MyChannel struct { |
优雅的方式
- 情形一:M 个接收者和一个发送者,发送者通过关闭用来传输数据的通道来传递发送结束信号。
- 情形二:一个接收者和 N 个发送者,此唯一接收者通过关闭一个额外的信号通道来通知发送者不要再发送数据了。
- 情形三:M 个接收者和 N 个发送者,它们中的任何协程都可以让一个中间调解协程帮忙发出停止数据传送的信号。
多个接收者 一个发送者
此时发送者通过关闭用来传输数据的通道来传递发送结束信号。
发送者:
1 | func sender(dataCh chan int) { |
接收者:
1 | func receiver(dataCh chan int) { |
一个接收者 多个发送者
这种情况下,不能让接收者关闭用来传输数据的通道来停止数据传输,因为这样做违反了通道关闭原则。 但是可以让接收者关闭一个额外的信号通道来通知发送者不要再发送数据了。
发送者:注意有两个 select,在第二个 select 中,在 stopCh 已经关闭时可能迟迟不会选中第一个分支及时结束协程,所以需要第一个 select 尽早的退出。
1 | func sender(dataCh chan int, stopCh chan struct{}) { |
接收者:直接关闭通道即可。
1 | func receiver(dataCh chan int) { |
数据通道
dataCh
并没有被关闭。是的,不必关闭它。 当一个通道不再被任何协程所使用后,它将逐渐被垃圾回收掉,无论它是否已经被关闭。
多个接收者 多个发送者
我们不能让接收者和发送者中的任何一个关闭用来传输数据的通道,我们也不能让多个接收者之一关闭一个额外的信号通道。 这两种做法都违反了通道关闭原则。 然而,可以引入一个中间调解者角色并让其关闭额外的信号通道来通知所有的接收者和发送者结束工作。
管道还可以限制协程并发数量,用带缓冲区的管道实现:
- 缓冲区的大小就是最大的并发协程数量。
- 在新创建协程之前,先尝试在管道中写入一个数据。在结束协程时,从管道中取出一个数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 func main() {
wg sync.WaitGroup
ch := make(chan struct{}, 3) // 这里为最大并发数量
for i := 0; i < 10; i++ {
ch <- struct{}
wg.Add(1)
go func() {
defer wg.Done()
// ....
time.Sleep(time.Second)
<-ch
}
}
wg.Wait()
}很多第三方库实现了协程池,可以很方便地用来控制协程的并发数量: