将 input 由一个 producer 分发多个 goroutine 运行,再将多个 task goroutine 运行的结果由一个 consumer 收集资料合并为 output
如果程序的有着复杂的计算或者多个 IO 运行,可以将这些运行分发给 task goroutine 执行,使 task 执行更快速,在统一收集继续下个流程。其中分发与收集的行为又被称为 Fan Out、Fan In:
设计一个新闻资讯网页系统,需要从 A、B、C server 拿取资料,这些资料都没有顺序性,纯粹是要都显示在网页上而已,所以如果 A 资料拿完再拿 B,这样就太浪费时间了。可以同时拿取 A、B、C server 的资料加快取资料的速度。
实作有问题的系统如下, A、B、C server 透过GetServerData()
拿取资料,再透过ShowNews
显示新闻资料:
package main
import (
"fmt"
"math/rand"
"time"
)
func GetServerData(serverName string) string {
time.Sleep(time.Duration(rand.Intn(3)) * time.Second) //模拟取得server data消耗的时间
return fmt.Sprintf("%s server data", serverName)
}
func ShowNews(news ...interface{}) {
fmt.Println(news...)
}
func main() {
start := time.Now()
responseByServerA := GetServerData("A")
responseByServerB := GetServerData("B")
responseByServerC := GetServerData("C")
ShowNews(responseByServerA, responseByServerB, responseByServerC)
fmt.Printf("cost %s", time.Since(start))
}
会发现因为拿取资料无法并行,所以耗时较久
实作Producer()
、Task()
、Consumer()
来分别分发任务、执行任务、收集资料,
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func Producer(serverNames ...string) <-chan string {
producerCh := make(chan string, len(serverNames))
go func() {
defer close(producerCh)
for _, serverName := range serverNames {
producerCh <- serverName
}
}()
return producerCh
}
func Task(producerCh <-chan string) <-chan string {
taskCh := make(chan string)
go func() {
defer close(taskCh)
for serverName := range producerCh {
taskCh <- GetServerData(serverName)
}
}()
return taskCh
}
func Consumer(taskChs ...<-chan string) <-chan string {
consumerCh := make(chan string)
var wg sync.WaitGroup
wg.Add(len(taskChs))
go func() {
wg.Wait()
close(consumerCh)
}()
for _, task := range taskChs {
go func(task <-chan string) {
defer wg.Done()
for new := range task {
consumerCh <- new
}
}(task)
}
return consumerCh
}
func GetServerData(serverName string) string {
time.Sleep(time.Duration(rand.Intn(3)) * time.Second) //模拟取得server data消耗的时间
return fmt.Sprintf("%s server data", serverName)
}
func ShowNews(news ...interface{}) {
fmt.Println(news...)
}
func main() {
start := time.Now()
producerCh := Producer("A", "B", "C")
task1 := Task(producerCh)
task2 := Task(producerCh)
task3 := Task(producerCh)
consumerCh := Consumer(task1, task2, task3)
for new := range consumerCh {
ShowNews(new)
}
fmt.Printf("cost %s", time.Since(start))
}
程序码较长,重点如下:
Producer()
後,Producer()
会创建一个 Channel 来分发任务,所以需再将此 Channel 传给Task()
使其 goroutine 获得任务Task()
获得任务开始执行後,也会产生各自的 Channel 用来传递 server 的资料,所以需再将此 Channel 送至Consumer()
Consumer()
获得所有Task()
的 Channel 後,会在启动相对数量的 gorotine 合并资料至consumerCh{}
,为了要确保资料取得完毕後关闭consumerCh{}
,需透过sync.WaitGroup{}
来取得close(consumerCh)
的时机,时机的逻辑如下:
wg.Add()
会加入需等待的数目,这边输入 goroutine 的数量wg.Done()
会减去需等待的数目wg.Wait()
会使程序等待,等待至sync.WaitGroup{}
等待数目被减去至 0 时才会继续执行wg.Done()
都安排在读取完Task()
Channel 後,就可以确保读完资料再close(consumerCh)
for new := range consumerCh
会读取 Channel 资料,直到close(consumerCh)
後跳脱 for 回圈执行後由於取的资料可以同时执行因此加快了执行速度:
如果画面太小或看不清楚,可移驾至 https://www.youtube.com/watch?v=...
本篇为Vue CLI最後一篇,专案已经建立好,环境也有概略的说明,接下来当然是上传到Gihub Pa...
本篇内容要介绍Button元件, 除了认识Button的语法、属性外, 同时也要为按钮设置监听及触发...
早期运动Day10 - 对於压力,我们都需要平衡报导 今天在运动时,听着《自控力:和压力做朋友》 里...
说明 我在本系列文章中,主要是采用 IntelliJ-IDEA 作为示范。但我不会在文章中跟你讲述如...