前 11 天已经将常见的 concurrency patterns 介绍完毕,今天我们要介绍的不是 patterns,而是在实务使用 concurrency patterns 要特别注意的事项「Graceful Shutdown」。
在实务的系统设计中,会将多个 pattern 融合以应付复杂的需求,本篇程序码很长,如果范例过於复杂,建议可以先看 DAY 7 ~ DAY 11 的文章,先以较小的知识点来理解 pattern,再进入多个 pattern 结合的本文章。
安全的关闭正在运行的 goroutine,即 goroutine 当前的任务运行完毕再关闭
在DAY 9:Worker Pool Pattern,就。很。Pool。,或DAY 7:Fan-Out Fan-In Pattern,看吧世界!这就是多人解决的力量!的 pattern 中,如果在运行时直接使用 ctrl+c 关闭程序,正在运行的 goroutine 会直接关闭,导致正在被消费的 job 中断,这样会造成程序不安全。
举例来说,如果有系统正在对 DB 进行 transaction,但 server 直接 shutdown 了,那 transaction 就不会 rollback,整个 DB 就会停在 transaction 的状态,造成系统不稳定,我们期望应该是
server 收到 shutdown 的请求後,停止接收新的 requests,将现有的 requests 处理完毕後 shutdown
延续推播新闻系统的情境,将新的新闻直接推播出去,除了推播系统效率要高,还需纪录推播完成的时间。
另外还加上以下各种情境,这都需要前面的 DAY 1 ~ DAY 11 的 pattern 观念,我将其中比较核心的 pattern 也写出来:
整体的运行图:
相关的 code 在Github - go-design-patterns。整体程序码:
package main
import (
"context"
"fmt"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
type SendInfo struct {
NewsName string
FinishTime time.Time
}
type NewsSender struct {
inputsChan chan string
jobsChan chan string
resultsChan chan SendInfo
wg *sync.WaitGroup
}
type NewsCompleteLogger struct {
completeNews <-chan SendInfo
done chan bool
}
func (c *NewsSender) StartConsume(ctx context.Context) {
for {
select {
case in := <-c.inputsChan:
if ctx.Err() != nil {
close(c.jobsChan)
return
}
c.jobsChan <- in
case <-ctx.Done():
close(c.jobsChan)
return
}
}
}
func (c *NewsSender) StartWorker(ctx context.Context, id int) {
defer c.wg.Done()
fmt.Printf("create worker %d\n", id)
time.Sleep(time.Duration(3 * time.Second)) //模拟与某service建立socket的时间
for {
select {
case job, ok := <-c.jobsChan:
if !ok {
fmt.Printf("close worker %d\n", id)
return
}
time.Sleep(time.Duration(3 * time.Second)) //模拟推播运行的时间
fmt.Printf("<<worker %d finish %s>>\n", id, job)
c.resultsChan <- SendInfo{job, time.Now()}
case <-ctx.Done():
fmt.Printf("close worker %d\n", id)
return
}
}
}
func (c *NewsSender) CreateWorkerPool(ctx context.Context, poolSize int) {
c.wg = &sync.WaitGroup{}
c.wg.Add(poolSize)
for w := 0; w < poolSize; w++ {
go c.StartWorker(ctx, w)
}
}
func (c *NewsSender) StopWait(ctx context.Context) {
c.wg.Wait()
close(c.resultsChan)
}
func CreateNewsSender(ctx context.Context) *NewsSender {
newsSender := NewsSender{
inputsChan: make(chan string),
jobsChan: make(chan string),
resultsChan: make(chan SendInfo),
}
return &newsSender
}
func ProduceToNewsSender(allNews []string, inputsChan chan<- string) {
for _, news := range allNews {
fmt.Printf("<<producer send %s>>\n", news)
inputsChan <- news
}
}
func CreateNewsCompleteLogger(ctx context.Context, completeNews <-chan SendInfo) *NewsCompleteLogger {
newsCompleteLogger := NewsCompleteLogger{
completeNews: completeNews,
done: make(chan bool),
}
return &newsCompleteLogger
}
func (n *NewsCompleteLogger) StartLog(ctx context.Context) {
for result := range n.completeNews {
fmt.Printf("<<fan in news>> news %s is sent at %s\n", result.NewsName, result.FinishTime)
}
close(n.done)
}
func (n *NewsCompleteLogger) StopWait(ctx context.Context) {
<-n.done
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
newsSender := CreateNewsSender(ctx)
newsCompleteLogger := CreateNewsCompleteLogger(ctx, newsSender.resultsChan)
newsSender.CreateWorkerPool(ctx, 3)
go ProduceToNewsSender([]string{
"中秋节来了",
"记得",
"不要户外烤肉~",
"也不要吃太撑",
}, newsSender.inputsChan)
go ProduceToNewsSender([]string{
"床前明月光",
"疑似地上霜",
"举头望明月",
"低头思故乡",
}, newsSender.inputsChan)
go ProduceToNewsSender([]string{
"渭城朝雨邑轻尘",
"客舍青青柳色新",
"劝君更尽一杯酒",
"西出阳关无故人",
}, newsSender.inputsChan)
go newsSender.StartConsume(ctx)
go newsCompleteLogger.StartLog(ctx)
termChan := make(chan os.Signal)
signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
<-termChan
fmt.Println("================\nctrl+c event\n================")
cancel()
newsSender.StopWait(ctx)
newsCompleteLogger.StopWait(ctx)
}
首先,整个系统分别有两个 struct NewsSender{}
与NewsCompleteLogger{}
,
type NewsSender struct {
inputsChan chan string
jobsChan chan string
resultsChan chan SendInfo
wg *sync.WaitGroup
}
type NewsCompleteLogger struct {
completeNews <-chan SendInfo
done chan bool
}
NewsSender{}
拥有
inputsChan{}
与 producer goroutine 沟通jobsChan{}
将收到的任务朝 worker 发送resultsChan{}
将 worker 处理完毕的新闻资讯与NewsCompleteLogger{}
沟通wg{}
用来等待所有 goroutine 处理结束NewsCompleteLogger{}
拥有
completeNews{}
接收resultsChan{}
的新闻资讯done
等待completeNews{}
消费完毕後,发送讯号的 channelNewsSender{}
会实作 Producer Consumer Pattern、Worker Pool Pattern、Fan-Out Fan-In Pattern
将多个新闻输入为 Producer Consumer Pattern,透过ProduceToNewsSender()
送至newsSender.inputsChan{}
func ProduceToNewsSender(allNews []string, inputsChan chan<- string) {
for _, news := range allNews {
fmt.Printf("<<producer send %s>>\n", news)
inputsChan <- news
}
}
go ProduceToNewsSender([]string{
"中秋节来了",
"记得",
"不要户外烤肉~",
"也不要吃太撑",
}, newsSender.inputsChan)
go ProduceToNewsSender([]string{
"床前明月光",
"疑似地上霜",
"举头望明月",
"低头思故乡",
}, newsSender.inputsChan)
go ProduceToNewsSender([]string{
"渭城朝雨邑轻尘",
"客舍青青柳色新",
"劝君更尽一杯酒",
"西出阳关无故人",
}, newsSender.inputsChan)
而NewsSender.StartConsume()
是一个 consumer 来消费,这是因为newsSender.inputsChan{}
会再将接收到的新闻传送至newsSender.jobsChan{}
供 worker pool 处理。
func (c *NewsSender) StartConsume(ctx context.Context) {
for {
select {
case in := <-c.inputsChan:
if ctx.Err() != nil {
close(c.jobsChan)
return
}
c.jobsChan <- in
case <-ctx.Done():
close(c.jobsChan)
return
}
}
}
这些 woker 会预先启动,即 Worker Pool Pattern 的观念,已避免每次使用 goroutine 都要初始化
func (c *NewsSender) StartWorker(ctx context.Context, id int) {
defer c.wg.Done()
fmt.Printf("create worker %d\n", id)
time.Sleep(time.Duration(3 * time.Second)) //模拟与某service建立socket的时间
for {
select {
case job, ok := <-c.jobsChan:
if !ok {
fmt.Printf("close worker %d\n", id)
return
}
time.Sleep(time.Duration(3 * time.Second)) //模拟推播运行的时间
fmt.Printf("<<worker %d finish %s>>\n", id, job)
c.resultsChan <- SendInfo{job, time.Now()}
case <-ctx.Done():
fmt.Printf("close worker %d\n", id)
return
}
}
}
func (c *NewsSender) CreateWorkerPool(ctx context.Context, poolSize int) {
c.wg = &sync.WaitGroup{}
c.wg.Add(poolSize)
for w := 0; w < poolSize; w++ {
go c.StartWorker(ctx, w)
}
}
func main() {
// ...
newsSender.CreateWorkerPool(ctx, 3)
// ..
}
最後新闻经newsSender.jobsChan{}
到 woker pool 再到newsSender.resultsChan{}
即形成了 Fan-Out Fan-In Pattern
newsSender.resultsChan{}
会在交给newsCompleteLogger.StartLog()
会不断把newsSender.resultsChan{}
送出的新闻时间记录下来
func (n *NewsCompleteLogger) StartLog(ctx context.Context) {
for result := range n.completeNews {
fmt.Printf("<<fan in news>> news %s is sent at %s\n", result.NewsName, result.FinishTime)
}
close(n.done)
}
大家会发现 code 中所有的 function 都有传入 context,即是需要用context.WithCancel()
的方式,对底下所有的 goroutine 利用ctx.Done()
做关闭。
透过signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
来绑定ctrl+c
的讯号,并等待触发。
在触发ctrl+c
後,呼叫cancel()
後,各个 goroutine 的ctx.Done()
会开始执行关闭程序,必须要有方法等待至「所有 goroutine 已经关闭」,newsSender{}
与newsCompleteLogger{}
都有采用StopWait()
实作
// ...
func (c *NewsSender) StopWait(ctx context.Context) {
c.wg.Wait()
close(c.resultsChan)
}
// ...
func (n *NewsCompleteLogger) StopWait(ctx context.Context) {
<-n.done
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
// ...
termChan := make(chan os.Signal)
signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
<-termChan
fmt.Println("================\nctrl+c event\n================")
cancel()
newsSender.StopWait(ctx)
newsCompleteLogger.StopWait(ctx)
}
newsSender.StopWait()
:里头c.wg.Wait()
会等待所有 goroutine 的c.wg.Done()
运行,c.wg.Done()
是由ctx.Done()
触发。c.wg.Wait()
等待完毕後,关闭newsSender.resultsChan{}
做善後动作newsCompleteLogger.StopWait()
:里头newsCompleteLogger.done{}
会等待唯一一个 goroutine 运行完毕,即newsCompleteLogger.StartLog()
将newsSender.resultsChan{}
都读完的时候会对newsCompleteLogger.done{}
做 close.StopWait()
都执行完毕後,即所有 goroutine 对剩余的讯息都消费完毕,即可安全的关闭主程序main()
。
扩充挂载 昨日有讲解PV & PVC Static Provisioning的做法,今日会介...
并查集是一种树状的结构,可以用来表示两个节点的连接、查询两个节点的连接~~ 在刷题的时候有时候会使用...
请不要在意我标题出生日期一直跳,育儿的日子没那麽多废文可以写XD~而且中间很多天在干嘛其实也忘了囧...
「每一个范式都将某些东西带离开我们。goto语句、函式指标、赋值,还有什麽东西可以带走的吗?」 「...
本系列文之後也会置於个人网站 +----------+ | Resource | | Owner ...