DAY 12:Concurrency Patterns 融会贯通+Graceful Shutdown,正确关闭各个宇宙的次元门

前 11 天已经将常见的 concurrency patterns 介绍完毕,今天我们要介绍的不是 patterns,而是在实务使用 concurrency patterns 要特别注意的事项「Graceful Shutdown」。

在实务的系统设计中,会将多个 pattern 融合以应付复杂的需求,本篇程序码很长,如果范例过於复杂,建议可以先看 DAY 7 ~ DAY 11 的文章,先以较小的知识点来理解 pattern,再进入多个 pattern 结合的本文章。

什麽是 Graceful Shutdown?

安全的关闭正在运行的 goroutine,即 goroutine 当前的任务运行完毕再关闭

DAY 9:Worker Pool Pattern,就。很。Pool。,或DAY 7:Fan-Out Fan-In Pattern,看吧世界!这就是多人解决的力量!的 pattern 中,如果在运行时直接使用 ctrl+c 关闭程序,正在运行的 goroutine 会直接关闭,导致正在被消费的 job 中断,这样会造成程序不安全。

举例来说,如果有系统正在对 DB 进行 transaction,但 server 直接 shutdown 了,那 transaction 就不会 rollback,整个 DB 就会停在 transaction 的状态,造成系统不稳定,我们期望应该是

server 收到 shutdown 的请求後,停止接收新的 requests,将现有的 requests 处理完毕後 shutdown

问题情境

延续推播新闻系统的情境,将新的新闻直接推播出去,除了推播系统效率要高,还需纪录推播完成的时间。

另外还加上以下各种情境,这都需要前面的 DAY 1 ~ DAY 11 的 pattern 观念,我将其中比较核心的 pattern 也写出来:

解决方式

整体的运行图:

相关的 code 在Github - go-design-patterns。整体程序码:

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

type SendInfo struct {
	NewsName   string
	FinishTime time.Time
}

type NewsSender struct {
	inputsChan  chan string
	jobsChan    chan string
	resultsChan chan SendInfo
	wg          *sync.WaitGroup
}

type NewsCompleteLogger struct {
	completeNews <-chan SendInfo
	done         chan bool
}

func (c *NewsSender) StartConsume(ctx context.Context) {
	for {
		select {
		case in := <-c.inputsChan:
			if ctx.Err() != nil {
				close(c.jobsChan)
				return
			}
			c.jobsChan <- in
		case <-ctx.Done():
			close(c.jobsChan)
			return
		}
	}
}

func (c *NewsSender) StartWorker(ctx context.Context, id int) {
	defer c.wg.Done()
	fmt.Printf("create worker %d\n", id)
	time.Sleep(time.Duration(3 * time.Second)) //模拟与某service建立socket的时间
	for {
		select {
		case job, ok := <-c.jobsChan:
			if !ok {
				fmt.Printf("close worker %d\n", id)
				return
			}
			time.Sleep(time.Duration(3 * time.Second)) //模拟推播运行的时间
			fmt.Printf("<<worker %d finish %s>>\n", id, job)
			c.resultsChan <- SendInfo{job, time.Now()}
		case <-ctx.Done():
			fmt.Printf("close worker %d\n", id)
			return
		}
	}
}

func (c *NewsSender) CreateWorkerPool(ctx context.Context, poolSize int) {
	c.wg = &sync.WaitGroup{}
	c.wg.Add(poolSize)
	for w := 0; w < poolSize; w++ {
		go c.StartWorker(ctx, w)
	}
}

func (c *NewsSender) StopWait(ctx context.Context) {
	c.wg.Wait()
	close(c.resultsChan)
}

func CreateNewsSender(ctx context.Context) *NewsSender {
	newsSender := NewsSender{
		inputsChan:  make(chan string),
		jobsChan:    make(chan string),
		resultsChan: make(chan SendInfo),
	}
	return &newsSender
}

func ProduceToNewsSender(allNews []string, inputsChan chan<- string) {
	for _, news := range allNews {
		fmt.Printf("<<producer send %s>>\n", news)
		inputsChan <- news
	}
}

func CreateNewsCompleteLogger(ctx context.Context, completeNews <-chan SendInfo) *NewsCompleteLogger {
	newsCompleteLogger := NewsCompleteLogger{
		completeNews: completeNews,
		done:         make(chan bool),
	}
	return &newsCompleteLogger
}

func (n *NewsCompleteLogger) StartLog(ctx context.Context) {
	for result := range n.completeNews {
		fmt.Printf("<<fan in news>> news %s is sent at %s\n", result.NewsName, result.FinishTime)
	}
	close(n.done)
}

func (n *NewsCompleteLogger) StopWait(ctx context.Context) {
	<-n.done
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	newsSender := CreateNewsSender(ctx)
	newsCompleteLogger := CreateNewsCompleteLogger(ctx, newsSender.resultsChan)
	newsSender.CreateWorkerPool(ctx, 3)

	go ProduceToNewsSender([]string{
		"中秋节来了",
		"记得",
		"不要户外烤肉~",
		"也不要吃太撑",
	}, newsSender.inputsChan)
	go ProduceToNewsSender([]string{
		"床前明月光",
		"疑似地上霜",
		"举头望明月",
		"低头思故乡",
	}, newsSender.inputsChan)
	go ProduceToNewsSender([]string{
		"渭城朝雨邑轻尘",
		"客舍青青柳色新",
		"劝君更尽一杯酒",
		"西出阳关无故人",
	}, newsSender.inputsChan)

	go newsSender.StartConsume(ctx)
	go newsCompleteLogger.StartLog(ctx)

	termChan := make(chan os.Signal)
	signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
	<-termChan

	fmt.Println("================\nctrl+c event\n================")
	cancel()
	newsSender.StopWait(ctx)
	newsCompleteLogger.StopWait(ctx)
}

Concurrency Patterns 融会贯通

首先,整个系统分别有两个 struct NewsSender{}NewsCompleteLogger{}

type NewsSender struct {
	inputsChan  chan string
	jobsChan    chan string
	resultsChan chan SendInfo
	wg          *sync.WaitGroup
}

type NewsCompleteLogger struct {
	completeNews <-chan SendInfo
	done         chan bool
}
  • NewsSender{}拥有
    • inputsChan{} 与 producer goroutine 沟通
    • jobsChan{} 将收到的任务朝 worker 发送
    • resultsChan{} 将 worker 处理完毕的新闻资讯与NewsCompleteLogger{}沟通
    • wg{}用来等待所有 goroutine 处理结束
  • NewsCompleteLogger{}拥有
    • completeNews{}接收resultsChan{}的新闻资讯
    • done等待completeNews{}消费完毕後,发送讯号的 channel

NewsSender{}会实作 Producer Consumer Pattern、Worker Pool Pattern、Fan-Out Fan-In Pattern

将多个新闻输入为 Producer Consumer Pattern,透过ProduceToNewsSender()送至newsSender.inputsChan{}

func ProduceToNewsSender(allNews []string, inputsChan chan<- string) {
	for _, news := range allNews {
		fmt.Printf("<<producer send %s>>\n", news)
		inputsChan <- news
	}
}

go ProduceToNewsSender([]string{
	"中秋节来了",
	"记得",
	"不要户外烤肉~",
	"也不要吃太撑",
}, newsSender.inputsChan)
go ProduceToNewsSender([]string{
	"床前明月光",
	"疑似地上霜",
	"举头望明月",
	"低头思故乡",
}, newsSender.inputsChan)
go ProduceToNewsSender([]string{
	"渭城朝雨邑轻尘",
	"客舍青青柳色新",
	"劝君更尽一杯酒",
	"西出阳关无故人",
}, newsSender.inputsChan)

NewsSender.StartConsume()是一个 consumer 来消费,这是因为newsSender.inputsChan{}会再将接收到的新闻传送至newsSender.jobsChan{}供 worker pool 处理。

func (c *NewsSender) StartConsume(ctx context.Context) {
	for {
		select {
		case in := <-c.inputsChan:
			if ctx.Err() != nil {
				close(c.jobsChan)
				return
			}
			c.jobsChan <- in
		case <-ctx.Done():
			close(c.jobsChan)
			return
		}
	}
}

这些 woker 会预先启动,即 Worker Pool Pattern 的观念,已避免每次使用 goroutine 都要初始化

func (c *NewsSender) StartWorker(ctx context.Context, id int) {
	defer c.wg.Done()
	fmt.Printf("create worker %d\n", id)
	time.Sleep(time.Duration(3 * time.Second)) //模拟与某service建立socket的时间
	for {
		select {
		case job, ok := <-c.jobsChan:
			if !ok {
				fmt.Printf("close worker %d\n", id)
				return
			}
			time.Sleep(time.Duration(3 * time.Second)) //模拟推播运行的时间
			fmt.Printf("<<worker %d finish %s>>\n", id, job)
			c.resultsChan <- SendInfo{job, time.Now()}
		case <-ctx.Done():
			fmt.Printf("close worker %d\n", id)
			return
		}
	}
}

func (c *NewsSender) CreateWorkerPool(ctx context.Context, poolSize int) {
	c.wg = &sync.WaitGroup{}
	c.wg.Add(poolSize)
	for w := 0; w < poolSize; w++ {
		go c.StartWorker(ctx, w)
	}
}

func main() {
	// ...
	newsSender.CreateWorkerPool(ctx, 3)
	// ..
}

最後新闻经newsSender.jobsChan{}到 woker pool 再到newsSender.resultsChan{}即形成了 Fan-Out Fan-In Pattern

newsSender.resultsChan{}会在交给newsCompleteLogger.StartLog()会不断把newsSender.resultsChan{}送出的新闻时间记录下来

func (n *NewsCompleteLogger) StartLog(ctx context.Context) {
	for result := range n.completeNews {
		fmt.Printf("<<fan in news>> news %s is sent at %s\n", result.NewsName, result.FinishTime)
	}
	close(n.done)
}

Graceful Shutdown

大家会发现 code 中所有的 function 都有传入 context,即是需要用context.WithCancel()的方式,对底下所有的 goroutine 利用ctx.Done()做关闭。

透过signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)来绑定ctrl+c的讯号,并等待触发。

在触发ctrl+c後,呼叫cancel()後,各个 goroutine 的ctx.Done()会开始执行关闭程序,必须要有方法等待至「所有 goroutine 已经关闭」,newsSender{}newsCompleteLogger{}都有采用StopWait()实作


// ...

func (c *NewsSender) StopWait(ctx context.Context) {
	c.wg.Wait()
	close(c.resultsChan)
}

// ...

func (n *NewsCompleteLogger) StopWait(ctx context.Context) {
	<-n.done
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	// ...

	termChan := make(chan os.Signal)
	signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
	<-termChan

	fmt.Println("================\nctrl+c event\n================")
	cancel()
	newsSender.StopWait(ctx)
	newsCompleteLogger.StopWait(ctx)
}
  • newsSender.StopWait():里头c.wg.Wait()会等待所有 goroutine 的c.wg.Done()运行,c.wg.Done()是由ctx.Done()触发。c.wg.Wait()等待完毕後,关闭newsSender.resultsChan{}做善後动作
  • newsCompleteLogger.StopWait():里头newsCompleteLogger.done{}会等待唯一一个 goroutine 运行完毕,即newsCompleteLogger.StartLog()newsSender.resultsChan{}都读完的时候会对newsCompleteLogger.done{}做 close

.StopWait()都执行完毕後,即所有 goroutine 对剩余的讯息都消费完毕,即可安全的关闭主程序main()


<<:  [Day17] Webpack - 跨浏览器支援

>>:  JS 10 - 原型继承

D槽空间还够吗?扩充一下可以下载更多东西

扩充挂载 昨日有讲解PV & PVC Static Provisioning的做法,今日会介...

DAY16 - 并查集

并查集是一种树状的结构,可以用来表示两个节点的连接、查询两个节点的连接~~ 在刷题的时候有时候会使用...

出生第48天 铁人完赛日

请不要在意我标题出生日期一直跳,育儿的日子没那麽多废文可以写XD~而且中间很多天在干嘛其实也忘了囧...

Day 15: 范式概述、结构化设计 (待改进中... )

「每一个范式都将某些东西带离开我们。goto语句、函式指标、赋值,还有什麽东西可以带走的吗?」 「...

Day14 - 【概念篇】OAuth flows: Implicit (Legacy)

本系列文之後也会置於个人网站 +----------+ | Resource | | Owner ...