day 22 - NSQ Consumer & graceful shutdown

一个服务发出讯息之後, 可以由多个服务分别注册多个channel来监听, 同一个TOPIC底下的每个channel都会拿到一样的讯息。

当後端的沟通都是透过NSQ讯号的时候, 流程上每个环节的每个动作都可以拆分出来, 每个Consumer都可以接收到第一手的资料, 不需要再经过其他服务的传递。 这样可以减少资料传递所消耗的时间、避免资料内容缺失、需要调整资料内容时也不用再大家都改, 只要源头修改好了, 大家都会拿到一样的资讯。

但Consumer收到讯息之後可能会做很多复杂的处理, 所以Consumer也需要graceful shutdown。
以下程序大概会是这样:

  • 建立Consumer

        // 建立空白设定档。
        ConsumerConfig := nsq.NewConfig()
        // 设置重连时间
        ConsumerConfig.LookupdPollInterval = time.Second * 2
        consumer, _ := nsq.NewConsumer("COCONUT_UPDATE_POINT", "coconut", ConsumerConfig)
        consumer.AddConcurrentHandlers(TestNSQConsumer(), config.NsqConsumerWorkers)
        err = consumer.ConnectToNSQLookupd(config.NsqLookupdAddr)
        if err != nil {
            return
        }
    
    

    AddConcurrentHandlers 是设定要开几个worker处理, 如果会同时收大批讯息就要测试worker设定的合理值才能在时间内收下所有资料。
    讯息进来後由TestNSQConsumer()负责收下来处理, message.DisableAutoResponse() & defer message.Finish()是不要再让讯息Requeue回去, 这样可能会发生处理到一半的状态, 这边要看使用的情境, 如果是需要讯息重发的可以使用Requeue, 可以参考官方的test case

    func TestNSQConsumer() nsq.Handler {
       return nsq.HandlerFunc(func(message *nsq.Message) (err error) {
           message.DisableAutoResponse()
           defer message.Finish()
           log.Printf("========================== 收到的讯息是:%v", string(message.Body))
           time.Sleep(time.Duration(20) * time.Second)
           log.Printf("========= 故意睡 20s 测试是否有等consumer处理完才结束")
           return nil
       })
    }
    
  • NewConsumer完成之後, 宣告stopOnSignalExitNCL 把Consumer收集起来, 透过SnowFlake演算法给他一个不重复的ID

        var (
            wg                  sync.WaitGroup
            stopOnSignalExitNCL = NsqConsumerList{} // 收集停止讯号时停止nsq consumer 的列表
        )
    
        stopOnSignalExitNCL.Set(snowflakeNode.Generate().Int64(), consumer)
        // 停止部分Nsq Consumer避免有讯息进来
        Logger.Debugf("停止部分Nsq Consumer避免有讯息进来...")
    
        err = stopOnSignalExitNCL.Each(func(c *nsq.Consumer) error {
            wg.Add(1)
            go func(c *nsq.Consumer, wg *sync.WaitGroup) {
                // 停止讯号会等待正在处理的讯息做完才结束
                c.Stop()
                <-c.StopChan
                wg.Done()
            }(c, &wg)
    
            return nil
        })
    
        if err != nil {
            Logger.Errorf("stopOnSignalExitNCL.Each err: %s", err.Error())
        }
    
        wg.Wait()
        Logger.Debugf("停止 NSQ Consumer完成...")
    
    
  • NsqConsumerList 结构, 丢一个callback到consumer, 等到所有的worker都wg.Done()才能结束

        type NsqConsumerList struct {
            sync.Map
        }
    
        // Set 存Consumer
        func (ncl *NsqConsumerList) Set(key int64, c *nsq.Consumer) *NsqConsumerList {
            ncl.Store(key, c)
            return ncl
        }
    
        // Each callback每个Consumer
        func (ncl *NsqConsumerList) Each(callback func(c *nsq.Consumer) error) error {
            ncl.Range(func(k, v interface{}) bool {
                c, ok := v.(*nsq.Consumer)
    
                if !ok {
                    return false
                }
    
                err := callback(c)
    
                if err != nil {
                    return false
                }
    
                return true
            })
    
            return nil
        }
    
    

NSQ本身对於收到的讯息并没有备份的机制, 如果需要额外纪录下NSQ讯号的历程资讯, 可以开一个channel在TOPIC底下监听, 再看要储存在哪个资料库。 资料收集之後,假设有一天发生不可预期的严重资料遗失问题,至少还有备份可以回放时间历程。

备份很重要,重要的资料一定要备份。


<<:  【Day15】电子商务与数位行销篇-网站

>>:  D22 - Grafana Monitor

学习JavaScript纪录Day1

学习目的: JavaScrip语言是网站客服端的最佳动态利器,想要制作一个生动的网页肯定不能少了他。...

[Day21] JavaScript - Event object (事件物件)

当任何的事件发生时,浏览器就会为这个事件创造一个物件,我们称为事件物件(Event object)...

建立你想要的文化(3)- 落地

将价值观转化为具体行为 有了明确的价值之後,下一步就是为每一个价值举出具体的行为。这是因为不管你陈述...

Day02 测试写起乃 - Rails 测试推荐资源

在开始之前我想先推荐几个来源对於写 Rails 测试颇有帮助的几个网站也是我个人在开始写之前有看过的...

数字谎言

前言 在这个蔬菜是有机的、水果都会甜、衣服耐洗不缩水、满街百年创始老店、车子很省油、房…的社会当中,...