Go中的协程

goroutine

每一个并发执行的活动称为goroutine。通过go 函数调用启动一个goroutine:

1
go foo()

也可以(并且很多情况下)结合匿名函数使用:

1
2
3
go func() {
// greatful work
}()

一个常见的陷阱是把for循环里定义的变量给捕捉进去了:

1
2
3
4
5
for i:=0; i<10; i++ {
go func() {
fmt.Println(i) // wrong
}()
}

这段代码里i会随着循环进行而变化。可能i=0的时候开始的goroutine,在i=1的时候才读到i的值并输出,这并不是我们想要的。

正确的写法:

1
2
3
4
5
for i:=0; i<10; i++ {
go func(i int) {
fmt.Println(i) // right
}(i)
}

goroutine没有标识。从外部控制goroutine启动后的行为需要程序员手动实现。

WaitGroup

sync.WaitGroup用于等待一系列goroutine执行完毕。

操作:

  • Add(delta):添加delta个待完成任务(delta也可为负)
  • Done():完成一个任务
  • Wait():等待直到所有任务完成
1
2
3
4
5
6
7
8
9
10
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
time.Sleep(time.Duration(i) * time.Second)
fmt.Println(i)
wg.Done()
}(i)
}
wg.Wait()

通道

通道是一个队列(FIFO数据结构),可以通过通道在不同的goroutine之间传输数据,进行数据通信。

通道的类型是chan 类型。通道是一个引用类型,所以可以直接作为参数传递给函数。

操作

构造通道ch := make(chan 类型, 缓冲大小=1),缓冲大小即通道最多能暂时保留多少个值,默认为1。

接收操作<-ch是一个表达式,取出最先进入通道的值。当通道为空的时候,接受操作会阻塞,直到有值进入为止。

发送操作ch<-value是一个语句,让值进入通道。当通道缓冲区满的时候,发送操作会阻塞,直到有值被取出为止。

关闭通道close(ch)。通道被关闭后,接收操作会马上返回nil,发送操作会引起panic。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
ch := make(chan int)

go func(ch chan int) {
for i := 0; i < 10; i++ {
time.Sleep(time.Duration(i) * time.Second)
ch<-i
}
close(ch)
}(ch)

go func(ch chan int) {
for i := 0; i < 10; i++ {
x := <-ch
fmt.Println(x)
}
}(ch)

通道可以通过for data := range ch循环遍历,直到通道被关闭时退出循环。如果通道在使用完之后没有及时关闭,循环将会一直阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
ch := make(chan int)

go func(ch chan int) {
for i := 0; i < 10; i++ {
time.Sleep(time.Duration(i) * time.Second)
ch<-i
}
close(ch)
}(ch)

go func(ch chan int) {
for x := range ch {
fmt.Println(x)
}
}(ch)

和文件不同,通道可以不手动关闭。

单向通道

接收通道的类型是 chan<- 类型,发送通道的类型是<-chan 类型,这两种通道统称单向通道。

可以通过赋值操作将双向通道转换成为单向通道,反之则不可。

试图关闭一个接收通道将会出错。

select语句

select语句用于同时等待多个发送/接收操作,并完成最先满足的那一个。如果有多个操作同时可得,则等概率随机选择其中一个执行。

select语法类似switch:

1
2
3
4
5
6
7
8
9
select {
case <-ch1:
// do something 1
case x := <-ch2:
// do something 2
case ch3 <- y:
// do something 3
// ...
}

也可以有default分支,当其余所有操作都不是立即可得时,执行default分支:

1
2
3
4
5
6
7
8
select {
case x := <-ch1:
// do something 1
case ch2 <- y:
// do something 2
default:
// do something 3
}

事件

goroutine没有提供事件类型,因为通道就能完全可以实现事件的功能。接收操作可以用来等待事件,关闭通道操作可以用来分发事件。

技巧:取消goroutine

go默认没有提供任何API用于取消一个goroutine,但是可以自己实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
done := make(chan struct{})
go func() {
// do some heavy work such as loop
for {
select {
case <-done:
break
default:
// do your task
}
}
}()
time.Sleep(5000)
close(done) // cancel

互斥锁(悲观锁):sync.Mutex

互斥锁有两种状态:上锁或未上锁。

调用Lock()申请锁,调用Unlock()释放锁。

如果已经处于上锁状态,则调用Lock()后就会阻塞,直到其处于未上锁状态为止。(不可重入)

互斥锁用于确保程序中的几段程序片段,当其中一段正在执行时,其他的代码段不允许开始执行。通常用于防止一个goroutine操作变量时,另一个goroutine也在操作这个变量。

读写互斥锁(乐观锁):sync.RWMutex

读写互斥锁允许并发的读操作,但是只允许一个goroutine在进行写操作。

独写互斥锁有三种状态:读锁、写锁或未上锁。

调用RLock()申请读锁,调用Lock()申请写锁,调用RUnlock()释放读锁,调用Unlock()释放写锁。

读锁可以重入,但是写锁不可以。且处于读锁时申请写锁仍然会阻塞。

死锁

死锁是指两个或两个以上的进程在执行过程中,由于竞争资源或者由于彼此通信而造成的一种阻塞的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。

产生死锁的必要条件:

  1. 互斥条件:指进程对所分配到的资源进行排它性使用,即在一段时间内某资源只由一个进程占用。如果此时还有其它进程请求资源,则请求者只能等待,直至占有资源的进程用毕释放。

  2. 请求和保持条件:指进程已经保持至少一个资源,但又提出了新的资源请求,而该资源已被其它进程占有,此时请求进程阻塞,但又对自己已获得的其它资源保持不放。

  3. 不剥夺条件:指进程已获得的资源,在未使用完之前,不能被剥夺,只能在使用完时由自己释放。

  4. 环路等待条件:指在发生死锁时,必然存在一个进程——资源的环形链,即进程集合{P0,P1,P2,···,Pn}中的P0正在等待一个P1占用的资源;P1正在等待P2占用的资源,……,Pn正在等待已被P0占用的资源。

来源:死锁 - 百度百科)

通信顺序进程(CSP)

Do not communicate by sharing memory; instead, share memory by communicating.

“不要以共享内存的方式来通信,相反,要通过通信来共享内存。”

前半句所谓共享内存,即C++等语言中各个进程都可以操纵同一个变量。为了保证操作的原子性,不可避免地要使用锁。后半句说的就是CSP。

既然多个进程同时操纵一个变量会出事,那我只让一个进程操纵不就可以了吗。其他的进程通过和这个进程通信,就能间接地操纵这个变量。这就是CSP的理念。CSP里,一个变量只属于一个进程,进程之间通过一对通信原语(在Go中实现为通道)实现通信。进程只关心消息本身,并不关心消息的发送者。

Go语言同时支持这两种并发模式,但Go官方更推崇使用后者。

例如,一个并发安全的变量读写可以这样实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package main

type request struct {
req string
data interface{}
ch chan<- interface{}
}

var value int

var reqChan = make(chan request)

func worker(done chan struct{}) {
for r := range reqChan {
switch r.req {
case "get":
r.ch <- value
case "set":
value = r.data.(int)
r.ch <- struct{}{}
}
}
done <- struct{}{}
}

func get() int {
ch := make(chan interface{})
reqChan <- request{"get", nil, ch}
return (<-ch).(int)
}

func set(v int) {
ch := make(chan interface{})
reqChan <- request{"set", v, ch}
<-ch
}

func main() {
done := make(chan struct{})
go worker(done)
<-done
}
Author: ssttkkl
Link: https://ssttkkl.github.io/uncategorized/2020/07/Go%E4%B8%AD%E7%9A%84%E5%8D%8F%E7%A8%8B/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.