Go Book / 3 Go Seniors / Go 并发编程:通道常见应用范式

Go 并发编程:通道常见应用范式

通道经典应用

一、闭包实现通道访问限制

在Go的并发编程中,创建通道和开辟协程是非常方便且容易的,正因如此,有可能会导致开发者滥用。如果在团队开发中没有良好的协商和规范,更可能会导致并发数据不安全。 例如:

func Demo() {
	ch := make(chan int, 0)
	go dosomething(ch, 10)
	go dosomething(ch, 20)
	dosomething(ch, 30)
}

func dosomething(ch chan int, num int) {
	for i := 1; i < num; i++ {
		ch <- i
	}
	close(ch1)
}

以上是个非常明显的不规范使用通道的例子,你能得到的只有死锁!

fatal error: all goroutines are asleep - deadlock!

那么能不能限制通道的使用呢,即特定通道只让特定协程使用? 有几种解决方案:

  • 团队协商,在代码规范上下功夫,显然这是不安全的。
  • 使用同步功能
  • 闭包实现访问受限的通道

以下我们使用第三种方案: 闭包实现访问受限的通道,只允许特定协程使用

func Demo() {
	// 生产者:producter 内部开辟一条协程往里面发送数据,并返回一个只读通道
	producter := func() <-chan int {
		results := make(chan int, 5) // 该通道只作用于特定闭包内的作用域
		go func() {
			defer close(results)
			for i := 0; i <= 5; i++ {
				results <- i
			}
		}()
		return results
	}

	// 消费者:运行这个闭包时需要传入一个只读通道
	consumer := func(results <-chan int) { 
		for result := range results {
			fmt.Printf("Received: %d\n", result)
		}
		fmt.Println("Done receiving!")
	}
        
 	consumer(producter())
}

二、for-select范式:关于多通道操作的整合

我们知道select语句是go专门为多通道操作提供的原语,单个select语句可以一次性的从多个通道选取一个来读写,只要哪个通道先不处于阻塞状态便选取哪个通道读写。而结合for循环语句构成的for-select结构可以循环不断的从多通道读写数据,直到特定条件退出。

for-select循环模式如下所示:

for { // 无限循环或遍历
    select {
    // 对通道进行操作
    }
}

常见的几种for-select循环的用法: a. 在通道上发送迭代变量

for _, s := range []string{"a", "b", "c"} {
    select {
    case <-done:
        return
    case stringStream <- s:   // slice数据循环迭代写入channel
    }
}

b. 无限循环等待停止

// 第一种方式
for {
    select {
    case <-done: 
        return   // 停止返回
    default:
    }
    // 执行非抢占任务
}

// 第二种方式
for {
    select {
    case <-done:
        return 
    default:    
      // 将要执行的任务放入default分支中
      // 执行非抢占任务
    }
}

通过善用通道,我们可以在许多并发过程中尽量避免使用同步锁,select原语可以集中处理多个通道,大大提高了开发和运行效率。

三、or-channel :递归多个通道的或读取,只要有一个通道返回即完成


/*
递归多个通道的或读取,只要有一个通道返回即完成
*/
func Demo() {
	var or func(channels ...<-chan interface{}) <-chan interface{}
    // 建立了名为or的递归函数,接收数量可变的通道并返回单个通道。
	or = func(channels ...<-chan interface{}) <-chan interface{} 
    // 两个递归终止条件		
    switch len(channels) {
		case 0:  // 如果传入的切片是空的,我们简单的返回一个nil通道
			return nil
		case 1:  // 如果切片只含有一个元素,我们就返回给元素
			return channels[0]
		}
      
		orChannel := make(chan interface{})
        // 建立一个goroutine,以便可以不受阻塞地等待我们通道上的消息
		go func() { 
			defer close(orDone)

			switch len(channels) {
			case 2: // 由于我们这里是递归的,每次递归调用将至少有两个通道。作为保持goroutine数量受到限制的优化方法,们在这里为仅使用两个通道的时设置了一个特殊情况。
				select {
				case <-channels[0]:
				case <-channels[1]:
				}
			default: // 递归地在第三个索引之后,从切片中的所有通道中创建一个or通道,然后从中选择。递归操作会逐层累计直到取到第一个通道元素。我们在其中传递了orChannel通道,这样当该树状结构顶层的goroutines退出时,结构底层的goroutines也会退出。
				select {
				case <-channels[0]:
				case <-channels[1]:
				case <-channels[2]:
				case <-or(append(channels[3:], orChannel)...): 
				}
			}
		}()
		return orDone
	}



	// 下面这个例子将经过一段时间后关闭通道,然后使用or函数将这些通道合并到一个关闭的通道中:
	sig := func(after time.Duration) <-chan interface{} { // 创建了一个通道,当后续时间中指定的时间结束时将关闭该通道
		c := make(chan interface{})
		go func() {
			defer close(c)
			time.Sleep(after)
		}()
		return c
	}

	start := time.Now() // 设置追踪自or函数的通道开始阻塞的起始时间
	<-or(sig(2*time.Hour), sig(5*time.Minute), sig(1*time.Second), sig(1*time.Hour), sig(1*time.Minute))
	fmt.Printf("done after %v", time.Since(start)) // 打印阻塞发生的时间
}

这是一种奇妙的做法,你可以将任意数量的通道组合到单个通道中,只要任何作为组件的通道关闭或被写入,整个通道就会关闭。

四、chRange 封装安全的通道遍历读取

有时你会与来自系统不同部分的通道交互。与管道不同的是,当你使用的代码通过done通道取消操作时,你无法对通道的行为方式做出判断。也就是说,你不知道正在执行读取操作的goroutine现在是什么状态。出于这个原因,正如我们在“防止Goroutine泄漏”中所阐述的那样,需要用select语句来封装我们的读取操作和done通道。可以简单的写成这样:

for val := range myChan {
	// 对 val 进行处理
}

展开后可以写成这样:

loop:
	for {
		select {
		case <-done:
			break loop
		case maybeVal, ok := <-myChan:
			if ok == false {
				return // or maybe break from for
			}
			// Do something with val
		}
	}

这样做可以快速退出嵌套循环。继续使用goroutines编写更清晰的并发代码,而不是过早优化的主题,我们可以用一个goroutine来解决这个问题。 我们封装了细节,以便其他人调用更方便:

/*
封装一个通用的安全的通道读取器,以便于可安全地for range遍历任意通道
*/
var chRange = func(done, ch <-chan interface{}) <-chan interface{} {
	valStream := make(chan interface{})
    // 使用协程闭包封装安全的读取通道 
	go func() {
		defer close(valStream)
		for {
			select {
			case <-done:
				return
			case v, ok := <-ch:
				if ok == false {
					return
				}
				select {
				case valStream <- v:
				case <-done:
				}
			}
		}
	}()

	return valStream
}

调用示例:这样对任意通道我们都可以简单安全的读取

func Demo() {
	done := make(chan interface{})
	defer close(done)

	ch := make(chan interface{})
	go func() {
		defer close(ch)
		for i := 0; i < 10; i++ {
			ch <- i
		}
	}()

	for val := range chRange(done, ch) {
		fmt.Printf("read %v \n", val)
	}
}

五、tee-channel 分割通道数据流

tee-channel类似Linux的tee命令,分割来自通道的值,以便将它们发送到两个独立区域。想象一下:你可能想要在一个通道上接收一系列操作指令,将它们发送给执行者,同时记录操作日志。

var tee = func(done <-chan interface{}, in <-chan interface{}) (_, _ <-chan interface{}) {

	out1 := make(chan interface{})
	out2 := make(chan interface{})

	go func() {
		defer close(out1)
		defer close(out2)
		for val := range chRange(done, in) {
			select {
			case <-done:
			default:
				out1 <- val
				out2 <- val
			}

		}
	}()
	return out1, out2
}

注意写入out1和out2是紧密耦合的。 直到out1和out2都被写入,迭代才能继续。 通常这不是问题,因为无论如何,处理来自每个通道的读取流程的吞吐量应该是tee之外的关注点,但值得注意。 这是一个快速调用示例:

func Demo()  {
	done := make(chan interface{})
	defer close(done)

	ch := make(chan interface{})
	go func() {
		for i := 0; i < 10; i++ {
			ch <- i
		}
		defer close(ch)
	}()

	out1, out2 := tee(done, ch)

	for val1 := range out1 {
		fmt.Printf("out1: %v, out2: %v\n", val1, <-out2)
	}

利用这种模式,很容易使用通道作为系统数据的连接点。

六、bridge-channel

在某些情况下,你可能会发现自己想要使用一系列通道,即你可能需要从一个通道中获取多个通道的值:

<-chan <-chan interface{}

这与将某个通道的数据切片合并到一个通道中稍有不同,这种调用方式意味着一系列通道有序的写入操作。从通道读取一系列通道的值 ,类似多通道过独木桥。

// 通道桥接
var bridge = func(done <-chan interface{}, chanStream <-chan <-chan interface{}) <-chan interface{} {

	valStream := make(chan interface{}) // 1
	go func() {
		defer close(valStream)
		for { // 2
			var stream <-chan interface{}
			select {
			case maybeStream, ok := <-chanStream:
				if ok == false {
					return
				}
				stream = maybeStream
			case <-done:
				return
			}
			for val := range chDone(done, stream) { // 3
				select {
				case valStream <- val:
				case <-done:
				}
			}
		}
	}()
	return valStream
}

使用示例:

func Demo() {
	genVals := func() <-chan <-chan interface{} {

		chanStream := make(chan (<-chan interface{}))

		go func() {
			defer close(chanStream)
			for i := 0; i < 10; i++ {
				stream := make(chan interface{}, 1)
				stream <- i
				close(stream)
				chanStream <- stream
			}
		}()
		return chanStream
	}

	for v := range bridge(nil, genVals()) {
		fmt.Printf("%v ", v)
	}
}