简介

Channel 是 Go 的杀手特性之一,并且和 goroutine 配合使 Go 的并发编程变得十分简单,降低并发编程的难度。

如果说 goroutine 是 Go 程序的并发体的话,那么 Channel 则是它们之间的通信机制。

Go 不支持创建系统线程,所以 goroutine 是一个 Go 程序内部唯一的并发实现方式。

在Go中,每个计算都是一个协程。所以我们常用协程来表示计算。

并发

  • 并发,若干计算可能在某些时间片段内同时运行的情形。(一个人同时吃三个馒头)
  • 并行,多个计算在任何时间点都在同时运行。并行计算是一种特殊的并发计算。(三个人同时吃三个馒头)
  • 数据竞争(data race): 不同的并发可能共享一些资源,其中共享内存资源最为常见,但是经常会发生一些数据完整性问题
    • 在一个计算向一段内存写数据的时候,另一个计算从此内存段读数据,结果导致读出的数据的完整性得不到保证。
    • 在一个计算向一段内存写数据的时候,另一个计算也向此段内存写数据,结果导致被写入的数据的完整性得不到保证。
  • 并发同步(也称数据同步): 控制若干并发计算对资源的访问时段,以使数据竞争的情况不会发生。
    • 避免在协程之间产生数据竞争的现象
    • 避免协程无所事事的时候消耗 CPU 资源

并发与并行
并发与并行

并发编程主要任务

  • 调度不同计算,避免数据竞争的发生
  • 决定需要开启多少计算
  • 决定何时开启、阻塞、解除阻塞和结束哪些计算;
  • 决定如何在不同的计算中分担工作负载。

Go 支持的并发同步技术

  • Channel,最常用的方式,应用范围更广
  • sync 标准库中提供的并发同步技术
  • sync/atomic 提供的原子操作技术

Go中大多数的操作都是未同步的,它们都不是并发安全的。 这些操作包括赋值、传参、和各种容器值操作等。 Go中只有少数操作是并发安全的,

Channel

Do not communicate by sharing memory; instead, share memory by communicating.

不要让计算通过共享内存来通讯,而应该让它们通过通讯来共享内存

通过共享内存来通讯通过通讯来共享内存是并发编程中的两种编程风格。 当通过共享内存来通讯的时候,我们需要一些传统的并发同步技术(比如互斥锁)来避免数据竞争。Go 通过 Channel 实现通过通讯来共享内存。

如果说 goroutine 是 Go 程序的并发体的话,那么 Channel 则是它们之间的通信机制。可以把 Channel 抽象为 FIFO 的数据队列,一个 Channel 是一个通信机制,它可以让一个 goroutine 通过它给另一个 goroutine 发送值信息。

随着数据的发送和接收,一些数值的所有权也从一个协程转义到了另一个协程。 当一个协程发送一个值到一个 Channel ,我们可以认为此协程释放了一些值的所有权。 当一个协程从一个 Channel 接收到一个值,我们可以认为此协程获取了一些值的所有权。

每个 Channel 都有一个特殊的类型,也就是 Channel 可发送数据的类型。一个可以发送 int 类型数据的 channel 一般写为 chan int

  • Channel 的零值为 nil,一个非零的 Channel 需要使用 make 函数创建
  • 两个相同类型的 Channel 可以使用 == 运算符比较,如果两个 Channel 引用的是相同的对象,那么比较的结果为 true。
  • 一个 Channel 也可以和 nil 进行比较。

Channel 还有一个容量属性,默认值为0。

  • 一个容量为 0 的 channel 值称为一个非缓冲 channel (unbuffered channel)
  • 一个容量大于 0 的 channel 值称为一个缓冲 channel (buffered channel)
// 非缓冲
ch := make(chan int)
// 非缓冲,与上面等价
ch := make(chan int, 0)

// 缓冲,容量为 1
ch := make(chan int, 1)

和 map 类似,Channel 也对应一个 make 创建的底层数据结构的引用。当我们复制一个 Channel 或用于函数参数传递时,我们只是拷贝了一个 Channel 引用,因此调用者和被调用者将引用同一个 Channel 对象。和其它的引用类型一样,channel的零值也是nil。

Channel 定义如下,<- 指明方向,如果没有指明方向,则 Channel 为双向,既可以接收也可以发送数据。

ChannelType = ( "chan" | "chan" "<-" | "<-" "chan" ) ElementType .
  • chan T 双向 channel 类型。 既可以接收也可以发送数据
  • chan<- T 只发送 T 数据类型的 Channel,只能发送不能接收
  • <-chan T 只接收 T 数据类型的 Channel,只能接收不能发送

从 Channel 接收还是向 Channel 发送

<- 总是优先和最左边的类型结合(The <- operator associates with the leftmost chan possible)

chan<- chan int    // 等价 chan<- (chan int)
chan<- <-chan int  // 等价 chan<- (<-chan int)
<-chan <-chan int  // 等价 <-chan (<-chan int)
chan (<-chan int)

双向 Channel 可以被隐式转换为单向 Channel 类型 chan<- T<-chan T,但反之不行(即使显式也不行)。 类型 chan<- T<-chan T 的值也不能相互转换。

Channel 基本操作

  • 通过内置函数 close 来关闭一个 Channel
close(ch)

传给 close 函数调用的实参必须为一个 channel 值,对一个只接收的 Channel 调用 close 将是一个编译错误。

invalid argument: c (variable of type <-chan int) must not be a receive-only channel

  • 向 Channel 发送一个数据
ch <- v

v 的类型必须和 Channel 类型一致,<- 称为数据发送操作符

  • 从 Channel 接收一个值
<-ch
// 或者
a := <-ch

// 第一个接收到的值是否是在 channel 被关闭前发送的
a, ok := <-ch

如果一个 Channel 操作不永久阻塞,它总会返回至少一个值,此值的类型为 ch 的元素类型。 <- 称为数据接收操作符。

在大多数场合下,一个数据接收操作可以被认为是一个单值表达式。 但是,当一个数据接收操作被用做一个赋值语句中的唯一的源值的时候,它可以返回第二个可选的布尔值返回值从而成为一个多值表达式。 布尔值表示接收到的值是否是在 channel 被关闭前发送的

  • 查询 Channel 的容量
cap(ch)

cap的返回值的类型为int

  • 查询 Channel 的长度
len(ch)

len 的返回值的类型为int。 一个 channel 的长度是指当前有多少个已被发送到此 channel 但还未被接收出去的元素值(即 channel 中剩余元素值的数量)。

如果被查询的 channel 为一个 nil 零值 channel ,则 cap 和 len 函数调用都返回 0

func TestChannl01(t *testing.T) {

	// ch 是零值 nil
	var ch chan int
	fmt.Println(len(ch), cap(ch))

	// 初始化 channel,容量为 3
	ch = make(chan int, 3)
	fmt.Println(len(ch), cap(ch))

	// 向 channel 发送一个数据
	ch <- 1
	fmt.Println(len(ch), cap(ch))

	// output
	// 0 0
	// 0 3
	// 1 3
}

channel 存在的 3 种状态

  • nil: 未初始化的状态(零值),只进行了声明,或者手动赋值为 nil
  • active: 正常的 channel(非零值且尚未关闭),可读可写
  • closed: 已经关闭的 channel(非零值单关闭状态)

channel 可进行的 3 中操作

  • 读: <- ch
  • 写: ch <-
  • 关闭: close(ch)

所以会出现 9 种组合情况

操作未初始化 channel正常 channel关闭 channel
<- ch阻塞成功或者阻塞读到零值(永不阻塞)
ch <-阻塞成功或者阻塞panic
close(ch)panic成功panic

文字描述为:

  • 关闭一个为 nil 的 channel 或者 closed 的 channel 则会 panic
  • 向一个 closed 的 channel 发送数据则会 panic
  • 向一个 nil 的 channel 或者从一个 nil 的 channel 接收数据则将使当前协程永久阻塞

注意: 对一个已经被close过的channel进行接收操作依然可以接受到之前已经成功发送的数据;如果channel中已经没有数据的话将产生一个零值的数据。

func TestChannl02(t *testing.T) {

	ch := make(chan int, 3)

	ch <- 1

	// 从 closed 的 channel 中读取
	// 下面的 ok 表示第一个接收到的值是否是在 channel 被关闭前发送的
	close(ch)

	// 依旧可以读取到之前成功发送的数据
	a, ok := <-ch
	fmt.Println(a, ok)

	// 此时 channel 中已经没有数据产生(可以通过 len(ch) 显示知晓)
	// 则将产生一个数据类型的零值
	// ok1 为 false 说明不是之前的数据
	a1, ok1 := <-ch
	fmt.Println(a1, ok1)

	// output
	// 	1 true
	// 0 false
}

Range

range 也适用于 channel 。 此循环将不断地尝试从一个 channel 接收数据,直到此 channel 关闭并且它的缓冲队列中为空为止,否则程序会一直阻塞在 range。 和应用于 数组/slice/map 的 range 语法不同,应用于 channel 的 range 语法中最多只能出现一个循环变量,此循环变量用来存储接收到的值。

for v := range ch {
	fmt.Println(v)
}

等价于

for {
	v, ok := <-ch
	if !ok {
		break
	}
	fmt.Println(v)
}

如果它是一个nil零值,则此 range 循环将使当前协程永久阻塞。

Select

select-case 是专门为 channel 设计的分支流程控制语法,和 switch-case 分支流程相似。都可以有若干 case 分支和最多一个 default 分支。但是也有很多不同点,在 select-case 中

  • select 关键字和 “{” 不允许存在任何表达式和语句
  • fallthrough 语句不能使用
  • 每个 case 关键字后必须跟随一个接收 channel 或者发送 channel
  • 如果有多个同时满足条件的 case 则会随机选择其中一个执行
  • 在所有的case操作均为阻塞的情况下
    • 如果 default 分支存在,则default分支代码块将得到执行
    • 没有 default 分支,则会进入到阻塞状态

一个不含任何分支的select-case代码块 select{} 将使当前协程处于永久阻塞状态。

下面代码,两个 case 都不会阻塞,所以会被 select 随机选择执行,所以会有 50% 的概率 panic

func TestChannelSwitch01(t *testing.T) {

	c := make(chan struct{})
	close(c)

	select {
	case c <- struct{}{}: // 若此分支被选中,则 panic
	case <-c:
	}
}

注意事项

channel 的元素值的传递都是复制过程

在一个值被从一个 goroutine 传递到另一个 goroutine 的过程中,此值将被复制至少一次。 如果此传递值曾经在某个 channel 的缓冲队列中停留过,则它在此传递过程中将被复制两次。一次复制发生在从发送 goroutine 向缓冲队列推入此值的时候,另一个复制发生在接收 goroutine 从缓冲队列取出此值的时候。 和赋值以及函数调用传参一样,当一个值被传递时,只有它的直接部分被复制。

对于官方标准编译器,最大支持的 channel 的元素类型的尺寸为 65535(未验证,会通过源码进行验证)。 但是,一般说来,为了在数据传递过程中避免过大的复制成本,我们不应该使用尺寸很大的 channel 元素类型。 如果欲传送的值的尺寸较大,应该改用指针类型做为 channel 的元素类型。

关于 channel 和 goroutine 的垃圾回收

注意,一个 channel 被其发送数据 goroutine 队列和接收数据 goroutine 队列中的所有 goroutine 引用着。因此,如果一个 channel 的这两个队列只要有一个不为空,则此 channel 肯定不会被垃圾回收。 另一方面,如果一个 goroutine 处于一个 channel 的某个 goroutine 队列之中,则此 goroutine 也肯定不会被垃圾回收,即使此 channel 仅被此 goroutine 所引用。 事实上,一个 goroutine 只有退出后才能被垃圾回收。

使用场景

把 channel 用在数据流动的地方

Golang并发核心思路是关注数据流动。数据流动的过程交给 channel,数据处理的每个环节都交给 goroutine ,把这些流程画起来,有始有终形成一条线,那就能构成流水线模型。

  • 使用 channel 进行异步和并发编程是简单和惬意的;
  • channel 同步技术比被很多其它语言采用的其它同步方案(比如角色模型async/await模式)有着更多的应用场景和更多的使用变种。
  • channel 并不是 Go 支持的唯一同步技术,并且 channel 并不是在任何情况下都是最佳的同步技术。 以后会介绍sync/atomicsync

1. 使用 for range 读 channel

  • 场景: 当需要不断从channel读取数据时
  • 原理: 使用 for-range 读取 channel ,这样既安全又便利,当 channel 关闭时,for 循环会自动退出,无需主动监测 channel 是否关闭,可以防止读取已经关闭的 channel,造成读到数据为通道所存储的数据类型的零值。
func TestChannelEx01(t *testing.T) {

	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	ch <- 3

	close(ch)

	for x := range ch {
		fmt.Println(x)
	}

	// output
	// 1
	// 2
	// 3
}

2. 使用 _, ok 判断channel是否关闭

ok 表示接收到的值是否是在 channel 被关闭前发送的

  • 场景: 读channel,但不确定 channel 是否关闭时
  • 原理: 读已关闭的channel会得到零值,如果不确定channel,需要使用ok进行检测。:
    • true: 只说明读到的值是 channel 是关闭之前发送的,不能证明 channel 是关闭。
    • false: channel 关闭,读到的值为零值。
func TestChannelEx02(t *testing.T) {

	ch := make(chan int, 3)
	ch <- 1
	ch <- 2

	v, ok := <-ch
	fmt.Println(v, ok)

	close(ch)

	fmt.Println("len:", len(ch))
	v1, ok1 := <-ch
	// ok1 为 true,只说明读取的是之前在 channel 中的
	// 不能说明 channel 是关闭状态
	fmt.Println(v1, ok1)

	fmt.Println("len:", len(ch))
	v2, ok2 := <-ch
	// ok2 为 false ,说明读取的不是之前发送的
	// 说明 channel 关闭
	fmt.Println(v2, ok2)

	// output
	// 1 true
	// len: 1
	// 2 true
	// len: 0
	// 0 false
}

3. 使用select处理多个channel

  • 场景: 需要对多个通道进行同时处理,但只处理最先发生的 channel 时
  • 原理: select 可以同时监控多个通道的情况,只处理未阻塞的 case。当通道为nil时,对应的case永远为阻塞,无论读写。

特殊关注:普通情况下,对nil的通道写操作是要panic的。

func TestChannelEx03(t *testing.T) {

	t1 := time.Now()

	// 零值,读取和发送都会永久阻塞
	var chNil chan int

	ch := make(chan int, 1)

	go func() {
		time.Sleep(500 * time.Millisecond)
		ch <- 1
	}()

	select {
	case a := <-ch:
		fmt.Println(a)
	case <-chNil:
		fmt.Println("会永久阻塞")
	}

	fmt.Println(time.Since(t1))

	// output
	// 1
	// 500.716326ms
}

4. 使用channel的声明控制读写权限

  • 场景:避免其他 goroutine 向通道写入或者读取
  • 目的:
    • 使代码更易读、更易维护
    • 防止只读 goroutine 对通道进行写数据,但通道已关闭,造成panic。
  • 用法:
    • 如果 goroutine 对某个channel只有写操作,则这个 channel 声明为只写。
    • 如果 goroutine 对某个channel只有读操作,则这个 channel 声明为只读。
func TestChannelEx04(t *testing.T) {

	// 只有 generator 进行对 outCh 进行写操作,返回声明
	// <-chan int,可以防止其他协程乱用此通道,造成隐藏bug
	var generator = func(n int) <-chan int {
		outCh := make(chan int)
		go func() {
			for i := 0; i < n; i++ {
				outCh <- i
			}
			close(outCh)
		}()
		return outCh
	}

	// consumer 只读 inCh 的数据,声明为<-chan int
	// 可以防止它向 inCh 写数据
	var consumer = func(inCh <-chan int) {
		for x := range inCh {
			fmt.Println(x)
		}
	}

	c := generator(5)
	consumer(c)

	// output
	// 0
	// 1
	// 2
	// 3
	// 4
}

5. 使用缓冲channel增强并发

  • 场景:并发
  • 原理:使用有缓冲 channel 作为简单的队列使用,可供多个协程同时处理。

这个例子一般,没有体现队列的效果,但是处理 sync.WaitGroup 还是可以学习一下

func TestChannelEx05(t *testing.T) {

	var generator = func(n int) <-chan int {
		outCh := make(chan int)
		go func() {
			for i := 0; i < n; i++ {
				outCh <- i
			}
			close(outCh)
		}()
		return outCh
	}

	var do = func(inCh <-chan int, outCh chan<- int, wg *sync.WaitGroup) {
		for v := range inCh {
			outCh <- v * v
		}

		wg.Done()
	}

	inCh := generator(100)
	outCh := make(chan int, 10)

	// 使用5个`do`协程同时处理输入数据
	var wg sync.WaitGroup
	wg.Add(5)
	for i := 0; i < 5; i++ {
		go do(inCh, outCh, &wg)
	}

	go func() {
		wg.Wait()
		close(outCh)
	}()

	for r := range outCh {
		fmt.Println(r)
	}

}

6. 超时控制

  • 场景: 需要超时控制的操作
  • 原理: 使用 select 和 time.After,看操作和定时器哪个先返回,处理先完成的,就达到了超时控制的效果

注意 time.After 的坑

func TestChannelEx06(t *testing.T) {

	var ch chan struct{}

	tout := time.After(500 * time.Millisecond)

	select {
	case <-ch:
		fmt.Println("read ch")
	case <-tout:
		fmt.Println("timeout")
	}
	fmt.Println("over")

	// output
	// timeout
	// over
}

7. 使用time实现channel无阻塞读写

  • 场景: 并不希望在channel的读写上浪费时间
  • 原理: 是为操作加上超时的扩展,这里的操作是channel的读或写

time.After等待可以替换为default,则是channel阻塞时,立即返回的效果

func unBlockRead(ch chan int) (x int, err error) {
    select {
    case x = <-ch:
        return x, nil
    case <-time.After(time.Microsecond):
        return 0, errors.New("read time out")
    }
}

func unBlockWrite(ch chan int, x int) (err error) {
    select {
    case ch <- x:
        return nil
    case <-time.After(time.Microsecond):
        return errors.New("read time out")
    }
}

8. 使用close(ch)关闭所有下游协程

  • 场景: 退出时,显示通知所有协程退出
  • 原理: 所有读 ch 的协程都会收到 close(ch) 的信号
func TestChannelEx08(t *testing.T) {

	var p = struct {
		stop chan struct{}
	}{
		stop: make(chan struct{}),
	}

	go func() {
		time.Sleep(2 * time.Second)
		close(p.stop)
	}()

	for {
		select {
		case <-p.stop:
			fmt.Println("stop loop")
			return
		default:
			fmt.Println("biu...")
			time.Sleep(500 * time.Millisecond)
		}
	}

	// output
	// biu...
	// biu...
	// biu...
	// biu...
	// stop loop
}

9. 使用channel传递channel

  • 场景: 使用场景有点多,通常是用来获取结果。
  • 原理: channel可以用来传递变量,channel自身也是变量,可以传递自己。
func TestChannelEx09(t *testing.T) {

	var handle = func(wg *sync.WaitGroup, a int) chan int {
		out := make(chan int)
		go func() {
			time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
			out <- a
			wg.Done()
		}()
		return out
	}

	reqs := []int{1, 2, 3, 4, 5, 6, 7, 8, 9}

	// 存放结果的channel的channel
	outs := make(chan chan int, len(reqs))
	var wg sync.WaitGroup
	wg.Add(len(reqs))
	for _, x := range reqs {
		o := handle(&wg, x)
		outs <- o
	}

	go func() {
		wg.Wait()
		close(outs)
	}()

	// 读取结果,结果有序
	for o := range outs {
		fmt.Println(<-o)
	}
}

一个实战

: 并非本人的实战,是根据 golang-first-class-function#Demo 整理,这个 Demo 讲的示例非常好,很有学习的价值。Demo 源代码

前提

在一个 P2P 网络中

  • 一个节点可以是服务器也可以是客户端,被称为 Host
  • 和本节点连接的所有节点都被称为 Peer

Host 需要保存所有建立连接的 Peer,并对这些 Peer 维护(增加、删除、查询和向所有 Peer 广播消息)

Peer 的定义, 通过 Peer 中的 ID 表示全网中的节点,其他信息省略。Peer 有 WriteMsg 方法,实现向该 Peer 发送消息的功能。

// Peer 代表1个节点
type Peer struct {
	ID string
	// Peer其他信息,比如网络连接、地址、协议类型等等
}

func (p *Peer) WriteMsg(msg string) {
	fmt.Printf("send to: %v, msg: %v\n", p.ID, msg)
}

Host 有4个方法,分别是:

  • AddPeer: 增加1个Peer。
  • RemovePeer: 删除1个Peer。
  • GetPeer: 通过Peer.ID查询1个Peer。
  • BroadcastMsg: 向所有Peer发送消息。

版本一

源码

只看部分代码,这就是传统的通过共享内存来通讯,通过互斥锁进行并发安全。具体来讲,每个goroutine都是1个实体,它们同时运行,调用Host的不同方法来访问peers,只有拿到当前lock的goroutine才能访问peers,仿佛当前goroutine在同其他goroutine讲:我现在有访问权,你们等一下。本质上就是,通过共享Host.lock这块内存,各goroutine进行交流(表明自己拥有访问权)。

// Host 代表当前节点的连接管理
type Host struct {
	peers map[string]*Peer // 连接上的所有Peer,根据Peer.ID索引
	lock  sync.RWMutex     // 保护peers互斥访问
	// 其他字段省略
}

func NewHost() *Host {
	h := &Host{
		peers: make(map[string]*Peer),
	}
	return h
}

func (h *Host) AddPeer(p *Peer) {
	h.lock.Lock()
	defer h.lock.Unlock()

	h.peers[p.ID] = p
}

func (h *Host) RemovePeer(pid string) {
	h.lock.Lock()
	defer h.lock.Unlock()

	delete(h.peers, pid)
}

// GetPeer 当前Peer不存在时返回nil。
func (h *Host) GetPeer(pid string) *Peer {
	h.lock.RLock()
	defer h.lock.RUnlock()

	return h.peers[pid]
}

func (h *Host) BroadcastMsg(msg string) {
	h.lock.RLock()
	defer h.lock.RUnlock()

	for _, p := range h.peers {
		p.WriteMsg(msg)
	}
}

版本二

技巧,把数据流动画出来,并把要流动的数据标上,然后那些数据流动的线条,就是 channel,线条上的数据就是 channel 要传递的数据。

源码

通过通讯来共享内存,使用数据的流动,实现并发。

  • Start()用于启动1个goroutine运行loop(),loop保存所有的peers。
  • Stop()用于关闭Host,让loop退出。

loop 方法是一个总的处理方法,一直处于运行状态,并根据 Host.stop 来控制其停止。

以 AddPeer 为例,当调用 AddPeer 方法时,向 Host.add 中添加数据,loop 中通过 select 判断出 Host.add 可以读取则会执行 peers[p.ID] = p,其他方法如出一辙。

其他方法都是单向,但是 GetPeer 需要使用 2 个channel,理想结果是使用一个固定的通道,向发起 GetPeer 的 goroutine 发送结果。其实可以通过 channel 传递 channel,增加一个 query channel,query channel 里传递 Peer.ID 和接收结果的 channel。

// Host 代表当前节点的连接管理
type Host struct {
	add       chan *Peer
	broadcast chan string
	remove    chan string
	stop      chan struct{}
}

func NewHost() *Host {
	h := &Host{
		add:       make(chan *Peer),
		broadcast: make(chan string),
		remove:    make(chan string),
		stop:      make(chan struct{}),
	}
	return h
}

func (h *Host) Start() {
	go h.loop()
}

func (h *Host) Stop() {
	close(h.stop)
}

func (h *Host) loop() {
	peers := make(map[string]*Peer)

	for {
		select {
		case p := <-h.add:
			peers[p.ID] = p
		case pid := <-h.remove:
			delete(peers, pid)
		case msg := <-h.broadcast:
			for _, p := range peers {
				p.WriteMsg(msg)
			}
		case <-h.stop:
			return
		}
	}
}

func (h *Host) AddPeer(p *Peer) {
	h.add <- p
}

func (h *Host) RemovePeer(pid string) {
	h.remove <- pid
}

func (h *Host) BroadcastMsg(msg string) {
	h.broadcast <- msg
}

// GetPeer 当前Peer不存在时返回nil。
func (h *Host) GetPeer(pid string) *Peer {
	// 只有这3个channel无法实现
	return nil
}

版本三

可以把函数作为变量对待

源码

只需要传递 Host.opCh 即可,loop 中的 case 也可以精简,具体代码为准。

type Operation func(peers map[string]*Peer)

// Host 代表当前节点的连接管理
type Host struct {
	opCh chan Operation
	stop chan struct{}
}

func NewHost() *Host {
	h := &Host{
		opCh: make(chan Operation),
		stop: make(chan struct{}),
	}
	return h
}

func (h *Host) Start() {
	go h.loop()
}

func (h *Host) Stop() {
	close(h.stop)
}

func (h *Host) loop() {
	peers := make(map[string]*Peer)

	for {
		select {
		case op := <-h.opCh:
			op(peers)
		case <-h.stop:
			return
		}
	}
}

func (h *Host) AddPeer(p *Peer) {
	add := func(peers map[string]*Peer) {
		peers[p.ID] = p
	}
	h.opCh <- add
}

func (h *Host) RemovePeer(pid string) {
	rm := func(peers map[string]*Peer) {
		delete(peers, pid)
	}
	h.opCh <- rm
}

func (h *Host) BroadcastMsg(msg string) {
	broadcast := func(peers map[string]*Peer) {
		for _, p := range peers {
			p.WriteMsg(msg)
		}
	}

	h.opCh <- broadcast
}

// GetPeer 当前Peer不存在时返回nil。
func (h *Host) GetPeer(pid string) *Peer {
	retCh := make(chan *Peer)
	query := func(peers map[string]*Peer) {
		retCh <- peers[pid]
	}

	// 发送查询
	go func() {
		h.opCh <- query
	}()

	// 等待查询结果并返回
	return <-retCh
}

// SendTo 只向某一个Peer发送消息
func (h *Host) SendTo(pid, msg string) {
	p := h.GetPeer(pid)
	p.WriteMsg(msg)
}

// NumOfPeers peer的数量
func (h *Host) NumOfPeers() int {
	retCh := make(chan int)
	query := func(peers map[string]*Peer) {
		retCh <- len(peers)
	}

	go func() {
		h.opCh <- query
	}()

	return <-retCh
}

对比

  • 版本一,使用互斥锁共享数据,使用锁进行通信来获取数据的使用权
  • 版本二,使用channel 传递数据,存在瑕疵,没有实现 GetPeer
  • 版本三,使用channel 传递函数,不能对 Peer 进行同时读,不容易做单元测试

channel 关闭原则

不要在数据接收方或者在有多个发送者的情况下关闭 channel

如果发送者是该通道的唯一发送者,我们应该只在发送者一侧的goroutine中关闭通道。通用的原则是不要关闭已关闭的 channel 。

粗鲁的关闭 channel

如果由于某种原因,你一定非要从数据接收方或者让众多发送者中的一个关闭一个 channel ,你可以使用 recover 来防止可能产生的 panic 而导致程序崩溃。

假设 channel 的元素类型为T

func SafeClose(ch chan T) (justClosed bool) {
	defer func() {
		if recover() != nil {
			// 一个函数的返回结果可以在defer调用中修改。
			justClosed = false
		}
	}()

	// 假设ch != nil。
	close(ch)   // 如果ch已关闭,则产生一个恐慌。
	return true // <=> justClosed = true; return
}

同样的方法可以用来粗鲁地向一个关闭状态未知的 channel 发送数据。

func SafeSend(ch chan T, value T) (closed bool) {
	defer func() {
		if recover() != nil {
			closed = true
		}
	}()

	ch <- value  // 如果ch已关闭,则产生一个恐慌。
	return false // <=> closed = false; return
}

这样的粗鲁方法不仅违反了 channel 关闭原则,而且标准编译器不保证它的实现中不存在数据竞争

礼貌的关闭 channel

很多 Go 程序员喜欢使用 sync.Once 来关闭 channel 。

type MyChannel struct {
	C    chan T
	once sync.Once
}

func NewMyChannel() *MyChannel {
	return &MyChannel{C: make(chan T)}
}

func (mc *MyChannel) SafeClose() {
	mc.once.Do(func() {
		close(mc.C)
	})
}

也可以使用 sync.Mutex 来防止多次关闭一个 channel

type MyChannel struct {
	C      chan T
	closed bool
	mutex  sync.Mutex
}

func NewMyChannel() *MyChannel {
	return &MyChannel{C: make(chan T)}
}

func (mc *MyChannel) SafeClose() {
	mc.mutex.Lock()
	defer mc.mutex.Unlock()
	if !mc.closed {
		close(mc.C)
		mc.closed = true
	}
}

func (mc *MyChannel) IsClosed() bool {
	mc.mutex.Lock()
	defer mc.mutex.Unlock()
	return mc.closed
}

优雅地关闭 channel

下面展示在各种情形下使用纯 channel 操作来关闭 channel 的方法。

为了演示程序的完整性,下面这些例子中使用到了sync.WaitGroup。在实践中,sync.WaitGroup并不是必需的

M个接收者和一个发送者

M个接收者和一个发送者,发送者通过关闭 channel 来传递发送结束信号

package main

import (
	"log"
	"math/rand"
	"sync"
	"time"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)

	// ...
	const Max = 100000
	const NumReceivers = 100

	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(NumReceivers)

	// ...
	dataCh := make(chan int, 100)

	// 发送者
	go func() {
		for {
			if value := rand.Intn(Max); value == 0 {
				// 此唯一的发送者可以安全地关闭此 channel 。
				close(dataCh)
				return
			} else {
				dataCh <- value
			}
		}
	}()

	// 接收者
	for i := 0; i < NumReceivers; i++ {
		go func() {
			defer wgReceivers.Done()

			// 接收数据直到通道dataCh已关闭
			// 并且dataCh的缓冲队列已空。
			for value := range dataCh {
				log.Println(value)
			}
		}()
	}

	wgReceivers.Wait()
}

一个接收者和N个发送者

一个接收者和N个发送者,此唯一接收者通过关闭一个额外的信号通道来通知发送者不要在发送数据了

添加一个 停止通知 接收端告诉发送端不要发送了。

此情形比上一种情形复杂一些。我们不能让接收者关闭用来传输数据的通道来停止数据传输,因为这样做违反了 channel 关闭原则。 但是我们可以让接收者关闭一个额外的信号 channel 来通知发送者不要在发送数据了。

package main

import (
	"log"
	"math/rand"
	"sync"
	"time"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)

	// ...
	const Max = 100000
	const NumSenders = 1000

	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(1)

	// ...
	dataCh := make(chan int, 100)
	stopCh := make(chan struct{})
	// stopCh是一个额外的信号通道。它的
	// 发送者为dataCh channel 的接收者。
	// 它的接收者为dataCh channel 的发送者。

	// 发送者
	for i := 0; i < NumSenders; i++ {
		go func() {
			for {
				// 这里的第一个尝试接收用来让此发送者
				// 协程尽早地退出。对于这个特定的例子,
				// 此select代码块并非必需。
				select {
				case <-stopCh:
					return
				default:
				}

				// 即使stopCh已经关闭,此第二个select
				// 代码块中的第一个分支仍很有可能在若干个
				// 循环步内依然不会被选中。如果这是不可接受
				// 的,则上面的第一个select代码块是必需的。
				select {
				case <-stopCh:
					return
				case dataCh <- rand.Intn(Max):
				}
			}
		}()
	}

	// 接收者
	go func() {
		defer wgReceivers.Done()

		for value := range dataCh {
			if value == Max-1 {
				// 此唯一的接收者同时也是stopCh 的唯一发送者。
				// 尽管它不能安全地关闭dataCh数
				// 据通道,但它可以安全地关闭stopCh 。
				close(stopCh)
				return
			}

			log.Println(value)
		}
	}()

	// ...
	wgReceivers.Wait()
}

当 close(stopCh) 后,第二个 select 的两个 case 也许会同时满足,导致随机执行 case,所以协程会一直进行 dataCh <- rand.Intn(Max) 而不退出,所以需要第一个 select 进行过滤。

如此例中的注释所述,对于此额外的信号通道 stopCh,它只有一个发送者,即dataCh的唯一接收者。 dataCh的接收者关闭了 stopCh,这是不违反 channel 关闭原则的。

在此例中, dataCh并没有被关闭。当一个 channel 不再被任何协程所使用后,它将逐渐被垃圾回收掉,无论它是否已经被关闭。 所以这里的优雅性体现在通过不关闭一个 channel 来停止使用此 channel

M个接收者和N个发送者

它们中的任何协程都可以让一个中间调解协程帮忙发出停止数据传送的信号

这是最复杂的一种情形。我们不能让接收者和发送者中的任何一个关闭 channel ,我们也不能让多个接收者之一关闭一个额外的信号通道。 这两种做法都违反了 channel 关闭原则。 然而,我们可以引入一个中间调解者角色并让其关闭额外的信号通道来通知所有的接收者和发送者结束工作。注意其中使用了一个尝试发送操作来向中间调解者发送信号。

package main

import (
	"log"
	"math/rand"
	"strconv"
	"sync"
	"time"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)

	// ...
	const Max = 100000
	const NumReceivers = 10
	const NumSenders = 1000

	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(NumReceivers)

	// ...
	dataCh := make(chan int, 100)
	stopCh := make(chan struct{})
	// stopCh是一个额外的信号通道。它的发送者为中间调解者。
	// 它的接收者为dataCh的所有的发送者和接收者
	toStop := make(chan string, 1)
	// toStop是一个用来通知中间调解者让其关闭stopCh的 channel。
	// 此channel的发送者为dataCh的所有的发送者和接收者,它的接收者
	// 为中间调解者。它必须为一个缓冲通道。

	var stoppedBy string

	// 中间调解者
	go func() {
		stoppedBy = <-toStop
		close(stopCh)
	}()

	// 发送者
	for i := 0; i < NumSenders; i++ {
		go func(id string) {
			for {
				value := rand.Intn(Max)
				if value == 0 {
					// 为了防止阻塞,这里使用了一个尝试
					// 发送操作来向中间调解者发送信号。
					select {
					case toStop <- "发送者#" + id:
					default:
					}
					return
				}

				// 此处的尝试接收操作是为了让此发送协程尽早
				// 退出。标准编译器对尝试接收和尝试发送做了
				// 特殊的优化,因而它们的速度很快。
				select {
				case <-stopCh:
					return
				default:
				}

				// 即使stopCh已关闭,如果这个select代码块
				// 中第二个分支的发送操作是非阻塞的,则第一个
				// 分支仍很有可能在若干个循环步内依然不会被选
				// 中。如果这是不可接受的,则上面的第一个尝试
				// 接收操作代码块是必需的。
				select {
				case <-stopCh:
					return
				case dataCh <- value:
				}
			}
		}(strconv.Itoa(i))
	}

	// 接收者
	for i := 0; i < NumReceivers; i++ {
		go func(id string) {
			defer wgReceivers.Done()

			for {
				// 和发送者协程一样,此处的尝试接收操作是为了
				// 让此接收协程尽早退出。
				select {
				case <-stopCh:
					return
				default:
				}

				// 即使stopCh已关闭,如果这个select代码块
				// 中第二个分支的接收操作是非阻塞的,则第一个
				// 分支仍很有可能在若干个循环步内依然不会被选
				// 中。如果这是不可接受的,则上面尝试接收操作
				// 代码块是必需的。
				select {
				case <-stopCh:
					return
				case value := <-dataCh:
					if value == Max-1 {
						// 为了防止阻塞,这里使用了一个尝试
						// 发送操作来向中间调解者发送信号。
						select {
						case toStop <- "接收者#" + id:
						default:
						}
						return
					}

					log.Println(value)
				}
			}
		}(strconv.Itoa(i))
	}

	// ...
	wgReceivers.Wait()
	log.Println("被" + stoppedBy + "终止了")
}

为什么不像上一个情形,关闭 stopCh 来停止?因为有多个发送者和多个接受者,会存在重复关闭 stopCh 的情况,所以需要增加一个 toStop 来进行协调。当任意一方需要关闭,则只需向 toStop 中添加数据即可,然后触发 stopCh 的关闭,任意一方都会通过 select 中的 case 关闭自己。为了保证 select 的随机性,所以接受者和发送者都添加两次 select。

请注意,toStop的容量必须至少为1。 如果它的容量为0,则在 stopCh 还未准备好的情况下就已经有某个协程向toStop 发送信号时,此信号将被抛弃。(所以会在第二个退出信号才会退出,但是显示第一个退出id?)

我们也可以不使用尝试发送操作向 stopCh 发送信号,但toStop的容量必须至少为数据发送者和数据接收者的数量之和,以防止向其发送数据时(有一个极其微小的可能)导致某些发送者和接收者协程永久阻塞。

...
toStop := make(chan string, NumReceivers + NumSenders)
...
			value := rand.Intn(Max)
			if value == 0 {
				toStop <- "sender#" + id
				return
			}
...
				if value == Max-1 {
					toStop <- "receiver#" + id
					return
				}
...

上述三种情形没有覆盖所有的使用情形,但它们是最基本的几种情形。其它很多情形都可以归为这三种情形。并没有什么情况非得逼得我们违反 channel 关闭原则。 如果你遇到了此情形,请考虑修改你的代码流程和结构设计。

参考链接