第2章 重温并发与并行
# 第2章 重温并发与并行
本章将探讨Go语言并发机制的核心——协程(goroutine)。你将了解它们的工作原理,区分并发和并行,管理当前正在运行的协程,处理数据竞争问题,使用通道(channel)进行通信,以及利用通道状态和信号来充分发挥其潜力。掌握这些概念对于编写高效且无错误的Go代码至关重要。
在本章中,我们将涵盖以下主要内容:
- 理解协程
- 管理数据竞争
- 理解通道
- 交付保证
- 状态与信号
# 理解协程
协程是由Go调度器创建并安排独立运行的函数。Go调度器负责协程的管理和执行。
在底层,有一个复杂的算法来确保协程正常工作。幸运的是,在Go语言中,我们使用go
关键字就能轻松实现这个高度复杂的操作。
注意 如果你习惯使用具有 async/await 特性的语言,可能习惯预先确定要并发使用的函数,需要更改函数签名来表明该函数可以暂停/恢复,调用这个函数也需要特殊的表示法。而使用协程时,无需更改函数签名。 |
---|
在以下代码片段中,有一个main
函数按顺序调用say
函数,并分别传入参数"hello"
和"world"
:
func main() {
say("hello")
say("world")
}
2
3
4
say
函数接收一个字符串作为参数,并进行五次迭代。每次迭代时,函数会休眠500毫秒,然后立即打印参数s
:
func say(s string) {
for i := 1; i < 5; i++ {
time.Sleep(500 * time.Millisecond)
fmt.Println(s)
}
}
2
3
4
5
6
当我们执行这个程序时,它应该输出以下内容:
hello
hello
hello
hello
hello
world
world
world
world
world
2
3
4
5
6
7
8
9
10
现在,我们在第一次调用say
函数之前加上go
关键字,为程序引入并发:
func main() {
go say("hello")
say("world")
}
2
3
4
输出结果应该是hello
和world
交替出现。
那么,如果为第二次函数调用也创建一个协程,能得到相同的结果,对吧?
func main() {
say("hello")
go say("world")
}
2
3
4
现在我们看看这个程序的运行结果:
hello
hello
hello
hello
2
3
4
等等!这里有点不对劲。我们哪里做错了?main
函数和协程似乎不同步。
其实我们并没有做错什么,这是预期的行为。仔细观察第一个程序,协程启动后,第二次对say
函数的调用在main
函数的上下文中顺序执行。
换句话说,程序应该等待函数执行完毕,才能到达main
函数的末尾。而对于第二个程序,情况正好相反。第一次调用是正常的函数调用,所以它按预期打印了五次,但当第二个协程启动时,main
函数中没有后续指令了,所以程序就终止了。
虽然从程序的工作方式来看,这种行为是正确的,但这并非我们想要的。我们需要一种方法,在main
函数有机会终止之前,同步等待这组执行中的所有协程完成。在这种情况下,我们可以利用Go语言的sync
包中的WaitGroup
结构。
# WaitGroup
顾名思义,WaitGroup
是Go标准库中的一种机制,它允许我们等待一组协程,直到它们全部明确完成。
没有特定的工厂函数来创建WaitGroup
,因为它的零值状态就已经是可用的。创建WaitGroup
后,我们需要控制要等待多少个协程。可以使用Add()
方法来告知这个组。
那么如何告知这个组我们已经完成了其中一个协程呢?再直观不过了,我们可以使用Done()
方法来实现。
在下面的示例中,我们引入WaitGroup
,使程序按预期输出消息:
func main() {
wg := sync.WaitGroup{}
wg.Add(2)
go say("world", &wg)
go say("hello", &wg)
wg.Wait()
}
2
3
4
5
6
7
8
我们创建了WaitGroup
(wg := sync.WaitGroup{}
),并声明有两个协程参与这个组(wg.Add(2)
)。
在程序的最后一行,我们使用Wait()
方法显式暂停执行,以避免程序提前终止。
为了让函数与WaitGroup
交互,我们需要传递这个组的引用。一旦有了它的引用,函数可以使用defer
语句调用Done()
,以确保每次函数完成时都能正确地向组发出信号。
这是新的say
函数:
func say(s string, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < 5; i++ {
fmt.Println(s)
}
}
2
3
4
5
6
我们不再需要依赖time.Sleep()
,所以这个版本中没有它。
现在,我们可以控制协程组了。接下来处理并发编程中一个核心的令人担忧的问题——状态。
# 更改共享状态
想象这样一个场景:在一个繁忙的仓库里,两名勤劳的工人负责将物品装箱。每个工人往箱子里装固定数量的物品,我们必须记录总共装箱的物品数量。
这个看似简单的任务,类似于并发编程,如果处理不当,很快就会变成一场噩梦。如果没有适当的同步机制,工人们可能会无意中互相干扰工作,导致结果错误和不可预测的行为。这是数据竞争(data race)的经典例子,也是并发编程中常见的挑战。
下面的代码将通过一个类比,展示两名仓库工人在往箱子里装物品时遇到的数据竞争问题。我们首先展示没有适当同步的代码,演示数据竞争问题。然后,我们将修改代码来解决这个问题,确保工人们能够顺利且准确地协作。
让我们走进这个繁忙的仓库,亲身体验并发的挑战以及同步在这个例子中的重要性:
package main
import (
"fmt"
"sync"
)
func main() {
fmt.Println("Total Items Packed:", PackItems(0))
}
func PackItems(totalItems int) int {
const workers = 2
const itemsPerWorker = 1000
var wg sync.WaitGroup
itemsPacked := 0
for i := 0; i < workers; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
// Simulate the worker packing items into boxes.
for j := 0; j < itemsPerWorker; j++ {
itemsPacked = totalItems
// Simulate packing an item.
itemsPacked++
// Update the total items packed without proper
// synchronization.
totalItems = itemsPacked
}
}(i)
}
// Wait for all workers to finish.
wg.Wait()
return totalItems
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
main
函数首先调用PackItems
函数,并传入初始的totalItems
值为0。在PackItems
函数中,定义了两个常量:
workers
:表示工作协程的数量(设置为2)itemsPerWorker
:表示每个工人应装箱的物品数量(设置为1000)
创建一个名为wg
的WaitGroup
,用于在返回最终的totalItems
值之前,等待所有工作协程完成。
一个循环运行workers
次,每次迭代都会启动一个新的协程来模拟一名工人往箱子里装物品。在协程内部,执行以下步骤:
- 一个工人ID作为参数传递给协程。
defer wg.Done()
语句确保协程退出时,WaitGroup
的计数减1。- 用当前的
totalItems
值初始化itemsPacked
变量,用于跟踪该工人装箱的物品数量。 - 一个循环运行
itemsPerWorker
次,模拟装箱过程。不过实际上并没有真正装箱,只是递增itemsPacked
变量。 - 在内层循环的最后一步,
totalItems
获取itemsPacked
变量的修改后的值,该值包含了该工人装箱的物品数量。 - 这里就出现了同步问题。工人试图通过将
itemsPacked
的值加到totalItems
上来更新它。
由于多个协程在没有适当同步的情况下试图并发修改totalItems
,就会发生数据竞争,导致不可预测和错误的结果。
# 不确定的结果
考虑以下这个替代的main
函数:
func main() {
times := 0
for {
times++
counter := PackItems(0)
if counter != 2000 {
log.Fatalf("it should be 2000 but found %d on execution %d", counter, times)
}
}
}
2
3
4
5
6
7
8
9
10
程序会不断运行PackItems
函数,直到得到的结果不是预期的2000。一旦出现这种情况,程序将显示函数返回的错误值以及达到该结果所尝试的次数。
由于Go调度器的不确定性,大多数情况下结果是正确的。这段代码需要运行很多次才能暴露其同步缺陷。
在一次执行中,我进行了超过16000次迭代才出现错误:
it should be 2000 but found 1170 on execution 16421
轮到你了! 在你的机器上运行这段代码并进行实验。你的代码需要多少次迭代才会出错呢? |
---|
如果你在个人电脑上运行,可能同时有许多任务在执行,但你的机器可能还有很多未使用的资源。然而,如果你在云环境中使用容器运行程序,就需要考虑集群中共享节点上的“噪声”量。这里的“噪声”指的是在运行你的程序时,主机上正在进行的其他工作。它可能和你在本地实验时一样空闲,但在追求成本效益、充分利用每一个核心和内存的场景中,它很可能被充分利用。
这种对资源的持续竞争使得调度器更倾向于选择其他工作负载,而不是继续运行我们的协程。
在下面的示例中,我们调用runtime.Gosched
函数来模拟“噪声”。其思路是给Go调度器一个提示,告诉它:“嘿!也许现在是暂停我的好时机”:
for j := 0; j < itemsPerWorker; j++ {
itemsPacked = totalItems
runtime.Gosched() // emulating noise!
itemsPacked++
totalItems = itemsPacked
}
2
3
4
5
6
再次运行main
函数,我们可以看到错误结果出现的频率比之前快得多。例如,在我的执行中,只需要四次迭代:
it should be 2000 but found 1507 on execution 4
不幸的是,代码仍然存在缺陷。我们如何预料到这一点呢?此时,你应该已经猜到Go工具能给出答案,你又猜对了。我们可以在测试中管理数据竞争问题。
# 管理数据竞态
当多个goroutine同时访问共享数据或资源时,可能会出现 “竞态条件”。可以确定的是,这类并发错误会导致不可预测且不理想的行为。Go语言的测试工具内置了一个名为Go竞态检测(Go race detection)的功能,能够检测并识别Go代码中的竞态条件。
那么,让我们创建一个main_test.go
文件,并编写一个简单的测试用例:
package main
import (
"testing"
)
func TestPackItems(t *testing.T) {
totalItems := PackItems(2000)
expectedTotal := 2000
if totalItems != expectedTotal {
t.Errorf("Expected total: %d, Actual total: %d", expectedTotal, totalItems)
}
}
2
3
4
5
6
7
8
9
10
11
12
13
现在,让我们使用竞态检测器:
go test -race
控制台的输出结果可能如下:
==================
WARNING: DATA RACE
Read at 0x00c00000e288 by goroutine 9:
example1.PackItems.func1()
/tmp/main.go:35 +0xa8
example1.PackItems.func2()
/tmp/main.go:45 +0x47
Previous write at 0x00c00000e288 by goroutine 8:
example1.PackItems.func1()
/tmp/main.go:39 +0xba
example1.PackItems.func2()
/tmp/main.go:45 +0x47
// 省略其他行
2
3
4
5
6
7
8
9
10
11
12
13
14
15
乍一看,这个输出可能会让人望而生畏,但最关键的信息是 “WARNING: DATA RACE” 这条消息。
为了修复这段代码中的同步问题,我们应该使用同步机制来保护对totalItems
变量的访问。如果没有适当的同步措施,对共享数据的并发写入可能会导致竞态条件和意外结果。
我们已经使用了sync
包中的WaitGroup
。让我们探索更多的同步机制,以确保程序的正确性。
# 原子操作
令人遗憾的是,Go语言中的 “原子” 这个术语与物理或化学中对原子的实际操作并无关联。要是编程中能有那样的能力,那可就太奇妙了;实际上,Go语言中的原子操作主要是利用sync/atomic
包来实现goroutine之间的同步和并发管理。
Go语言为某些类型(如int32
、int64
、uint32
、uint64
、uintptr
、float32
和float64
)提供了加载、存储、加法和CAS(比较并交换)等原子操作。原子操作不能直接应用于任意数据结构。
让我们使用atomic
包来修改程序。首先,我们需要导入它:
import (
"fmt"
"sync"
"sync/atomic"
)
2
3
4
5
我们不再直接更新totalItems
,而是使用AddInt32
函数来确保同步:
for j := 0; j < itemsPerWorker; j++ {
atomic.AddInt32(&totalItems, int32(itemsPacked))
}
2
3
如果我们再次检查数据竞态,将不会报告任何问题。
当我们需要同步单个操作时,原子结构非常有用,但当我们想要同步一整块代码时,其他工具(如互斥锁)可能更合适。
# 互斥锁
啊,互斥锁!它们就像是goroutine聚会中的保安。想象一下,一群小小的Go “生物” 试图围绕共享数据 “跳舞”。一开始可能还很欢乐,但一旦混乱爆发,就会出现goroutine的 “交通堵塞”,数据到处 “洒落”!
别担心,互斥锁就像舞池管理员一样介入,确保在关键区域同一时间只有一个 “活跃” 的goroutine能够 “舞动”。它们就像是并发的节奏守护者,保证大家轮流进行,不会互相干扰。
你可以通过声明一个sync.Mutex
类型的变量来创建一个互斥锁。互斥锁允许我们使用Lock()
和Unlock()
方法来保护代码的关键部分。当一个goroutine调用Lock()
时,它会获取互斥锁,其他任何试图调用Lock()
的goroutine将被阻塞,直到使用Unlock()
释放锁。
下面是使用互斥锁的程序代码:
package main
import (
"fmt"
"sync"
)
func main() {
m := sync.Mutex{}
fmt.Println("Total Items Packed:", PackItems(&m, 0))
}
func PackItems(m *sync.Mutex, totalItems int) int {
const workers = 2
const itemsPerWorker = 1000
var wg sync.WaitGroup
for i := 0; i < workers; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for j := 0; j < itemsPerWorker; j++ {
m.Lock()
itemsPacked := totalItems
itemsPacked++
totalItems = itemsPacked
m.Unlock()
}
}(i)
}
// 等待所有工作goroutine完成
wg.Wait()
return totalItems
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
在这个例子中,我们锁定了处理共享状态变化的代码块,完成后解锁互斥锁。
如果互斥锁能确保共享状态处理的正确性,你可以考虑两种方式:
- 对每一行关键代码都使用加锁和解锁操作。
- 在函数开头加锁,并使用
defer
延迟解锁。
没错,你确实可以这么做!遗憾的是,这两种方法都有问题。我们会不加区分地引入延迟。为了说明这一点,让我们对第二种方法和最初使用互斥锁的方式进行基准测试。
让我们创建函数的第二个版本MultiplePackItems
,它多次调用加锁/解锁操作。除了函数名和内部循环,其他部分都保持不变。
下面是内部循环的代码:
for j := 0; j < itemsPerWorker; j++ {
m.Lock()
itemsPacked = totalItems
m.Unlock()
m.Lock()
itemsPacked++
m.Unlock()
m.Lock()
totalItems = itemsPacked
m.Unlock()
}
2
3
4
5
6
7
8
9
10
11
让我们看看在基准测试中这两种方式的性能表现:
Benchmark-8 36546 32629 ns/op
BenchmarkMultipleLocks-8 13243 91246 ns/op
2
从每次操作的耗时来看,多次加锁的版本比第一个版本慢了约64%。
基准测试 我们将在第6章 “性能分析” 中详细介绍基准测试和其他性能测量技术。 |
---|
这些示例展示了goroutine独立执行任务,彼此之间没有协作。然而,在许多情况下,我们的任务需要交换信息或信号来做出决策,例如启动或停止一个过程。
当信息交换至关重要时,我们可以使用Go语言中的一个重要工具 —— 通道(channel)。
# 理解通道
欢迎来到 “通道狂欢节”!
想象一下,Go语言的通道就像是神奇的、小丑用的大管道,马戏团演员(goroutine)可以通过它传递杂耍球(数据),并且确保不会有人把球弄掉 —— 真的是字面意义上的 “不掉球”!
# 如何使用通道
要使用通道,我们需要使用内置函数make()
,并指定我们希望通过这个通道传递的数据类型:
make(Chan T)
如果我们想要一个字符串类型的通道,应该这样声明:
make(chan string)
我们还可以指定容量。有容量的通道称为带缓冲通道(buffered channel)。目前我们先不深入讨论容量的细节。当我们不指定容量时,创建的就是无缓冲通道(unbuffered channel)。
# 无缓冲通道
无缓冲通道是多个goroutine之间进行通信的一种方式,它需要遵循一个简单的规则 —— 想要向通道发送数据的goroutine和想要从通道接收数据的goroutine必须同时准备好。
可以把这想象成一个 “信任背摔” 练习。发送者和接收者必须完全信任对方,确保数据的安全,就像杂技演员信任他们的搭档能在半空中接住自己一样。
是不是有点抽象?让我们通过示例来探索这个概念。
首先,让我们向一个没有接收者的通道发送信息:
package main
func main() {
c := make(chan string)
c <- "message"
}
2
3
4
5
6
当我们执行这段代码时,控制台会输出类似以下内容:
fatal error: all goroutines are sleep – dead lock!
goroutine 1 [chan send]:
main.main()
2
3
让我们分析一下这个输出。 “all goroutines are sleep – deadlock!” 是主要的错误信息。它告诉我们程序中的所有goroutine都处于睡眠状态,这意味着它们在等待某个事件或资源可用。然而,由于它们都在等待,无法继续执行,所以程序陷入了死锁状态。 “goroutine 1 [chan send]” 是消息的一部分,它提供了遇到死锁的特定goroutine的额外信息。在这种情况下,是goroutine 1,并且它涉及到通道发送操作(chan send)。
之所以会出现这个死锁,是因为执行被暂停,等待另一个goroutine来接收信息,但实际上并没有这样的goroutine。
死锁 死锁是指两个或多个进程或goroutine由于都在等待永远不会发生的事情而无法继续执行的情况。 |
---|
现在,我们尝试相反的操作;在下一个示例中,我们尝试从一个没有发送者的通道接收数据:
package main
func main() {
c := make(chan string)
fmt.Println(<-c)
}
2
3
4
5
6
控制台的输出非常相似,只是现在错误信息与接收有关:
fatal error: all goroutines are sleep – dead lock!
goroutine 1 [chan receive]:
main.main()
2
3
现在,遵循规则就像同时进行发送和接收一样简单。所以,同时声明发送和接收就足够了吧?
package main
func main() {
c := make(chan string)
c <- "message" // 发送
fmt.Println(<-c) // 接收
}
2
3
4
5
6
7
这听起来是个好主意,但遗憾的是,它行不通,从下面的输出中我们可以看到:
fatal error: all goroutines are sleep – dead lock!
goroutine 1 [chan send]:
main.main()
2
3
如果我们遵循了规则,为什么还不行呢?
实际上,我们并没有完全遵循规则。规则是想要向通道发送数据的goroutine和想要从通道接收数据的goroutine必须同时准备好。需要注意的关键是最后一部分 —— 同时准备好。
由于代码是逐行顺序执行的,当我们尝试执行c <- "message"
时,程序会等待接收者接收消息。我们需要让发送和接收双方同时进行消息的传递。我们可以利用并发编程知识来实现这一点。
让我们用马戏团的比喻,在代码中加入goroutine。我们引入一个函数throwBalls
,它需要接收要抛出的球的颜色(color
)和用于接收这些球的通道(balls
):
package main
import "fmt"
func main() {
balls := make(chan string)
go throwBalls("red", balls)
fmt.Println(<-balls, "received!")
}
func throwBalls(color string, balls chan string) {
fmt.Printf("throwing the %s ball\n", color)
balls <- color
}
2
3
4
5
6
7
8
9
10
11
12
13
14
这里,我们有三个主要步骤:
- 创建一个名为
balls
的无缓冲字符串通道。 - 内联启动一个goroutine,使用
throwBalls
函数向通道发送 “red”。 - 主函数接收并打印从通道接收到的值。这个示例的输出如下:
throwing the red ball
red received!
2
我们成功了!我们使用通道在goroutine之间成功传递了信息!但是当我们再发送一个球时会发生什么呢?让我们用一个绿色的球试试:
func main() {
balls := make(chan string)
go throwBalls("red", balls)
go throwBalls("green", balls)
fmt.Println(<-balls, "received!")
}
2
3
4
5
6
输出显示只接收到了一个球。这是怎么回事呢?
throwing the red ball
red received!
2
红色还是绿色? 由于我们启动了多个goroutine,调度器会随机选择先执行哪个。因此,你可能会随机看到红色或绿色的代码先运行。 |
---|
我们可以通过再添加一个从通道接收数据的打印语句来解决这个问题:
func main() {
balls := make(chan string)
go throwBalls("red", balls)
go throwBalls("green", balls)
fmt.Println(<-balls, "received!")
fmt.Println(<-balls, "received!")
}
2
3
4
5
6
7
虽然这样可行,但这并不是最优雅的解决方案。如果接收者比发送者多,我们可能又会遇到死锁问题:
func main() {
balls := make(chan string)
go throwBalls("red", balls)
go throwBalls("green", balls)
fmt.Println(<-balls, "received!")
fmt.Println(<-balls, "received!")
fmt.Println(<-balls, "received!")
}
2
3
4
5
6
7
8
最后一个打印语句会永远等待,导致另一个死锁。
如果我们希望代码能处理任意数量的球,就不应该再不断添加代码行,而应该用range
关键字来替代。
# 遍历通道
遍历通过通道发送的值所使用的机制是range
关键字。我们修改代码来遍历通道中的值:
func main() {
balls := make(chan string)
go throwBalls("red", balls)
go throwBalls("green", balls)
for color := range balls {
fmt.Println(color, "received!")
}
}
2
3
4
5
6
7
8
我们满心欢喜地查看控制台,想优雅地看到收到的球,但等等——所有的 goroutine 都处于休眠状态!又出现死锁了?
当我们遍历通道时,如果range
期望通道关闭以停止迭代,就会出现这个错误。
# 关闭通道
要关闭通道,我们需要调用内置的close
函数,并传入通道:
close(balls)
好了,现在我们可以确保通道已关闭。我们在发送方和range
之间添加关闭通道的调用,修改代码如下:
go throwBalls("green", balls)
close(balls)
for color := range balls {
2
3
你可能已经注意到,如果通道关闭时range
停止,按照这段代码,一旦通道关闭,range
将永远不会运行。
我们需要协调这组任务,没错,你想得对——我们将再次使用WaitGroup
来解决问题。这一次,我们不想在throwBalls
函数的签名中传入WaitGroup
,所以我们将创建内联匿名函数,让我们的函数无需知晓并发相关的内容。此外,我们希望在确保所有任务完成后关闭通道。我们可以通过WaitGroup
的Wait()
方法来推断任务是否完成。
下面是我们的main
函数:
func main() {
balls := make(chan string)
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
throwBalls("red", balls)
}()
go func() {
defer wg.Done()
throwBalls("green", balls)
}()
go func() {
wg.Wait()
close(balls)
}()
for color := range balls {
fmt.Println(color, "received!")
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
呼!这次,输出结果正确显示:
throwing the green ball
green received!
throwing the red ball
red received!
2
3
4
这一路不容易,对吧?但别急!我们还需要探索带缓冲的通道(buffered channels)!
# 带缓冲的通道
是时候打个比方了!
带缓冲的通道中,小丑登场啦!想象一辆小丑车,它的座位数量有限(容量有限)。小丑(发送方)可以上下车,并把杂耍球(数据)放进车里。
我们想编写一个使用带缓冲通道的程序,模拟一次马戏团小丑乘车的场景。在这个场景中,小丑们试图带着气球上小丑车(一次最多坐三个小丑)。司机驾驶汽车并安排小丑乘车,而小丑们则努力上车。如果车满了,小丑们就等待并打印一条消息。在所有小丑都完成乘车后,程序等待司机结束工作,然后打印“马戏团小丑乘车结束”。
要是有小丑试图往车里塞太多杂耍球,那就像车里挤满了小丑和杂耍球一样滑稽,简直就是一场搞笑的闹剧!
首先,我们创建程序结构来处理发送方和接收方:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
clownChannel := make(chan int, 3)
clowns := 5
// senders and receivers logic here!
var wg sync.WaitGroup
wg.Wait()
fmt.Println("Circus car ride is over!")
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
下面是司机的 goroutine(接收方):
go func() {
defer close(clownChannel)
for clownID := range clownChannel {
balloon := fmt.Sprintf("Balloon %d", clownID)
fmt.Printf("Driver: Drove the car with %s inside\n",
balloon)
time.Sleep(time.Millisecond * 500)
fmt.Printf("Driver: Clown finished with %s, the car is ready for more!\n", balloon)
}
}()
2
3
4
5
6
7
8
9
10
我们在司机逻辑代码块下方添加小丑的逻辑(发送方):
for clown := 1; clown <= clowns; clown++ {
wg.Add(1)
go func(clownID int) {
defer wg.Done()
balloon := fmt.Sprintf("Balloon %d", clownID)
fmt.Printf("Clown %d: Hopped into the car with %s\n", clownID,
balloon)
select {
case clownChannel <- clownID:
fmt.Printf("Clown %d: Finished with %s\n", clownID,
balloon)
default:
fmt.Printf("Clown %d: Oops, the car is full, can't fit %s!\n", clownID, balloon)
}
}(clown)
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
运行这段代码,我们可以看到小丑们制造的各种状况:
Clown 1: Hopped into the car with Balloon 1
Clown 1: Finished with Balloon 1
Driver: Drove the car with Balloon 1 inside
Clown 2: Hopped into the car with Balloon 2
Clown 2: Finished with Balloon 2
Clown 5: Hopped into the car with Balloon 5
Clown 5: Finished with Balloon 5
Clown 3: Hopped into the car with Balloon 3
Clown 3: Finished with Balloon 3
Clown 4: Hopped into the car with Balloon 4
Clown 4: Oops, the car is full, can't fit Balloon 4!
Circus car ride is over!
2
3
4
5
6
7
8
9
10
11
12
select
语句 select
语句允许我们在多个通信通道上等待,并选择第一个准备好的通道,这实际上使我们能够对通道执行非阻塞操作。
在使用通道时,人们很容易陷入比较消息队列和通道的讨论,但或许有更好的理解方式。通道的内部实现是环形缓冲区(ring buffers),在选择程序设计方案时,这个信息可能会让人困惑且没有帮助。优先理解信号传递和消息的可靠投递,能让你在使用通道时更得心应手。
# 投递保证
带缓冲通道和无缓冲通道的主要区别在于投递保证。
正如我们前面看到的,无缓冲通道始终能保证消息投递,因为它们只有在接收方准备好时才发送消息。相反,带缓冲通道无法确保消息投递,因为在强制进行同步之前,它们可以“缓冲”任意数量的消息。因此,读取方可能无法从通道缓冲区中读取消息。
在两者之间进行选择所带来的最大副作用,是你的程序能够承受多大的延迟。
# 延迟
在并发编程的语境中,延迟(Latency)指的是一段数据从发送方(goroutine)通过通道传输到接收方(goroutine)所花费的时间。
在Go语言的通道中,延迟受几个因素影响:
- 缓冲(Buffering):当发送方和接收方不同步时,缓冲可以减少延迟。
- 阻塞(Blocking):无缓冲通道会阻塞发送方和接收方,直到它们准备好进行通信,这可能会导致更高的延迟。带缓冲通道允许发送方在无需立即同步的情况下继续执行,有可能降低延迟。
- goroutine调度(Goroutine scheduling):通道通信中的延迟还取决于Go运行时(runtime)对goroutine的调度方式。可用CPU核心的数量以及调度算法等因素,都会影响goroutine的执行速度。
# 选择通道类型
一般来说,在以下场景中,无缓冲通道是不错的选择:
- 保证投递:确保发送的值被另一个goroutine接收。在需要保证数据完整性、确保没有数据丢失的场景中,这一点特别有用。
- 一对一通信:无缓冲通道最适合在goroutine之间进行一对一通信。
- 负载均衡:无缓冲通道可用于实现负载均衡模式,确保工作在工作goroutine之间均匀分配。
相反,带缓冲通道有以下优势:
- 异步通信:带缓冲通道允许goroutine之间进行异步通信。在带缓冲通道上发送数据时,如果通道缓冲区有空间,发送方不会阻塞,直到数据被接收。在某些场景中,这可以提高吞吐量。
- 减少竞争:在有多个发送方和接收方的场景中,使用带缓冲通道可以减少竞争。例如,在生产者 - 消费者模式中,可以使用带缓冲通道,让生产者在消费者来不及处理时继续生产。
- 防止死锁:带缓冲通道可以通过一定程度的缓冲来帮助防止goroutine死锁,在工作负载存在不可预测的变化时,这很有用。
- 批处理:带缓冲通道可用于批处理或流水线操作,在这些操作中,数据以一种速率生成,以另一种速率消费。
既然我们已经了解了延迟的关键方面以及它如何影响并发编程中的通道通信,现在让我们把注意力转移到另一个关键方面——状态和信号传递。理解状态和信号传递的语义对于避免常见陷阱、做出明智的设计决策至关重要。
# 状态和信号传递
探索状态和信号传递的语义能让你在避免简单错误、做出良好设计选择方面更具优势。
# 状态
尽管Go语言通过通道简化了并发编程的使用,但仍存在一些特性和陷阱。
我们应该记住,通道有三种状态:零值(nil)、打开(空、非空)和关闭。无论从发送方还是接收方的角度来看,这些状态都与我们对通道的操作密切相关,决定了哪些操作可行,哪些不可行。
考虑读取通道时的情况:
- 向只写通道读取数据会导致编译错误。
- 如果通道为零值,从该通道读取数据会无限期阻塞你的goroutine,直到它被初始化。
- 在打开且为空的通道中读取数据会被阻塞,直到有数据可用。
- 在打开且非空的通道中读取数据会返回数据。
- 如果通道已关闭,读取它会返回其类型的默认值,并返回
false
以表示通道已关闭。
写入通道也有其细微差别:
- 向只读通道写入数据会导致编译错误。
- 向零值通道写入数据会阻塞,直到它被初始化。
- 向打开且已满的通道写入数据会阻塞,直到有空间。
- 在打开且未满的通道中写入数据会成功。
- 向已关闭的通道写入数据会导致运行时恐慌(panic)。
关闭通道取决于其状态:
- 关闭一个有数据的打开通道,在数据被读取完之前,读取操作仍然有效,数据读完后,读取操作返回默认值。
- 关闭一个打开的空通道会立即关闭它,读取操作也返回默认值。
- 尝试关闭一个已经关闭的通道会导致运行时恐慌。
- 关闭一个只读通道会导致编译错误。
# 信号传递
goroutine之间的信号传递是通道的常见应用场景。你可以使用通道在不同的goroutine之间发送信号或消息,以此来协调和同步它们的执行。下面是一个简单的示例,展示如何使用Go语言的通道在两个goroutine之间传递信号:
package main
import (
"fmt"
"sync"
)
func main() {
signalChannel := make(chan bool)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println("Goroutine 1 is waiting for a signal...")
<-signalChannel
fmt.Println("Goroutine 1 received the signal and is now doing something.")
}()
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println("Goroutine 2 is about to send a signal.")
signalChannel <- true
fmt.Println("Goroutine 2 sent the signal.")
}()
wg.Wait()
fmt.Println("Both goroutines have finished.")
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
在这段代码中,我们创建了一个名为signalChannel
的通道,用于在两个goroutine之间传递信号。goroutine 1使用<-signalChannel
等待通道上的信号,goroutine 2使用signalChannel <- true
发送信号。
sync.WaitGroup
确保我们在打印“Both goroutines have finished.”之前,等待两个goroutine都完成。
当你运行这个程序时,你会看到goroutine 1等待来自goroutine 2的信号,然后再继续执行任务。
Go语言的通道是一种灵活的方式,可用于同步和协调goroutine之间的复杂交互。它们可用于实现并发模式,如生产者 - 消费者模式或扇入/扇出模式。
# 选择同步机制
通道是万能的吗?当然不是!我们可以使用互斥锁(mutexes)或通道来解决同一个问题。那该如何选择呢?实用主义优先。当互斥锁能让你的解决方案易于阅读和维护时,别犹豫,就选互斥锁!
如果你在两者之间难以抉择,这里有一个主观性的指导原则。在需要执行以下操作时,使用通道:
- 传递数据的所有权。
- 分配工作单元。
- 以异步方式传递结果。
在处理以下情况时,使用互斥锁:
- 缓存。
- 共享状态。
好了,让我们结束本章内容,回顾一下我们在本章中所学的知识。
# 总结
在本章中,我们学习了goroutine的工作原理、它们的简洁性,以及使用WaitGroup
进行同步的重要性。我们还了解了管理共享状态的困难,通过仓库的类比来解释数据竞争(data races)。此外,我们还学习了Go语言的竞态检测工具,用于识别竞态条件(race conditions),以及通信通道的重要性和潜在的陷阱。
既然我们已经复习了并发编程的知识,在下一章中,让我们探索如何使用系统调用(system calls)与操作系统进行交互。