多个 Producer(生产者)提供任 Job 任务,多个 Consumer(消费者)消费任务
有时候系统的任务不会直接执行,而由多个 Producer 程序存到一个 queue 中,再由其他 Consumer 程序读取 queue 执行,这样的话可以使 Producer 与 Consumer 程序间没有直接关系,他们只依赖 queue,即可解耦。
例如在微服务的系统下,会利用 kafka 来做 message queue system,这样即使微服务 auto scaling(水平扩增)也不会为服务找不到彼此,以 golang 的维度去对比这个问题如图:
由於 Producer goroutine 直接呼叫 A Consumer goroutine,导致两者绑定,Producer goroutine 没有机会把资讯传送给 B Consumer goroutine 消费,这样资讯一多时,Consumer 程序没办法增强消费能力会导致缓慢。
所以会设计一个 job channel 来搜集多个 Producer 的 Job,并交由 Consumer 处理,gorotine 只相依 channel 而不是其他 gorotine,就可以扩增 gorotine 的数量,如图:
类似 Uber 的计程车系统,会有多个使用者叫车,不同的司机接单会收到此使用者的资讯。
相关的 code 在Github - go-design-patterns。
实作有问题的系统如下,有三位使用者糖糖
、盐盐
、乖乖
分别会使用UberProducer()
去叫车,由於没有 job channel,每位使用者都在叫车时就要立即用UberConsumer()
指定司机载人,这导致系统没有分配 job 给 consumer 的功能:
package main
import (
"fmt"
"time"
)
type UserInfo struct {
ID uint32
Name string
}
var userInfos = []UserInfo{
{
1,
"糖糖",
},
{
2,
"盐盐",
},
{
3,
"乖乖",
},
}
func UberProducer(job chan<- UserInfo, i int) {
go UberConsumer(userInfos[i], i)
}
func UberConsumer(userInfo UserInfo, id int) {
fmt.Printf("uber consumer %d get %s user\n", id, userInfo.Name)
}
func main() {
job := make(chan UserInfo)
UberProducerCount := len(userInfos)
for i := 0; i < UberProducerCount; i++ {
go UberProducer(job, i)
}
time.Sleep(10 * time.Second) //等待goroutine执行完毕
}
这使得每次叫车都只会叫到 consumer 0, 1, 2:
三位使用者一样使用UberProducer()
去叫车,而设计一个 job channel 会搜集这三位使用者的叫车单与资讯。而UberConsumer()
则会利用for userInfo := range job
不断监听 job channel 是否有新的叫车,如果有的话就执行载客服务
package main
import (
"fmt"
"time"
)
type UserInfo struct {
ID uint32
Name string
}
var userInfos = []UserInfo{
{
1,
"糖糖",
},
{
2,
"盐盐",
},
{
3,
"乖乖",
},
}
func UberProducer(job chan<- UserInfo, i int) {
job <- userInfos[i]
}
func UberConsumer(job <-chan UserInfo, id int) {
for userInfo := range job {
fmt.Printf("uber consumer %d get %s user\n", id, userInfo.Name)
}
}
func main() {
job := make(chan UserInfo)
UberProducerCount := len(userInfos)
UberConsumerCount := 5
for i := 0; i < UberProducerCount; i++ {
go UberProducer(job, i)
}
for i := 0; i < UberConsumerCount; i++ {
go UberConsumer(job, i)
}
time.Sleep(10 * time.Second) //等待goroutine执行完毕
}
由於 job channel 的关系,只要正在等待的 consumer 都有机会获得 job,所以运行的结果是 consumer 3, 0, 4 载到客,并非 0, 1, 2:
<<: [Day18] Andoroid - Kotlin笔记: sealed class
>>: Day 20:非 GUI 类工具之 juce::Analytics
在去年参加 iT 铁人赛挑战时,选择的题目是需要实作(见:30 天开发 Android App 的流...
在 JavaScript 里还有一个概念称为「Hoisting ( 提升 )」,底下先执行一段范例:...
前言 再想要快速测试一个想法时,固然我们已经有 Pytorch 可以帮我们快速一层一层地搭建一个可以...
前言 昨天我们成功的运行了自己做的App。 但我们还有2个步骤要做: 布局App元件 实作App功能...
今天是第一天,我先简单的介绍一下网站服务器是如何运作的。还有如果在遭遇大量流量时,可能会有哪些状况。...