Go Book / 3 Go Seniors / Go 并发编程:通道应用范式之管道模式

Go 并发编程:通道应用范式之管道模式

管道模式

一、类Unix串行管道:使用通道实现串行管道功能

我们在使用类Unix系统时常常用到管道命令,如"ls |grep ‘path/to’ “,它可以让数据在多个命令操作中串行处理。Go的通道也可以做到如此,利用通道通信的特性我们可以创建多个连续通道,让一个函数的输出作为另一个函数的输入,而另一个函数的输出也可以作为其他函数的输入。

Go标准库中的io.Pipe()可以创建类Unix风格管道,它适合纯粹的IO系统原语的管道操作。然而go语言原语中的通道也可以做到类似的操作,以下是Go并发编程的范式之一,可以普适到更多的应用场景。

// 管道过滤器范式
func Demo() {
	a(b(c(source("source1", "source2", "source3"))))
}

func a(in <-chan string) {
	for i := range in {
		fmt.Println("a" + i)
	}
}

func b(in <-chan string) <-chan string {
	out := make(chan string, cap(in))
	go func() {
		defer close(out)
		for i := range in {
			out <- "b" + i
		}
	}()
	return out
}

func c(in <-chan string) <-chan string {
	out := make(chan string, cap(in))
	go func() {
		defer close(out)
		for i := range in {
			out <- "c" + i
		}
	}()
	return out
}

// 管道输入源
func source(inputs ...string) <-chan string {
	out := make(chan string, len(inputs))

	go func() {
		defer close(out)
		for _, item := range inputs {
			out <- item
			fmt.Println("source input:", item)
		}
	}()
	return out
}

运行输出:

=== RUN   TestDemo21
source input: source1
source input: source2
source input: source3
abcsource1
abcsource2
abcsource3
--- PASS: TestDemo21 (0.00s)
PASS

二、构建管道最佳实践

以上为最简单的模拟管道,但仔细一看还是存在问题的,我们在《防止Goroutine泄露》中讨论过,管道每个阶段都有可能出现协程泄露的风险,我们可以引入done管道解决这个问题,下面看一个比较有实质性的例子:通过一个生成器产生一些值并进入管道运算,最后输出值。

// 生成器
generator := func(done <-chan interface{}, integers ...int) <-chan int {
	intStream := make(chan int)
	go func() {
		defer close(intStream)
		for _, i := range integers {
			select {
			case <-done:
				return
			case intStream <- i:
			}
		}
	}()
	return intStream
}

// 乘法阶段
multiply := func(done <-chan interface{}, intStream <-chan int, multiplier int) <-chan int {
	multipliedStream := make(chan int)
	go func() {
		defer close(multipliedStream)
		for i := range intStream {
			select {
			case <-done:
				return
			case multipliedStream <- i * multiplier:
			}
		}
	}()

	return multipliedStream
}

// 加法阶段
add := func(done <-chan interface{}, intStream <-chan int, additive int) <-chan int {
	addedStream := make(chan int)
	go func() {
		defer close(addedStream)
		for i := range intStream {
			select {
			case <-done:
				return
			case addedStream <- i + additive:
			}
		}
	}()
	return addedStream
}

// 防止协程泄露的done管道
done := make(chan interface{})
defer close(done)

// 生成一些值,并进行管道运算
intStream := generator(done, 1, 2, 3, 4)
pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2)

// 输出管道运算结果
for v := range pipeline {
	fmt.Println(v)
}

在这个管道模式中,我们看到两件事:

  • 在管道的末尾,可以使用range语句来提取值;
  • 在每个阶段可以安全地并发执行,因为输入和输出在并发上下文中是安全的。

看到关闭done通道是如何影响到管道的了么?这是通过管道每个阶段的两件事情实现的:

  • 对传入的频道进行遍历。当输入通道关闭时,遍历操作将退出。
  • 发送操作与done通道共享select语句。

无论流水线阶段处于等待数据通道的状态,还是处在等待发送通道关闭的状态,都会强制管道各阶段终止。这里有一个复发关系。在管道开始时,我们已经确定必须将传入的切片值转换为通道。在这个过程中有两点必须是可抢占的:

  • 在生成器通道上创建值。
  • 在其频道上发送离散值。

在管道开始和结束之间,代码总是在一个通道上遍历,并在包含done通道的select语句内的另一个通道上发送。如果某个阶段在传入通道检索到值时被阻塞,则该通道关闭时它将变为未阻塞状态。 如果某个阶段在发送值时被阻塞,则由于select语句而可抢占。因此,整个管道始终可以通过关闭done通道来抢占。

三、生成器模式:

在上面的generator中我们看到一个简单的生成器,咋一看还比较死板,我们可以利用通道构建一个可获取特定重复值的生成器。

以下函数会重复你传给它的值,直到你告诉它停止:

var repeat = func(done <-chan interface{}, values ...interface{}) <-chan interface{} {

	valueStream := make(chan interface{})
	go func() {
		defer close(valueStream)
		for {
			for _, v := range values {
				select {
				case <-done:
					return
				case valueStream <- v:
				}
			}
		}
	}()
	return valueStream
}

以下函数会从其传入的valueStream中取出第一个元素然后退出:

var take = func(done <-chan interface{}, valueStream <-chan interface{}, num int, ) <-chan interface{} {

	takeStream := make(chan interface{})
	go func() {
		defer close(takeStream)
		for i := 0; i < num; i++ {
			select {
			case <-done:
				return
			case takeStream <- <-valueStream:
			}
		}
	}()
	return takeStream
}

OK,让我们组合它们使用,看一个用例:

func  Demo() {
    done := make(chan interface{})
    defer close(done)

    for num := range take(done, repeat(done, 1), 10) {
    	fmt.Printf("%v ", num)
    }
}

在这个基本的例子中,我们创建了一个repeat生成器来生成无限数量的重复生成器,但是只取前10个。repeat生成器由take接收。虽然我们可以生成无线数量的流,但只会生成n+1个实例,其中n是我们传入take的数量。

除了生成特定的固定数量的值,我们还可以扩展一下,如果把repeat扩展成repeatFn,我们可以生成任何数据:

var repeatFn = func(done <-chan interface{}, fn func() interface{}) <-chan interface{} {

	valueStream := make(chan interface{})
	go func() {
		defer close(valueStream)
		for {
			select {
			case <-done:
				return
			case valueStream <- fn():
			}
		}
	}()
	return valueStream
}

继续看用例:

func Demo() {
    done := make(chan interface{})
    defer close(done)
    rand := func() interface{} {
    	return rand.Int()
    }
    for num := range take(done, repeatFn(done, rand), 10) {
    	fmt.Println(num)
    }
}