sync 包提供了基础的异步操作方法,包括互斥锁 Mutex,执行一次 Once 和并发等待组 WaitGroup 等.
常用方法
sync
包有以下常用的几个方法
-
Mutex: 互斥锁
-
RWMutex:读写锁
-
WaitGroup:并发等待组
-
Once:执行一次
-
Cond:信号量
-
Pool:临时对象池
-
Map:自带锁的map
Mutex 和 RWMutex
Mutex
是最简单的一种锁类型,同时也比较暴力,当一个 goroutine
获得了 Mutex
后,其他 goroutine
就只能乖乖等到这个 goroutine
释放该 Mutex
。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var (
lock sync.Mutex
m = make(map[int]int)
)
for i := 0; i < 10; i++ {
j := i
go func() {
lock.Lock()
m[j] = j
lock.Unlock()
}()
}
time.Sleep(time.Second)
for k, v := range m {
fmt.Println(k, v)
}
}
|
RWMutex
相对友好些,是经典的单写多读模型。在读锁占用的情况下,会阻止写,但不阻止读,也就是多个 goroutine
可同时获取读锁(调用 RLock()
方法;而写锁(调用 Lock()
方法)会阻止任何其他 goroutine
(无论读和写)进来,整个锁相当于由该 goroutine
独占。从 RWMutex
的实现看,RWMutex
类型其实组合了 Mutex
:
1
2
3
4
5
6
7
8
|
type RWMutex struct {
w Mutex
writerSem uint32
readerSem uint32
readerCount int32
readerWait int32
}
|
对于这两种锁类型,任何一个 Lock()
或 RLock()
均需要保证对应有 Unlock()
或 RUnlock()
调用与之对应,否则可能导致等待该锁的所有 goroutine
处于饥饿状态,甚至可能导致死锁。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
package main
import (
"sync"
"time"
)
var m *sync.RWMutex
var val = 0
func main() {
m = new(sync.RWMutex)
go read(1)
go write(2)
go read(3)
time.Sleep(5 * time.Second)
}
func read(i int) {
m.RLock()
time.Sleep(1 * time.Second)
println("val: ", val)
time.Sleep(1 * time.Second)
m.RUnlock()
}
func write(i int) {
m.Lock()
val = 10
time.Sleep(1 * time.Second)
m.Unlock()
}
|
但是如果我们把 read
中的 RLock
和 RUnlock
两个函数给注释了,就返回了 :
这个就是由于读的时候没有加读锁,在准备读取 val 的时候,val 被 write 函数进行修改了。
Once
sync.Once
指的是只执行一次的对象实现,常用来控制某些函数只能被调用一次。sync.Once
经常被用在单例模式、系统初始化这种场景。
sync.Once
的结构如下所示,只有一个函数。使用变量 done
来记录函数的执行状态,使用 sync.Mutex
和 sync.atomic
来保证线程安全的读取 done
。
1
2
3
4
5
6
7
|
type Once struct {
m Mutex #互斥锁
done uint32 #执行状态
}
func (o *Once) Do(f func())
|
使用 sync
包的 once.Do
来做个demo
,实现单例模式,我们前文设计模式里面提到,点击这里查阅:单例模式。
WaitGroup 和 Cond
一个 goroutine
需要等待一批 goroutine
执行完毕以后才继续执行,那么这种多线程等待的问题就可以使用 WaitGroup
了。
具体示例如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
package main
import (
"fmt"
"sync"
)
func main() {
wg := new(sync.WaitGroup)
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
fmt.Println("done ", i)
defer wg.Done()
}(i)
}
wg.Wait()
fmt.Println("wait end")
}
|
sync.Cond
是用来控制某个条件下,goroutine
进入等待时期,等待信号到来,然后重新启动 goroutine
。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
package main
import (
"fmt"
"sync"
"time"
)
func main() {
lock := new(sync.Mutex)
cond := sync.NewCond(lock)
done := false
cond.L.Lock()
go func() {
fmt.Println("now done is ", done);
time.Sleep(2 * time.Second)
done = true
cond.Signal()
}()
if (!done) {
cond.Wait()
}
fmt.Println("now done is ", done);
}
|
当主进程进入 cond.Wait
的时候,就会进入阻塞状态进行等待,当 goroutine
发出 cond.Signal
信号之后,主进程才会继续往下面走。
技巧
sync.Cond
还有一个 BroadCast
方法,用来通知唤醒所有等待的 gouroutine
。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
package main
import (
"fmt"
"sync"
"time"
)
var lock = new(sync.Mutex)
var cond = sync.NewCond(lock)
func test(x int) {
// 获取锁
cond.L.Lock()
// 等待通知 暂时阻塞
cond.Wait()
fmt.Println(x)
time.Sleep(time.Second * 1)
// 释放锁,不释放的话将只会有一次输出
cond.L.Unlock()
}
func main() {
for i := 0; i < 40; i++ {
go test(i)
}
fmt.Println("start all")
// 下发广播给所有等待的 goroutine
cond.Broadcast()
time.Sleep(time.Second * 60)
}
|
处于 cond.Wait
状态的所有 gouroutine
收到信号后将全部被唤醒并往下执行。需要注意的是, gouroutine
执行完任务后,需要通过 cond.L.Unlock
释放锁, 否则其它被唤醒的 gouroutine
将没法继续执行。
cond.Wait 的源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
func (c *Cond) Wait() {
c.checker.check()
if raceenabled {
raceDisable()
}
atomic.AddUint32(&c.waiters, 1)
if raceenabled {
raceEnable()
}
c.L.Unlock()
runtime_Syncsemacquire(&c.sema)
c.L.Lock()
}
|
原因是 cond.Wait
会自动释放锁等待信号的到来,当信号到来后,第一个获取到信号的 Wait
将继续往下执行并从新上锁,如果不释放锁, 其它收到信号的 gouroutine
将阻塞无法继续执行。
sync/atomic - 原子操作
原子操作是比其它同步技术更基础的操作。原子操作是无锁的,常常直接通过 CPU 指令直接实现。 事实上,其它同步技术的实现常常依赖于原子操作。
在单例模式这篇文章里面,我们就有过相关示例,这里便不再做更多赘述。
CAS
原子操作中最经典的 CAS(compare-and-swap)
在 atomic
包中是 Compare
开头的函数。
1
2
3
4
5
6
7
8
9
10
11
12
|
- func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
- func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
- func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
- func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
- func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
- func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
|
CAS
的意思是判断内存中的某个值是否等于 old
值,如果是的话,则赋 new
值给这块内存。CAS
是一个方法,并不局限在 CPU
原子操作中。
CAS
比互斥锁乐观,但是也就代表 CAS
是有赋值不成功的时候,调用 CAS
的那一方就需要处理赋值不成功的后续行为了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
package main
import (
"fmt"
"runtime"
"time"
"sync/atomic"
)
func c32To42(total *int32, old int32, id int) {
if atomic.CompareAndSwapInt32(total, old, 42) {
fmt.Printf("32 to 42:id%d, work\n",id)
} else {
fmt.Printf("32 to 42:id%d, fail\n",id)
}
}
func main() {
// 使用多个处理器
runtime.GOMAXPROCS(2)
count := int32(32)
old := count
for i:=0; i<2; i++ {
go c32To42(&count, old, i)
}
time.Sleep(time.Second*1)
}
|
上面例子中,当 count
第一次被比较的时候,发现 old 和 42 是不同的,因此做了交换,第二次执行的时候,此时 count
已经是 42 了,因此不做交换。
这一系列的函数需要比较后再进行交换,也有不需要进行比较就进行交换的原子操作。
1
2
3
4
5
6
7
8
9
10
11
12
|
- func SwapInt32(addr *int32, new int32) (old int32)
- func SwapInt64(addr *int64, new int64) (old int64)
- func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)
- func SwapUint32(addr *uint32, new uint32) (old uint32)
- func SwapUint64(addr *uint64, new uint64) (old uint64)
- func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)
|
增加或减少
对一个数值进行增加或者减少的行为也需要保证是原子的,它对应于 atomic
包的函数就是。
1
2
3
4
5
6
7
8
9
10
|
- func AddInt32(addr *int32, delta int32) (new int32)
- func AddInt64(addr *int64, delta int64) (new int64)
- func AddUint32(addr *uint32, delta uint32) (new uint32)
- func AddUint64(addr *uint64, delta uint64) (new uint64)
- func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)
|
读取或写入
当我们要读取一个变量的时候,很有可能这个变量正在被写入,这个时候,我们就很有可能读取到写到一半的数据。
所以读取操作是需要一个原子行为的。在 atomic
包中就是 Load
开头的函数群。
1
2
3
4
5
6
7
8
9
10
11
12
|
- func LoadInt32(addr *int32) (val int32)
- func LoadInt64(addr *int64) (val int64)
- func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)
- func LoadUint32(addr *uint32) (val uint32)
- func LoadUint64(addr *uint64) (val uint64)
- func LoadUintptr(addr *uintptr) (val uintptr)
|
读取我们是完成了原子性,写入也是同样的,如果有多个 CPU 往内存中一个数据块写入数据的时候,可能导致这个写入的数据不完整。
在 atomic
包对应的是 Store
开头的函数群。
1
2
3
4
5
6
7
8
9
10
11
12
|
- func StoreInt32(addr *int32, val int32)
- func StoreInt64(addr *int64, val int64)
- func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)
- func StoreUint32(addr *uint32, val uint32)
- func StoreUint64(addr *uint64, val uint64)
- func StoreUintptr(addr *uintptr, val uintptr)
|