设定好 pool 的 goroutine 数量,预先启动多个 goroutine,把 job 传给这些 goroutine 执行
与 Thread-Per-Message Pattern 类似,都是将 message 或 job 传给 goroutine 执行的 pattern,不同的是:
预先启动好的 goroutine 可以先做一些前置动作,例如 DB 连线、与其他 service 的 socket 连线等,可避免收到 job 时才开始执行这些动作导致速度缓慢,或者无法控制 goroutine 数量导致系统崩溃。
延续DAY 5:Thread-Per-Message Pattern的情境
设计一个推播新闻系统,会将新的新闻直接推播出去,我们希望推播系统效率要高,并且每次推播都会跟某 service 建立 socket 拿取资料,如图:
图中socket 连线属於耗时的 IO 行为,每个 goroutine 都连线 socket 会导致初始化过慢,并且 goroutine 一多起来,socket 连线过多会导致连线损坏。
设计有问题的程序码如下:
(相关的 code 在Github - go-design-patterns。)
package main
import (
"fmt"
"time"
)
func PushNews(news string, startTime time.Time) <-chan time.Time {
newsCh := make(chan time.Time)
go func() {
time.Sleep(time.Duration(10 * time.Second)) //模拟与某service建立socket的时间
time.Sleep(time.Duration(3 * time.Second)) //模拟推播运行的时间
fmt.Printf("%s cost %s\n", news, time.Since(startTime))
newsCh <- time.Now()
}()
return newsCh
}
func main() {
start := time.Now()
allNews := []string{
"中秋节来了",
"记得",
"不要户外烤肉~",
"也不要吃太撑",
}
newsChs := []<-chan time.Time{}
for _, news := range allNews {
newsChs = append(newsChs, PushNews(news, start))
}
// do something
for index, newsCh := range newsChs {
fmt.Printf("news %d is sent at %s\n", index, <-newsCh)
}
}
time.Sleep(time.Duration(10 * time.Second))
会导致每个 goroutine 都花了 10 秒来练线 socket,导致运行缓慢。
如果可以:
即可避免 goroutine 接到 job 初始化过慢的问题,程序码如下:
package main
import (
"fmt"
"time"
)
type SendInfo struct {
NewsName string
FinishTime time.Time
}
func Worker(id int, jobs <-chan string, results chan<- SendInfo, startTime time.Time) {
time.Sleep(time.Duration(10 * time.Second)) //模拟与某service建立socket的时间
for job := range jobs {
time.Sleep(time.Duration(3 * time.Second)) //模拟推播运行的时间
fmt.Printf("%s cost %s by worker %d\n", job, time.Since(startTime), id)
results <- SendInfo{job, time.Now()}
}
}
func main() {
start := time.Now()
allNews := []string{
"中秋节来了",
"记得",
"不要户外烤肉~",
"也不要吃太撑",
}
jobs := make(chan string, len(allNews))
results := make(chan SendInfo, len(allNews))
for w := 1; w <= 2; w++ {
go Worker(w, jobs, results, start)
}
for _, news := range allNews {
jobs <- news
}
// do something
for r := 1; r <= len(allNews); r++ {
result := <-results
fmt.Printf("news %s is sent at %s\n", result.NewsName, result.FinishTime)
}
}
for w := 1; w <= 2; w++
在 pool 创建了三个 goroutine,称之为 worker,他们会先初始化 socket 连线,让後续有 job 传入时不需要再执行一次 socket 连线for _, news := range allNews
即开始送 job 给 worker,由於 worker 已经初始化,运行不会被 socket 初始化拖慢概念如图:
运行结果如图:
如此一来每次运行 goroutine 就不用做 socket 连线,节省了许多时间。
不过,由於 pool 只有运行三个 goroutine,而 news 有四个,所以第四个也不要吃太撑
news 会在 pool 都处理完前三个 news 後再执行。Worker Pool Pattern 可以限制 pool 的 goroutine 数量,以避免系统负载过大,但也需要考虑 pool 是否过小,导致 news jobs 常常需等待 pool 的情形。
<<: DAY22 MongoDB Profiler 几个指令抓出拖垮系统的元凶
待完成... ...
本系列文章经过重新编排和扩充,已出书为ECMAScript关键30天。原始文章因当时准备时程紧迫,...
从芝麻街角色到改变NLP的模型 如果你用Google以「BERT」作为关键字搜寻图片,那麽你会发现一...
这篇我是在讨论提供反馈 (giving feedback)。但对主管来说,如何接受反馈 (takin...
我们来看到C++要如何计算程序码的执行时间呢 ~ 学习目标: 计算程序码执行时间的实务 学习难度: ...