Go Book / 3 Go Seniors / Go 并发编程: Goroutine常见应用范式

Go 并发编程: Goroutine常见应用范式

一、多独立协程并发——worker分工模式

并发协程独立运行且互不通信,主协程等待处理独立子协程的结果

并发编程有一种常见方式就是许多工作子协程都是独立的,互不干扰,但他们又是“同一时间”处理。 例如http服务器的工作模式就是这种,在go标准包net/http中,http服务启动后,它所接收的每一个请求都是独立协程处理的,每个请求的运行协程之间都互不通信。 我们知道,go http服务的请求处理协程都是动态创建的,但很多情况下,我们是需要一些固定数量的协程去独立处理任务,所以这里我们先说固定数量协程独立运行的情况。

范式实现:
  • 创建三个通道,分别处理“任务单元”、“任务结果”、“任务完成状态”三种通信数据;
  • 启动新增任务协程,把任务单元数据发送给jobs通道;
  • 各任务单元分别由独立的worker协程处理,执行独立任务,并把任务处理结果发送给results通道;
  • 启动等待任务结果协程,从done管道接收任务处理完成的数据;
  • 主协程显示处理结果。

我们知道go有个select原语,专门用来处理阻塞或非阻塞通道数据的,我们可以使用for…select…范式最大效率地不断从通道中读出数据。代码如下:

func Demo() {
	run("读书", "看报", "撸代码")
}

type TaskJob struct {
	task    string
	results chan<- Result
}

func (j *TaskJob) do() {
	fmt.Println("Do Task:", j.task)
	// Do Something...
	j.results <- Result{j.task, 200, "Successful"}
}

type Result struct {
	task    string
	code    int
	message string
}

// 逻辑核心数作为并发worker数
var workers = runtime.NumCPU()

func run(tasks ...string) {
	jobs := make(chan TaskJob, workers)      // 任务通道
	results := make(chan Result, len(tasks)) // 执行结果输出通道
	done := make(chan struct{}, workers)     // 任务完成状态通道

	// 子协程添加任务
	go addJob(jobs, tasks, results)

	// 使用单独的worker协程执行并发任务
	for i := 0; i < workers; i++ {
		go doJobs(done, jobs)
	}
    // 主协程等待并处理结果
	waitAndProcessResults1(done, results)
}
// 添加任务
func addJob(jobs chan<- TaskJob, tasks []string, results chan<- Result) {
	for _, task := range tasks {
		jobs <- TaskJob{task: task, results: results}
	}
	// 因为此为发送端,新增完任务后在此关闭通道
	close(jobs)
}

// 执行任务
func doJobs(done chan<- struct{}, jobs <-chan TaskJob) {
	for job := range jobs {
		job.do()
	}
	done <- struct{}{}
}

// 等待并处理子协程记过:for...select...范式同时处理results和done通道,合并awaitCompletion()和showResults()
func waitAndProcessResults1(done <-chan struct{}, results <-chan Result) () {
	// 【阻塞】运行worker时:要么就在处理结果<-results,要么处理完成<-done
	for i := workers; i > 0; {
		select {
		case result := <-results:
			fmt.Printf("Task Job :%s, Result:%d,%s\n", result.task, result.code, result.message)
		case <-done:
			i--
		}
	}

	// 【非阻塞】worker全部做完后:results全部清空后退出
DONE:
	for {
		select {
		case result := <-results:
			fmt.Printf("Task Job :%s, Result:%d,%s\n", result.task, result.code, result.message)
		default:
			break DONE
		}
	}
}

执行结果:

=== RUN   TestDemo31
Do Task: 撸代码
Do Task: 读书
Task Job :撸代码, Result:200,Successful
Task Job :读书, Result:200,Successful
Do Task: 看报
Task Job :看报, Result:200,Successful
--- PASS: TestDemo31 (0.00s)
PASS

可以看到,当有好几个不同的通道需要处理时,使用select原语是非常方便的。在这里我们集中处理results和done通道。

添加子协程超时处理:

当对worker任务子协程有超时要求时,也可在select中添加超时操作,对waitAndProcessResults()修改如下:

// 合并awaitCompletion()和showResults(),并增加超时处理
func waitAndProcessResults(timeout time.Duration, done <-chan struct{}, results <-chan Result) () {
	// 超时通道
	finish := time.After(time.Duration(timeout))

	// 【阻塞】运行worker时:要么就在处理结果<-results,要么处理完成<-done,要么超时
	for i := workers; i > 0; {
		select {
		case result := <-results:
			fmt.Printf("Task Job :%s, Result:%d,%s\n", result.task, result.code, result.message)
		case <-finish:
			fmt.Println("worker任务执行超时!")
			return
		case <-done:
			i--
		}
	}

	// 【非阻塞】worker全部做完后:results全部清空后退出
	for {
		select {
		case result := <-results:
			fmt.Printf("Task Job :%s, Result:%d,%s\n", result.task, result.code, result.message)
		case <-finish:
			fmt.Println("worker任务执行超时!")
			return
		default:
			return
		}
	}
}


二、地铁闸机——限流模式

开启多条子协程, 往有限的通道发送/接收数据,这种情形类似地铁站内的地铁闸机模式,闸机就那么几个,行人分组从闸机通过,通常哪个闸机人少,人们倾向于排哪个闸门的队,或者哪个闸机通行效率高就排哪个闸门的队。

在这种并发模式中,协程相当于行人,通道相当于地铁闸门,多条通道从有限闸门中通过。 具体代码如下:

func Demo() {
	var workers = 100000
	ch0 := make(chan int, 0)
	ch1 := make(chan int, 1)
	ch2 := make(chan int, 2)
	ch3 := make(chan int, 3)

	counter := make([]int, 4)

	// 开workers条协程,发送消息到四条通道
	for i := 0; i < workers; i++ {
		go func(n int) {
			select {
			case ch0 <- n:
				send(n, 0)
			case ch1 <- n:
				send(n, 1)
			case ch2 <- n:
				send(n, 2)
			case ch3 <- n:
				send(n, 3)
			}
		}(i)
	}

	// 主协程从四条通道接收workers条子协程发送的消息
	for i := 0; i < workers; i++ {
		select {
		case rec := <-ch0:
			receive(rec, 0)
			counter[0]++
		case rec := <-ch1:
			receive(rec, 1)
			counter[1]++
		case rec := <-ch2:
			receive(rec, 2)
			counter[2]++
		case rec := <-ch3:
			receive(rec, 3)
			counter[3]++
		}
	}
	fmt.Println("channel counter:", counter)
}

func send(i, j int) {
	fmt.Printf("goroutine#%d send %d to ch%d\n", i, i, j)
}

func receive(routine, ch int) {
	fmt.Printf("receive from goroutine#%d to ch%d channel\n", ch, routine)
}

运行结果:

...
receive from goroutine#92612 to ch2 channel
receive from goroutine#92426 to ch0 channel
receive from goroutine#92733 to ch0 channel
receive from goroutine#92531 to ch0 channel
goroutine#92531 send 92531 to ch0
goroutine#92616 send 92616 to ch3
goroutine#92425 send 92425 to ch2
goroutine#92612 send 92612 to ch2
goroutine#92733 send 92733 to ch0
goroutine#92424 send 92424 to ch2
receive from goroutine#92423 to ch3 channel
receive from goroutine#92614 to ch1 channel
receive from goroutine#92734 to ch1 channel
goroutine#92617 send 92617 to ch3
goroutine#92735 send 92735 to ch1
goroutine#92530 send 92530 to ch3
...
channel counter: [25030 25073 25011 24886]

以上结果表明,多条协程数据比较均衡地从四条通道中通过


三、扇入扇出——分流模式

我们通常会遇到许多耗时任务,如一个通道的数据流向一个执行函数时,当前函数执行时长较长,我们可以把该通道的数据流拆分流向多个通道,并给每个通道启动相应的goroutine处理;随后把所有通道汇总到一个通道输出,以加大该耗时任务的执行效率。 以上过程我们成为扇入扇出过程:

  • 扇出(Fan-out):将一个通道的数据分流给多个通道并启动多个goroutine处理;
  • 扇入(Fan-in):将多个goroutines返回的通道的结果组合到一个通道并输出;

我们看一个简单示例:模拟管道中某一阶段的耗时任务

// 模拟耗时任务
func takeUpTimeTask(done <-chan interface{}, inStream <-chan interface{}) <-chan interface{} {
	outStream := make(chan interface{})
	go func() {
		defer close(outStream)
		for {
			select {
			case <-done:
				return
			case val ,ok := <-inStream:
				if !ok {
					return
				}
				select {
				case <-done:
					return
				case outStream <- val:
					// 模拟耗时任务
					time.Sleep(time.Second)
				}

			}
		}
	}()
	return outStream
}

我们来模拟调用运行它

// 没有扇入扇出处理
func Demo1() {
	done := make(chan interface{})
	defer close(done)
	inStream := make(chan interface{})

	go func() {
		defer close(inStream)
		for i := 0; i < 20; i++ {
			inStream <- i
		}
	}()

	// 对数据流执行耗时任务
	resultChan := takeUpTimeTask(done, inStream)

	// 使用chRange安全遍历打印
	for val := range chRange(done,resultChan){
		fmt.Printf("%v ", val)
	}
}

结果输出:处理20个数据花费20s

=== RUN   TestDemo1
0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 --- PASS: TestDemo81 (20.05s)
PASS

这种情况很显然是可以用并发对耗时任务进行改进的,使用扇入扇出的思路可以这样做:


// 扇出处理耗时任务
func fanOut(done <-chan interface{}, chanStream chan interface{}) []<-chan interface{} {
	numFinders := runtime.NumCPU()
	finders := make([]<-chan interface{}, numFinders)
	for i := 0; i < numFinders; i++ {
		// 耗时任务的分流管道
		finders[i] = takeUpTimeTask(done, chanStream)
	}
	return finders
}

// 扇入汇流结果通道
func fanIn(done <-chan interface{}, channels ...<-chan interface{}) <-chan interface{} {
	var wg sync.WaitGroup
	multiplexedStream := make(chan interface{})
	// 管道汇流处理
	multiplex := func(c <-chan interface{}) {
		defer wg.Done()
		for i := range c {
			select {
			case <-done:
				return
			case multiplexedStream <- i:
			}
		}
	}
	// 从所有的通道中取数据
	wg.Add(len(channels))
	for _, c := range channels {
		go multiplex(c)
	}
	// 等待所有数据汇总完毕
	go func() {
		wg.Wait()
		close(multiplexedStream)
	}()
	return multiplexedStream
}

改进之后我们再来调用它们:

// 扇入扇出处理
func Demo2() {
	done := make(chan interface{})
	defer close(done)
	inStream := make(chan interface{})

	go func() {
		defer close(inStream)
		for i := 0; i < 20; i++ {
			inStream <- i
		}
	}()

	// 扇入扇出执行耗时任务
	resultChan := fanIn(done, fanOut(done, inStream)...)

	// 使用chRange安全遍历打印
	for val := range chRange(done,resultChan){
		fmt.Printf("%v ", val)
	}
}

结果输出:四个逻辑单元并发运行耗时5s,效率提升4倍

=== RUN   TestDemo82
0 1 3 2 4 6 5 7 8 10 9 11 12 13 14 15 16 18 17 19 --- PASS: TestDemo82 (5.02s)
PASS

四、动态创建协程

根据需要来动态创建goroutine,并限制可并发的协程数。

// 设置默认最多开启5子协程
const maxRoutineNum = 5

func autoRoutine(wg *sync.WaitGroup, inStream <-chan int) {
	for {
		in, ok := <-inStream
		if !ok {
			break
		}
		// 自动开启协程的条件:输入流为偶数
		if in%2 == 0 && runtime.NumGoroutine() < maxRoutineNum{
			wg.Add(1)
			go process(in, func() { wg.Done() })
		} else {
			process(in, nil)
		}
	}
}

func process(in int, callback func()) {
	if callback != nil {
		defer callback()
		fmt.Printf("sub routine process:%d \n", in)
		time.Sleep(1000000 * time.Microsecond)
	}else{
		fmt.Printf("parent routine process:%d \n", in)
		time.Sleep(100000 * time.Microsecond)
	}
}

调用示例:

// 动态创建协程
func Demo() {
	wg := sync.WaitGroup{}
	inStream := make(chan int)
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(inStream)
		for i := 1; i < 10; i++ {
			inStream <- i
		}
	}()
	autoRoutine(&wg, inStream)
	wg.Wait()
}

运行结果:可以看到,程序根据要求最多开不超过5个子协程

=== RUN   TestDemo121
parent routine process:1 
sub routine process:2 
parent routine process:3 
parent routine process:5 
sub routine process:4 
parent routine process:6 
parent routine process:7 
parent routine process:8 
parent routine process:9 
--- PASS: TestDemo121 (1.21s)
PASS

以上只是演示该模式的一个简单例子,但它可以扩展到许多应用,如对不定数量的资源进行计算,其中某些资源比较耗时,需要开启协程提高执行效率,但协程数不能无限扩张。一个具体的例子就是使用go标准包的filepath.Walk()对系统特点目录的文件进行哈希计算,显然对某个目录的文件数是事先不确定的,且大文件的哈希计算耗时比较大,这就需要开启子协程计算,而协程又不能无限制扩张,因为系统对打开文件数又有限制。这种场景就非常适合使用这种模式,感兴趣的可自己实现一下。