目录

GO标准库-sync

sync 包提供了基础的异步操作方法,包括互斥锁 Mutex,执行一次 Once 和并发等待组 WaitGroup 等.

常用方法

sync 包有以下常用的几个方法

  • Mutex: 互斥锁

  • RWMutex:读写锁

  • WaitGroup:并发等待组

  • Once:执行一次

  • Cond:信号量

  • Pool:临时对象池

  • Map:自带锁的map

Mutex 和 RWMutex

Mutex 是最简单的一种锁类型,同时也比较暴力,当一个 goroutine 获得了 Mutex 后,其他 goroutine 就只能乖乖等到这个 goroutine 释放该 Mutex

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var (
        lock sync.Mutex
        m    = make(map[int]int)
    )
    for i := 0; i < 10; i++ {
        j := i
        go func() {
            lock.Lock()
            m[j] = j
            lock.Unlock()
        }()
    }
    time.Sleep(time.Second)
    for k, v := range m {
        fmt.Println(k, v)
    }
}

RWMutex 相对友好些,是经典的单写多读模型。在读锁占用的情况下,会阻止写,但不阻止读,也就是多个 goroutine 可同时获取读锁(调用 RLock() 方法;而写锁(调用 Lock() 方法)会阻止任何其他 goroutine(无论读和写)进来,整个锁相当于由该 goroutine 独占。从 RWMutex 的实现看,RWMutex 类型其实组合了 Mutex

1
2
3
4
5
6
7
8

type RWMutex struct {
    w Mutex
    writerSem uint32
    readerSem uint32
    readerCount int32
    readerWait int32
}

对于这两种锁类型,任何一个 Lock()RLock() 均需要保证对应有 Unlock()RUnlock() 调用与之对应,否则可能导致等待该锁的所有 goroutine 处于饥饿状态,甚至可能导致死锁。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

package main

import (
    "sync"
    "time"
)

var m *sync.RWMutex
var val = 0

func main() {
    m = new(sync.RWMutex)
    go read(1)
    go write(2)
    go read(3)
    time.Sleep(5 * time.Second)
}

func read(i int) {
    m.RLock()
    time.Sleep(1 * time.Second)
    println("val: ", val)
    time.Sleep(1 * time.Second)
    m.RUnlock()
}

func write(i int) {
	m.Lock()
    val = 10
	time.Sleep(1 * time.Second)
	m.Unlock()
}

但是如果我们把 read 中的 RLockRUnlock 两个函数给注释了,就返回了 :

1
2
3

val:  10
val:  10

这个就是由于读的时候没有加读锁,在准备读取 val 的时候,val 被 write 函数进行修改了。

Once

sync.Once 指的是只执行一次的对象实现,常用来控制某些函数只能被调用一次。sync.Once 经常被用在单例模式、系统初始化这种场景。

sync.Once 的结构如下所示,只有一个函数。使用变量 done 来记录函数的执行状态,使用 sync.Mutexsync.atomic 来保证线程安全的读取 done

1
2
3
4
5
6
7

type Once struct {
	m    Mutex     #互斥锁
	done uint32    #执行状态
}

func (o *Once) Do(f func())

使用 sync 包的 once.Do 来做个demo,实现单例模式,我们前文设计模式里面提到,点击这里查阅:单例模式

WaitGroup 和 Cond

一个 goroutine 需要等待一批 goroutine 执行完毕以后才继续执行,那么这种多线程等待的问题就可以使用 WaitGroup 了。

具体示例如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22

package main

import (
    "fmt"
    "sync"
)

func main() {
    wg := new(sync.WaitGroup)

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(i int) {
            fmt.Println("done ", i)
            defer wg.Done()
        }(i)
    }

    wg.Wait()
    fmt.Println("wait end")
}

sync.Cond 是用来控制某个条件下,goroutine 进入等待时期,等待信号到来,然后重新启动 goroutine

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    lock := new(sync.Mutex)
    cond := sync.NewCond(lock)
    done := false

    cond.L.Lock()

    go func() {
        fmt.Println("now done is ", done);
        time.Sleep(2 * time.Second)
        done = true
        cond.Signal()
    }()

    if (!done) {
        cond.Wait()
    }

    fmt.Println("now done is ", done);
}

当主进程进入 cond.Wait 的时候,就会进入阻塞状态进行等待,当 goroutine 发出 cond.Signal 信号之后,主进程才会继续往下面走。

技巧
sync.Cond 还有一个 BroadCast 方法,用来通知唤醒所有等待的 gouroutine
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

package main

import (
    "fmt"
    "sync"
    "time"
)

var lock = new(sync.Mutex)
var cond = sync.NewCond(lock)

func test(x int) {
    // 获取锁
    cond.L.Lock()
    // 等待通知  暂时阻塞
    cond.Wait()   
    fmt.Println(x)
    time.Sleep(time.Second * 1)
    // 释放锁,不释放的话将只会有一次输出
    cond.L.Unlock() 
}
func main() {
    for i := 0; i < 40; i++ {
        go test(i)
    }
    fmt.Println("start all")
    //  下发广播给所有等待的 goroutine
    cond.Broadcast() 
    time.Sleep(time.Second * 60)
}

处于 cond.Wait 状态的所有 gouroutine 收到信号后将全部被唤醒并往下执行。需要注意的是, gouroutine 执行完任务后,需要通过 cond.L.Unlock 释放锁, 否则其它被唤醒的 gouroutine 将没法继续执行。

cond.Wait 的源码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14

func (c *Cond) Wait() {
    c.checker.check()
    if raceenabled {
        raceDisable()
    }
    atomic.AddUint32(&c.waiters, 1)
    if raceenabled {
        raceEnable()
    }
    c.L.Unlock()
    runtime_Syncsemacquire(&c.sema)
    c.L.Lock()
}

原因是 cond.Wait 会自动释放锁等待信号的到来,当信号到来后,第一个获取到信号的 Wait 将继续往下执行并从新上锁,如果不释放锁, 其它收到信号的 gouroutine 将阻塞无法继续执行。

sync/atomic - 原子操作

原子操作是比其它同步技术更基础的操作。原子操作是无锁的,常常直接通过 CPU 指令直接实现。 事实上,其它同步技术的实现常常依赖于原子操作。

单例模式这篇文章里面,我们就有过相关示例,这里便不再做更多赘述。

CAS

原子操作中最经典的 CAS(compare-and-swap)atomic 包中是 Compare 开头的函数。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12

- func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)

- func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)

- func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)

- func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)

- func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)

- func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)

CAS 的意思是判断内存中的某个值是否等于 old 值,如果是的话,则赋 new 值给这块内存。CAS 是一个方法,并不局限在 CPU 原子操作中。

CAS 比互斥锁乐观,但是也就代表 CAS 是有赋值不成功的时候,调用 CAS 的那一方就需要处理赋值不成功的后续行为了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

package main


import (
	"fmt"
	"runtime"
	"time"
	"sync/atomic"
)

func c32To42(total *int32, old int32, id int) {
	if atomic.CompareAndSwapInt32(total, old, 42) {
    	fmt.Printf("32 to 42:id%d, work\n",id)
	} else {
    	fmt.Printf("32 to 42:id%d, fail\n",id)
	}
}

func main() {

    // 使用多个处理器
	runtime.GOMAXPROCS(2)		

	count := int32(32)
    old := count

	for i:=0; i<2; i++ {
   		go c32To42(&count, old, i)
	}

	time.Sleep(time.Second*1)
}

上面例子中,当 count 第一次被比较的时候,发现 old 和 42 是不同的,因此做了交换,第二次执行的时候,此时 count 已经是 42 了,因此不做交换。

这一系列的函数需要比较后再进行交换,也有不需要进行比较就进行交换的原子操作。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12

- func SwapInt32(addr *int32, new int32) (old int32)

- func SwapInt64(addr *int64, new int64) (old int64)

- func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)

- func SwapUint32(addr *uint32, new uint32) (old uint32)

- func SwapUint64(addr *uint64, new uint64) (old uint64)

- func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)

增加或减少

对一个数值进行增加或者减少的行为也需要保证是原子的,它对应于 atomic 包的函数就是。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10

- func AddInt32(addr *int32, delta int32) (new int32)

- func AddInt64(addr *int64, delta int64) (new int64)

- func AddUint32(addr *uint32, delta uint32) (new uint32)

- func AddUint64(addr *uint64, delta uint64) (new uint64)

- func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)

读取或写入

当我们要读取一个变量的时候,很有可能这个变量正在被写入,这个时候,我们就很有可能读取到写到一半的数据。

所以读取操作是需要一个原子行为的。在 atomic 包中就是 Load 开头的函数群。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12

- func LoadInt32(addr *int32) (val int32)

- func LoadInt64(addr *int64) (val int64)

- func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)

- func LoadUint32(addr *uint32) (val uint32)

- func LoadUint64(addr *uint64) (val uint64)

- func LoadUintptr(addr *uintptr) (val uintptr)

读取我们是完成了原子性,写入也是同样的,如果有多个 CPU 往内存中一个数据块写入数据的时候,可能导致这个写入的数据不完整。

atomic 包对应的是 Store 开头的函数群。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12

- func StoreInt32(addr *int32, val int32)

- func StoreInt64(addr *int64, val int64)

- func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)

- func StoreUint32(addr *uint32, val uint32)

- func StoreUint64(addr *uint64, val uint64)

- func StoreUintptr(addr *uintptr, val uintptr)