9. 原子内存操作
# 9. 原子内存操作
原子内存操作(Atomic memory operations)为实现其他同步原语提供了必要的底层基础。一般来说,你可以用互斥锁(mutexes)和通道(channels)替换并发算法中的所有原子操作。尽管如此,原子操作仍是有趣且有时令人困惑的概念,你应该了解它们的工作原理。如果你谨慎使用,它们可以成为优化代码的有效工具,且不会增加代码的复杂性。
在本章中,我们将探讨以下主题:
- 原子内存操作的内存保证
- 比较并交换操作
- 原子操作的实际应用,包括计数器、心跳/进度指示器、取消操作以及检测变化
# 技术要求
无。
# 内存保证
为什么我们需要专门的函数来进行原子内存操作呢?如果我们对一个大小小于或等于机器字长(int类型就是按照机器字长定义的)的变量进行写入操作,比如a = 1
,这难道不是原子操作吗?实际上,Go语言的内存模型保证了这种写入操作是原子的;然而,它并不能保证其他goroutine何时能看到该写入操作的效果,甚至不能保证它们一定能看到。让我们试着剖析一下这句话的含义。第一部分的意思很简单,如果你在一个goroutine中对一个与机器字长相同大小(即int类型)的共享内存位置进行写入操作,并在另一个goroutine中读取它,即使存在竞态条件(race condition),你也不会读取到随机值。内存模型保证你只会读取到写入操作之前的值,或者写入操作之后的值(并非所有语言都有这样的保证)。这也意味着,如果写入操作涉及的数据大小大于机器字长,那么一个goroutine在读取这个值时,可能会看到底层对象处于不一致的状态。例如,一个字符串值包含两个部分,一个是指向底层数组的指针,另一个是字符串长度。对这些单个字段的写入操作是原子的,但在存在竞态的情况下读取,可能会读到一个数组指针为nil
但长度不为零的字符串。这句话的第二部分指出,编译器可能会对代码进行优化或重新排序,或者硬件可能会以一种无序的方式执行内存操作,导致另一个goroutine无法在预期的时间看到写入操作的效果。下面这个内存竞态的标准示例可以说明这一点:
func main() {
var str string
var done bool
go func() {
str = "Done!"
done = true
}()
for!done {
}
fmt.Println(str)
}
2
3
4
5
6
7
8
9
10
11
这里存在一个内存竞态,因为str
和done
变量在一个goroutine中被写入,而在另一个goroutine中被读取,且没有进行显式的同步。这个程序可能会有以下几种行为:
- 它可能会打印
Done!
。 - 它可能会打印一个空字符串。这意味着主goroutine看到了对
done
的内存写入,但没有看到对str
的内存写入。 - 程序可能会挂起。这意味着主goroutine没有看到对
done
的内存写入。
这就是原子操作发挥作用的地方。下面这个程序是无竞态的:
func main() {
var str done atomic.Value
var done atomic.Bool
str.Store("")
go func() {
str.Store("Done!")
done.Store(true)
}()
for!done.Load() {
}
fmt.Println(str.Load())
}
2
3
4
5
6
7
8
9
10
11
12
原子操作的内存保证如下:如果一个原子读操作观察到了一个原子写操作的效果,那么该原子写操作发生在这个原子读操作之前。这也保证了下面这个程序要么打印1,要么什么都不打印(它永远不会打印0):
func main() {
var done atomic.Bool
var a int
go func() {
a = 1
done.Store(true)
}()
if done.Load() {
fmt.Println(a)
}
}
2
3
4
5
6
7
8
9
10
11
注意,这里仍然存在竞态条件,但不是内存竞态。根据语句的执行顺序,主goroutine可能会也可能不会将done
视为true
。然而,如果主goroutine将done
视为true
,那么就可以保证a = 1
。
这就是使用原子操作会变得复杂的原因之一——内存顺序保证是有条件的。它们永远不会阻塞正在运行的goroutine,所以你测试一个原子读操作返回的变量值为某个特定值,并不意味着在if
语句体执行时,该变量仍然保持这个值。这就是为什么在使用原子操作时需要格外小心。就像前面的程序一样,使用原子操作很容易陷入竞态条件。请记住——你总是可以不使用原子操作来编写相同的程序。
# 比较并交换
每当你测试一个条件并根据结果采取行动时,都可能会产生竞态条件。例如,下面这个函数尽管使用了原子操作,但并不能实现互斥:
var locked sync.Bool
func wrongCriticalSectionExample() {
if!locked.Load() {
// 此时另一个goroutine可能会锁定它!
locked.Store(true)
defer locked.Store(false)
// 这个goroutine进入临界区,但另一个goroutine也可以
}
}
2
3
4
5
6
7
8
9
10
这个函数首先测试原子变量locked
的值是否为false
。两个goroutine可以同时执行这个测试语句,并且由于都看到locked
为false
,它们都可以进入临界区并将locked
设置为true
。这里需要的是一个包含比较和存储操作的原子操作。这就是比较并交换(Compare-And-Swap,CAS)操作,它正如其名——比较一个变量的值是否为预期值,如果是,则将该值原子地替换为给定的值。如果变量的值与预期值不同,则不进行任何更改——也就是说,一个CAS操作原子地执行以下操作:
if *variable == testValue {
*variable = newValue
return true
}
return false
2
3
4
5
现在,你实际上可以实现一个非阻塞的互斥锁:
func criticalSection() {
if locked.CompareAndSwap(false, true) {
defer locked.Store(false)
// 临界区
}
}
2
3
4
5
6
只有当locked
为false
时,这个函数才会进入临界区。如果是这种情况,它会原子地将locked
设置为true
并进入临界区。否则,它将跳过临界区并继续执行。因此,这个函数实际上可以替代Mutex.TryLock
。
# 原子操作的实际应用
以下是一些使用原子操作的示例。这些是在不同场景中无竞态地使用原子操作的简单示例。
# 计数器
原子操作可以用作高效的、线程安全的计数器。下面这个程序创建了许多goroutine,每个goroutine都会将共享计数器加1。另一个goroutine会不断循环,直到计数器达到10000。由于这里使用了原子操作,这个程序是无竞态的,并且它最终会打印出10000并终止:
var count int64
func main() {
for i := 0; i < 10000; i++ {
go func() {
atomic.AddInt64(&count, 1)
}()
}
for {
v := atomic.LoadInt64(&count)
fmt.Println(v)
if v == 10000 {
break
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 心跳检测与进度指示器
有时,一个协程可能会失去响应,或者无法按预期速度推进工作。心跳检测工具和进度指示器可用于监控这类协程。实现方式有多种,例如,被监控的协程可以使用非阻塞发送来宣告进度,或者通过增加一个由互斥锁保护的共享变量来宣告其进度。原子操作(Atomics)使我们能够在不使用互斥锁的情况下实现共享变量方案。这还有一个好处,即多个协程无需额外同步就能观察到这个共享变量。
那么,让我们定义一个简单的ProgressMeter
类型,其中包含一个原子值:
type ProgressMeter struct {
progress int64
}
2
3
被监控的协程使用以下方法来表明其进度。这个方法只是以原子操作的方式将进度值增加1:
func (pm *ProgressMeter) Progress() {
atomic.AddInt64(&pm.progress, 1)
}
2
3
Get
方法返回当前的进度值。请注意,这里的读取操作是原子的;如果不是,就有可能错过对该变量的原子增加操作:
func (pm *ProgressMeter) Get() int64 {
return atomic.LoadInt64(&pm.progress)
}
2
3
这个实现中的一个重要细节是,Progress()
和Get()
方法都必须是原子操作。假设你还想记录最后一次记录进度的时间戳。你可以添加一个时间戳变量,并使用另一个原子读/写操作:
type WrongProgressMeter struct {
progress int64
timestamp int64
}
func (pm *WrongProgressMeter) Progress() {
atomic.AddInt64(&pm.progress, 1)
atomic.StoreInt64(&pm.timestamp, time.Now().UnixNano())
}
func (pm *WrongProgressMeter) Get() (n int64, ts int64) {
n = atomic.LoadInt64(&pm.progress)
ts = atomic.LoadInt64(&pm.timestamp)
return
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
这个实现可能会读取到更新后的进度值,但时间戳却是旧的。原子操作保证了写操作按写入顺序被观察到,但并不能保证ProgressMeter
更新的原子性。正确的实现应该使用互斥锁(Mutex)来确保原子更新。
现在,让我们编写一个长时间运行的协程,它使用这个进度指示器来宣告其进度。下面这个协程只是休眠120毫秒,然后记录其进度:
func longGoroutine(ctx context.Context, pm *ProgressMeter) {
for {
select {
case <-ctx.Done():
fmt.Println("Canceled")
return
default:
}
time.Sleep(time.Duration(rand.Intn(120)) * time.Millisecond)
pm.Progress()
}
}
2
3
4
5
6
7
8
9
10
11
12
观察协程期望被观察的协程至少每100毫秒记录一次进度。如果没有达到这个频率,观察协程将取消上下文以终止被观察的协程,同时它自己也会终止。在这种设置下,被观察的协程最终会出现两次进度宣告之间的时间超过100毫秒的情况,因此程序应该会终止:
func observer(ctx context.Context, cancel func(), progress *ProgressMeter) {
tick := time.NewTicker(100 * time.Millisecond)
defer tick.Stop()
var lastProgress int64
for {
select {
case <-ctx.Done():
return
case <-tick.C:
p := progress.Get()
if p == lastProgress {
fmt.Println("No progress since last time, canceling")
cancel()
return
}
fmt.Printf("Progress: %d\n", p)
lastProgress = p
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
我们通过创建长时间运行的协程及其观察协程,并使用上下文和进度指示器来将它们关联起来:
func main() {
var progress ProgressMeter
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
longGoroutine(ctx, &progress)
}()
go observer(ctx, cancel, &progress)
wg.Wait()
}
2
3
4
5
6
7
8
9
10
11
12
13
注意,我们将取消函数传递给观察协程,以便它可以向被观察的协程发送取消消息。接下来我们将看看另一种实现方式。
# 取消操作
我们已经了解了使用关闭通道来发出取消信号的方法。上下文(Context)的实现就采用了这种范式来发出取消和超时信号。使用原子操作也可以实现一个简单的取消机制:
func CancelSupport() (cancel func(), isCancelled func() bool) {
v := atomic.Bool{}
cancel = func() {
v.Store(true)
}
isCancelled = func() bool {
return v.Load()
}
return
}
2
3
4
5
6
7
8
9
10
CancelSupport
函数返回两个闭包,调用cancel()
函数可以发出取消信号,isCancelled()
函数可用于检查是否已注册了取消请求。这两个闭包共享一个原子布尔值。可以像这样使用:
func main() {
cancel, isCanceled := CancelSupport()
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
time.Sleep(100 * time.Millisecond)
if isCanceled() {
fmt.Println("Cancelled")
return
}
}
}()
time.AfterFunc(5*time.Second, cancel)
wg.Wait()
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 检测变化
假设你有一个共享变量,多个协程都可以对其进行更新。你读取这个变量,进行一些计算,现在想要更新它。然而,在你获取变量副本后,另一个协程可能已经修改了这个变量。因此,你希望只有在没有其他协程更改该变量的情况下才进行更新。下面的代码片段使用比较并交换(Compare And Swap,CAS)操作来说明这一点:
var sharedValue atomic.Pointer[SomeStruct]
func updateSharedValue() {
myCopy := sharedValue.Load()
newCopy := computeNewCopy(*myCopy)
if sharedValue.CompareAndSwap(myCopy, &newCopy) {
fmt.Println("Set value successful")
} else {
fmt.Println("Another goroutine modified the value")
}
}
2
3
4
5
6
7
8
9
10
11
这段代码容易出现竞态条件,所以你必须小心。sharedValue.Load()
调用会原子地返回指向共享值的指针。如果另一个协程修改了*sharedValue
指向的对象内容,就会出现竞态条件。只有当所有协程都以原子方式获取指针并复制底层数据结构时,这个方法才有效。然后,我们使用CAS操作写入修改后的副本,如果另一个协程动作更快,这个操作可能会失败。
# 总结
总之,实现正确的并发算法并不一定需要原子操作。然而,如果你发现了并发瓶颈,原子操作会很有用。你可以用原子操作替换一些简单的由互斥锁保护的更新操作(比如计数器),前提是读取这些变量时也使用原子读操作。你可以使用CAS操作来检测并发修改,但也要注意,很少有并发算法需要这么做。
在下一章中,我们将探讨如何诊断并发程序中的问题并进行故障排除。