DAY 7:Fan-Out Fan-In Pattern,看吧世界!这就是多人解决的力量!

什麽是 Fan-Out Fan-In Pattern?

将 input 由一个 producer 分发多个 goroutine 运行,再将多个 task goroutine 运行的结果由一个 consumer 收集资料合并为 output

如果程序的有着复杂的计算或者多个 IO 运行,可以将这些运行分发给 task goroutine 执行,使 task 执行更快速,在统一收集继续下个流程。其中分发与收集的行为又被称为 Fan Out、Fan In:

  • Fan Out: input 传入 producer 後开启多个 goroutine 运行,直到 producer 不再接收 input,就像是分发任务一般,所以称为 Fan Out
  • Fan In: 由多个 input 传入 consumer 後合并为 output 传出,直到 consumer 不再接收 input,就像是收集资料一般,所以称为 Fan In

问题情境

设计一个新闻资讯网页系统,需要从 A、B、C server 拿取资料,这些资料都没有顺序性,纯粹是要都显示在网页上而已,所以如果 A 资料拿完再拿 B,这样就太浪费时间了。可以同时拿取 A、B、C server 的资料加快取资料的速度。

实作有问题的系统如下, A、B、C server 透过GetServerData()拿取资料,再透过ShowNews显示新闻资料:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func GetServerData(serverName string) string {
	time.Sleep(time.Duration(rand.Intn(3)) * time.Second) //模拟取得server data消耗的时间
	return fmt.Sprintf("%s server data", serverName)
}

func ShowNews(news ...interface{}) {
	fmt.Println(news...)
}

func main() {
	start := time.Now()
	responseByServerA := GetServerData("A")
	responseByServerB := GetServerData("B")
	responseByServerC := GetServerData("C")
	ShowNews(responseByServerA, responseByServerB, responseByServerC)
	fmt.Printf("cost %s", time.Since(start))
}

会发现因为拿取资料无法并行,所以耗时较久

解决方式

实作Producer()Task()Consumer()来分别分发任务执行任务收集资料

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

func Producer(serverNames ...string) <-chan string {
	producerCh := make(chan string, len(serverNames))
	go func() {
		defer close(producerCh)
		for _, serverName := range serverNames {
			producerCh <- serverName
		}
	}()
	return producerCh
}

func Task(producerCh <-chan string) <-chan string {
	taskCh := make(chan string)
	go func() {
		defer close(taskCh)
		for serverName := range producerCh {
			taskCh <- GetServerData(serverName)
		}
	}()
	return taskCh
}

func Consumer(taskChs ...<-chan string) <-chan string {
	consumerCh := make(chan string)

	var wg sync.WaitGroup
	wg.Add(len(taskChs))
	go func() {
		wg.Wait()
		close(consumerCh)
	}()

	for _, task := range taskChs {
		go func(task <-chan string) {
			defer wg.Done()
			for new := range task {
				consumerCh <- new
			}
		}(task)
	}

	return consumerCh
}

func GetServerData(serverName string) string {
	time.Sleep(time.Duration(rand.Intn(3)) * time.Second) //模拟取得server data消耗的时间
	return fmt.Sprintf("%s server data", serverName)
}

func ShowNews(news ...interface{}) {
	fmt.Println(news...)
}

func main() {
	start := time.Now()
	producerCh := Producer("A", "B", "C")

	task1 := Task(producerCh)
	task2 := Task(producerCh)
	task3 := Task(producerCh)

	consumerCh := Consumer(task1, task2, task3)

	for new := range consumerCh {
		ShowNews(new)
	}
	fmt.Printf("cost %s", time.Since(start))
}

程序码较长,重点如下:

  • 将需要拿取资料的 server 名称传递给Producer()後,Producer()会创建一个 Channel 来分发任务,所以需再将此 Channel 传给Task()使其 goroutine 获得任务
  • Task()获得任务开始执行後,也会产生各自的 Channel 用来传递 server 的资料,所以需再将此 Channel 送至Consumer()
  • Consumer()获得所有Task()的 Channel 後,会在启动相对数量的 gorotine 合并资料至consumerCh{},为了要确保资料取得完毕後关闭consumerCh{},需透过sync.WaitGroup{}来取得close(consumerCh)的时机,时机的逻辑如下:
    • wg.Add()会加入需等待的数目,这边输入 goroutine 的数量
    • wg.Done()会减去需等待的数目
    • wg.Wait()会使程序等待,等待至sync.WaitGroup{}等待数目被减去至 0 时才会继续执行
    • 所以将wg.Done()都安排在读取完Task()Channel 後,就可以确保读完资料再close(consumerCh)
  • for new := range consumerCh会读取 Channel 资料,直到close(consumerCh)後跳脱 for 回圈

执行後由於取的资料可以同时执行因此加快了执行速度:


<<:  Vue.js 从零开始:箭头函式

>>:  【Day04】Component 与 Props

Episode 3 - 开发工具安装

如果画面太小或看不清楚,可移驾至 https://www.youtube.com/watch?v=...

Vue.js 从零开始:Vue CLI / Gihub Pages

本篇为Vue CLI最後一篇,专案已经建立好,环境也有概略的说明,接下来当然是上传到Gihub Pa...

Day-19 Button

本篇内容要介绍Button元件, 除了认识Button的语法、属性外, 同时也要为按钮设置监听及触发...

压力平衡

早期运动Day10 - 对於压力,我们都需要平衡报导 今天在运动时,听着《自控力:和压力做朋友》 里...

Java 开发 WEB 的好平台 -- Grails -- (2) 新增一个 Grails 专案

说明 我在本系列文章中,主要是采用 IntelliJ-IDEA 作为示范。但我不会在文章中跟你讲述如...