DAY 9:Worker Pool Pattern,就。很。Pool。

什麽是 Worker Pool Pattern?

设定好 pool 的 goroutine 数量,预先启动多个 goroutine,把 job 传给这些 goroutine 执行

与 Thread-Per-Message Pattern 类似,都是将 message 或 job 传给 goroutine 执行的 pattern,不同的是:

  • Thread-Per-Message Patternt 当接收到 message 的时候启动 goroutine
  • Worker Pool Pattern 是预先启动好 goroutine,称之为pool

预先启动好的 goroutine 可以先做一些前置动作,例如 DB 连线、与其他 service 的 socket 连线等,可避免收到 job 时才开始执行这些动作导致速度缓慢,或者无法控制 goroutine 数量导致系统崩溃。

问题情境

延续DAY 5:Thread-Per-Message Pattern的情境

设计一个推播新闻系统,会将新的新闻直接推播出去,我们希望推播系统效率要高,并且每次推播都会跟某 service 建立 socket 拿取资料,如图:

图中socket 连线属於耗时的 IO 行为,每个 goroutine 都连线 socket 会导致初始化过慢,并且 goroutine 一多起来,socket 连线过多会导致连线损坏。

设计有问题的程序码如下:

(相关的 code 在Github - go-design-patterns。)

package main

import (
	"fmt"
	"time"
)

func PushNews(news string, startTime time.Time) <-chan time.Time {
	newsCh := make(chan time.Time)
	go func() {
		time.Sleep(time.Duration(10 * time.Second)) //模拟与某service建立socket的时间
		time.Sleep(time.Duration(3 * time.Second))  //模拟推播运行的时间
		fmt.Printf("%s cost %s\n", news, time.Since(startTime))
		newsCh <- time.Now()
	}()
	return newsCh
}

func main() {
	start := time.Now()
	allNews := []string{
		"中秋节来了",
		"记得",
		"不要户外烤肉~",
		"也不要吃太撑",
	}
	newsChs := []<-chan time.Time{}
	for _, news := range allNews {
		newsChs = append(newsChs, PushNews(news, start))
	}

	// do something

	for index, newsCh := range newsChs {
		fmt.Printf("news %d is sent at %s\n", index, <-newsCh)
	}
}

time.Sleep(time.Duration(10 * time.Second))会导致每个 goroutine 都花了 10 秒来练线 socket,导致运行缓慢。

解决方式

如果可以:

  • 控制 goroutine 数量
  • 预先连线 socket

即可避免 goroutine 接到 job 初始化过慢的问题,程序码如下:

package main

import (
	"fmt"
	"time"
)

type SendInfo struct {
	NewsName   string
	FinishTime time.Time
}

func Worker(id int, jobs <-chan string, results chan<- SendInfo, startTime time.Time) {
	time.Sleep(time.Duration(10 * time.Second)) //模拟与某service建立socket的时间
	for job := range jobs {
		time.Sleep(time.Duration(3 * time.Second)) //模拟推播运行的时间
		fmt.Printf("%s cost %s by worker %d\n", job, time.Since(startTime), id)
		results <- SendInfo{job, time.Now()}
	}
}

func main() {
	start := time.Now()
	allNews := []string{
		"中秋节来了",
		"记得",
		"不要户外烤肉~",
		"也不要吃太撑",
	}
	jobs := make(chan string, len(allNews))
	results := make(chan SendInfo, len(allNews))

	for w := 1; w <= 2; w++ {
		go Worker(w, jobs, results, start)
	}

	for _, news := range allNews {
		jobs <- news
	}

	// do something

	for r := 1; r <= len(allNews); r++ {
		result := <-results
		fmt.Printf("news %s is sent at %s\n", result.NewsName, result.FinishTime)
	}
}
  • for w := 1; w <= 2; w++在 pool 创建了三个 goroutine,称之为 worker,他们会先初始化 socket 连线,让後续有 job 传入时不需要再执行一次 socket 连线
  • for _, news := range allNews即开始送 job 给 worker,由於 worker 已经初始化,运行不会被 socket 初始化拖慢
  • 控制 worker 的数量为三个,使 socket 连线数是可控的

概念如图:

运行结果如图:

如此一来每次运行 goroutine 就不用做 socket 连线,节省了许多时间。

不过,由於 pool 只有运行三个 goroutine,而 news 有四个,所以第四个也不要吃太撑news 会在 pool 都处理完前三个 news 後再执行。Worker Pool Pattern 可以限制 pool 的 goroutine 数量,以避免系统负载过大,但也需要考虑 pool 是否过小,导致 news jobs 常常需等待 pool 的情形。


<<:  DAY22 MongoDB Profiler 几个指令抓出拖垮系统的元凶

>>:  Day 07 Style

总结

本系列文章经过重新编排和扩充,已出书为ECMAScript关键30天。原始文章因当时准备时程紧迫,...

【Day 1】谁是BERT?如何BERT?BERT的基础介绍

从芝麻街角色到改变NLP的模型 如果你用Google以「BERT」作为关键字搜寻图片,那麽你会发现一...

反馈与「大便三明治」

这篇我是在讨论提供反馈 (giving feedback)。但对主管来说,如何接受反馈 (takin...

【C++】计算程序的执行时间

我们来看到C++要如何计算程序码的执行时间呢 ~ 学习目标: 计算程序码执行时间的实务 学习难度: ...