DAY 8:Producer Consumer Pattern,点菜了,三份穿裤子的猪,一盘热空气,把牛变成鳟鱼

什麽是 Producer Consumer Pattern?

多个 Producer(生产者)提供任 Job 任务,多个 Consumer(消费者)消费任务

有时候系统的任务不会直接执行,而由多个 Producer 程序存到一个 queue 中,再由其他 Consumer 程序读取 queue 执行,这样的话可以使 Producer 与 Consumer 程序间没有直接关系,他们只依赖 queue,即可解耦。

例如在微服务的系统下,会利用 kafka 来做 message queue system,这样即使微服务 auto scaling(水平扩增)也不会为服务找不到彼此,以 golang 的维度去对比这个问题如图:

由於 Producer goroutine 直接呼叫 A Consumer goroutine,导致两者绑定,Producer goroutine 没有机会把资讯传送给 B Consumer goroutine 消费,这样资讯一多时,Consumer 程序没办法增强消费能力会导致缓慢。

所以会设计一个 job channel 来搜集多个 Producer 的 Job,并交由 Consumer 处理,gorotine 只相依 channel 而不是其他 gorotine,就可以扩增 gorotine 的数量,如图:

问题情境

类似 Uber 的计程车系统,会有多个使用者叫车,不同的司机接单会收到此使用者的资讯。

相关的 code 在Github - go-design-patterns

实作有问题的系统如下,有三位使用者糖糖盐盐乖乖分别会使用UberProducer()去叫车,由於没有 job channel,每位使用者都在叫车时就要立即用UberConsumer()指定司机载人,这导致系统没有分配 job 给 consumer 的功能:

package main

import (
	"fmt"
	"time"
)

type UserInfo struct {
	ID   uint32
	Name string
}

var userInfos = []UserInfo{
	{
		1,
		"糖糖",
	},
	{
		2,
		"盐盐",
	},
	{
		3,
		"乖乖",
	},
}

func UberProducer(job chan<- UserInfo, i int) {
	go UberConsumer(userInfos[i], i)
}

func UberConsumer(userInfo UserInfo, id int) {
	fmt.Printf("uber consumer %d get %s user\n", id, userInfo.Name)
}

func main() {
	job := make(chan UserInfo)
	UberProducerCount := len(userInfos)

	for i := 0; i < UberProducerCount; i++ {
		go UberProducer(job, i)
	}

	time.Sleep(10 * time.Second) //等待goroutine执行完毕
}

这使得每次叫车都只会叫到 consumer 0, 1, 2:

解决方式

三位使用者一样使用UberProducer()去叫车,而设计一个 job channel 会搜集这三位使用者的叫车单与资讯。而UberConsumer()则会利用for userInfo := range job不断监听 job channel 是否有新的叫车,如果有的话就执行载客服务

package main

import (
	"fmt"
	"time"
)

type UserInfo struct {
	ID   uint32
	Name string
}

var userInfos = []UserInfo{
	{
		1,
		"糖糖",
	},
	{
		2,
		"盐盐",
	},
	{
		3,
		"乖乖",
	},
}

func UberProducer(job chan<- UserInfo, i int) {
	job <- userInfos[i]
}

func UberConsumer(job <-chan UserInfo, id int) {
	for userInfo := range job {
		fmt.Printf("uber consumer %d get %s user\n", id, userInfo.Name)
	}
}

func main() {
	job := make(chan UserInfo)
	UberProducerCount := len(userInfos)
	UberConsumerCount := 5

	for i := 0; i < UberProducerCount; i++ {
		go UberProducer(job, i)
	}

	for i := 0; i < UberConsumerCount; i++ {
		go UberConsumer(job, i)
	}

	time.Sleep(10 * time.Second) //等待goroutine执行完毕
}

由於 job channel 的关系,只要正在等待的 consumer 都有机会获得 job,所以运行的结果是 consumer 3, 0, 4 载到客,并非 0, 1, 2:


<<:  [Day18] Andoroid - Kotlin笔记: sealed class

>>:  Day 20:非 GUI 类工具之 juce::Analytics

输出的重要性

在去年参加 iT 铁人赛挑战时,选择的题目是需要实作(见:30 天开发 Android App 的流...

JavaScript Day 22. Hoisting

在 JavaScript 里还有一个概念称为「Hoisting ( 提升 )」,底下先执行一段范例:...

[DAY 11] Torchvision 简介

前言 再想要快速测试一个想法时,固然我们已经有 Pytorch 可以帮我们快速一层一层地搭建一个可以...

[Day 25] Android Studio 七日陨石开发:一起来布局 App 介面!

前言 昨天我们成功的运行了自己做的App。 但我们还有2个步骤要做: 布局App元件 实作App功能...

服务器运作简介

今天是第一天,我先简单的介绍一下网站服务器是如何运作的。还有如果在遭遇大量流量时,可能会有哪些状况。...