Go Book / 1 Go Basics / 16 Go并发编程(三): Go并发的传统同步机制

16 Go并发编程(三): Go并发的传统同步机制

Go 传统同步机制

在《Go并发编程初探》中我们提到同步概念,所谓同步是相对异步而言,即串行相对于并行。 在学习Go通信机制时我们知道管道其实就是并发单元同步方式的一种,基于CSP并发模型,Go在语言原语上使管道作为核心设计,这是Go的设计哲学,也是Go所提倡的同步机制。然而,Go在标准包sync中也提供了传统的“共享内存式通信”的同步机制,对某些设计场景也需要这种同步方式,下面我们就来解析sync包提供的传统同步机制。

同步机制解决什么问题?

在并发编程中常常会遇到以下几种情况:

  • 在主协程中开辟的子协程依赖于主协程的生命周期,即“主死从随”,为了让子协程全部执行完成,主协程需要等待,但等待的时间是不定的,直接设置主协程睡眠略显粗暴,能否让子协程告诉父协程任务已完成?
  • 在并发编程中常常遇到多个协程共同操作公共资源的情况,如外部文件的并发读写,如果对这种资源的操作仍旧使用并行,则势必会造成混乱,能否对资源加锁,让各协程竞争锁串行操作?
  • 你也许也遇到过一些同一时间内只允许一个写入或同一时间内允许同时读取相同资源的情况,这种情况是典型的一读多写,对这种资源的操作该如何控制?
  • 当要求多个协程并发去执行一项任务,并只允许其中一个协程生效时,该如何处理?
  • 当一个协程正监听一个其他协程也可访问的资源,并等待该资源被其他协程修改,在该资源被修改时该协程从阻塞状态唤醒并继续执行,这种需求该如何解决?
  • 当一个资源被并发访问,且业务要求改资源必须在物理级别上并发安全,及在物理级别实现同一时间只被一个协程读写,Go有没有这种安全同步机制?

以上情况都是在并发编程中常见的资源安全问题,Go提供sync包实现安全的同步机制,以下我们一一解决上述遇到的问题:

1.等待组 sync.WaitGroup

Go同步包sync提供等待组,以解决主协程等待子协程完成任务的问题。

示例:

//sync同步:等待组 sync.WaitGroup
func BaseSync01() {

	wg := sync.WaitGroup{}

	wg.Add(1)
	go func() {
		for i := 1; i <= 10; i++ {
			fmt.Println("协程1走起!!!")
			time.Sleep(time.Second)
		}
		fmt.Println("=====协程1搞定=====!!!")
		wg.Done()
	}()

	wg.Add(1)
	go func() {
		for i := 1; i <= 7; i++ {
			fmt.Println("协程2走起!!!")
			time.Sleep(time.Second)
		}
		fmt.Println("=====协程2搞定=====!!!")
		wg.Done()
	}()

	wg.Add(1)
	go func() {
		for i := 1; i <= 5; i++ {
			fmt.Println("协程3走起!!!")
			time.Sleep(time.Second)
		}
		fmt.Println("=====协程3搞定=====!!!")
		wg.Done()
	}()

	for i := 1; i <= 3; i++ {
		fmt.Println("主协程走起!!!")
		time.Sleep(time.Second)
	}

	wg.Wait()
	fmt.Println("=====全部搞定=====!!!")

}
2.同步锁/互斥锁

所谓互斥锁,synv.Mutex ,保证被锁定资源不被其他协程占用,即被加锁的对象在同一时间只允许一个协程读或写。

示例:

func BaseSync02() {
	//申请一个锁
	mutex := sync.Mutex{}

	myWalet := 200

	//开20个协程
	for i := 1; i <= 100; i++ {
		go func(n int) {
			//没个协程分100次给我的钱包发1元
			for j := 1; j <= 1000; j++ {
				mutex.Lock()
				myWalet += 1
				mutex.Unlock()
				//fmt.Printf("协程%d,第%d次给我发1元红包\n",n,j)
			}
		}(i)
	}

	time.Sleep(time.Second * 3)
	fmt.Println("我的钱包现在为:", myWalet)
}
3.读写锁

所谓读协程snyc.RWMutex,实现业务中对资源一写多读的情况 ,如对数据库读写,为保证数据原子性,同一时间只允许一个协程写资源,禁止其他写入或读取,或同一时间允许多个协程读取资源,但禁止任何协程写入。

示例:

func BaseSync03() {
	rwm := sync.RWMutex{}
	myWallet := 200

	//开启3个写协程
	for i := 1; i <= 3; i++ {
		go func(n int) {
			for j := 1; j <= 100; j++ {
				rwm.Lock()
				myWallet += 1
				fmt.Println("写协程", i, "抢到写锁,修改金额为:", myWallet)
				rwm.Unlock()
				time.Sleep(time.Microsecond * 200)
			}
		}(i)
	}

	//开启1000个读协程
	for i := 1; i <= 1000; i++ {
		go func(n int) {
			runtime.Gosched()
			rwm.RLock()
			fmt.Println("读协程", n, "读到钱包金额:", myWallet)
			rwm.RUnlock()
		}(i)
	}

	time.Sleep(time.Second * 10)
	fmt.Println("最后读取金额为:", myWallet)

}
4.只执行一次 sync.Once

多协程调用中对一个任务只允许执行一次

示例:

//案例:杀死比尔,可以开多个协程去杀,但人只能死一次
type People struct {
	Name  string
	Alive bool
}

func Kill(p *People) {
	p.Alive = false
	fmt.Println("Bill:我被杀了...")
}

func BaseSync04() {
	once := sync.Once{}
	bill := People{"Bill", true}
	wg := sync.WaitGroup{}

	//开启三个协程去杀比尔
	for i := 1; i <= 10; i++ {
		wg.Add(1)
		go func() {
			fmt.Println("去杀Bill!!!")
			once.Do(func() {
				Kill(&bill)
			})

			wg.Done()
		}()
	}

	wg.Wait()
	fmt.Println("杀死比尔任务完成!!!")

}
5.条件变量 sync.Cond

示例:

/*
场景:
	1.监听比特币涨跌
	2.比特币涨,走投资协程,
	3.比特币跌,马上停止投资
	4.各协程监听比特币价格
*/
func BaseSync05() {
	//申请一个条件变量
	cond := sync.NewCond(&sync.Mutex{})

	//被监听的变量
	bitCoinRaising := false

	//涨协程变量修改并广播
	go func() {
		for {
			time.Sleep(time.Second * 3)
			cond.L.Lock()
			bitCoinRaising = true
			cond.Broadcast()
			cond.L.Unlock()
		}
	}()

	//跌协程变量修改并广播
	go func() {
		ticker := time.NewTicker(time.Second * 5)
		for {
			<-ticker.C
			cond.L.Lock()
			bitCoinRaising = false
			cond.Broadcast()
			cond.L.Unlock()
		}

	}()

	//监听条件变量的主协程阻塞等待
	for {
		cond.L.Lock()
		//不断循环监听变量
		if !bitCoinRaising {
			fmt.Println("比特币没涨,先暂停下")
			cond.Wait()
			fmt.Println("比特币涨了,快点买买买,发财了发财了!!!")
		}
		cond.L.Unlock()
	}

}
6.原子操作 sync.atomic

物理级别实现资源读写的原子性,从根本上杜绝并发不安全的问题,但其主要缺陷是只支持对基本数据类型的操作,对其他类型则无能为力。

示例:

func BaseSync06() {
	var myWalet int64
	myWalet = 200

	//开20个协程
	for i := 1; i <= 100; i++ {
		go func(n int) {
			//没个协程分100次给我的钱包发1元
			for j := 1; j <= 1000; j++ {
				atomic.AddInt64(&myWalet, 1)
				//fmt.Printf("协程%d,第%d次给我发1元红包\n",n,j)
			}

		}(i)
	}

	time.Sleep(time.Second * 3)
	fmt.Println("我的钱包现在为:", myWalet)
}