一个服务发出讯息之後, 可以由多个服务分别注册多个channel来监听, 同一个TOPIC底下的每个channel都会拿到一样的讯息。
当後端的沟通都是透过NSQ讯号的时候, 流程上每个环节的每个动作都可以拆分出来, 每个Consumer都可以接收到第一手的资料, 不需要再经过其他服务的传递。 这样可以减少资料传递所消耗的时间、避免资料内容缺失、需要调整资料内容时也不用再大家都改, 只要源头修改好了, 大家都会拿到一样的资讯。
但Consumer收到讯息之後可能会做很多复杂的处理, 所以Consumer也需要graceful shutdown。
以下程序大概会是这样:
建立Consumer
// 建立空白设定档。
ConsumerConfig := nsq.NewConfig()
// 设置重连时间
ConsumerConfig.LookupdPollInterval = time.Second * 2
consumer, _ := nsq.NewConsumer("COCONUT_UPDATE_POINT", "coconut", ConsumerConfig)
consumer.AddConcurrentHandlers(TestNSQConsumer(), config.NsqConsumerWorkers)
err = consumer.ConnectToNSQLookupd(config.NsqLookupdAddr)
if err != nil {
return
}
AddConcurrentHandlers
是设定要开几个worker处理, 如果会同时收大批讯息就要测试worker设定的合理值才能在时间内收下所有资料。
讯息进来後由TestNSQConsumer()
负责收下来处理, message.DisableAutoResponse()
& defer message.Finish()
是不要再让讯息Requeue回去, 这样可能会发生处理到一半的状态, 这边要看使用的情境, 如果是需要讯息重发的可以使用Requeue
, 可以参考官方的test case。
func TestNSQConsumer() nsq.Handler {
return nsq.HandlerFunc(func(message *nsq.Message) (err error) {
message.DisableAutoResponse()
defer message.Finish()
log.Printf("========================== 收到的讯息是:%v", string(message.Body))
time.Sleep(time.Duration(20) * time.Second)
log.Printf("========= 故意睡 20s 测试是否有等consumer处理完才结束")
return nil
})
}
NewConsumer完成之後, 宣告stopOnSignalExitNCL
把Consumer收集起来, 透过SnowFlake演算法给他一个不重复的ID
var (
wg sync.WaitGroup
stopOnSignalExitNCL = NsqConsumerList{} // 收集停止讯号时停止nsq consumer 的列表
)
stopOnSignalExitNCL.Set(snowflakeNode.Generate().Int64(), consumer)
// 停止部分Nsq Consumer避免有讯息进来
Logger.Debugf("停止部分Nsq Consumer避免有讯息进来...")
err = stopOnSignalExitNCL.Each(func(c *nsq.Consumer) error {
wg.Add(1)
go func(c *nsq.Consumer, wg *sync.WaitGroup) {
// 停止讯号会等待正在处理的讯息做完才结束
c.Stop()
<-c.StopChan
wg.Done()
}(c, &wg)
return nil
})
if err != nil {
Logger.Errorf("stopOnSignalExitNCL.Each err: %s", err.Error())
}
wg.Wait()
Logger.Debugf("停止 NSQ Consumer完成...")
NsqConsumerList 结构, 丢一个callback到consumer, 等到所有的worker都wg.Done()
才能结束
type NsqConsumerList struct {
sync.Map
}
// Set 存Consumer
func (ncl *NsqConsumerList) Set(key int64, c *nsq.Consumer) *NsqConsumerList {
ncl.Store(key, c)
return ncl
}
// Each callback每个Consumer
func (ncl *NsqConsumerList) Each(callback func(c *nsq.Consumer) error) error {
ncl.Range(func(k, v interface{}) bool {
c, ok := v.(*nsq.Consumer)
if !ok {
return false
}
err := callback(c)
if err != nil {
return false
}
return true
})
return nil
}
NSQ本身对於收到的讯息并没有备份的机制, 如果需要额外纪录下NSQ讯号的历程资讯, 可以开一个channel在TOPIC底下监听, 再看要储存在哪个资料库。 资料收集之後,假设有一天发生不可预期的严重资料遗失问题,至少还有备份可以回放时间历程。
备份很重要,重要的资料一定要备份。
学习目的: JavaScrip语言是网站客服端的最佳动态利器,想要制作一个生动的网页肯定不能少了他。...
当任何的事件发生时,浏览器就会为这个事件创造一个物件,我们称为事件物件(Event object)...
将价值观转化为具体行为 有了明确的价值之後,下一步就是为每一个价值举出具体的行为。这是因为不管你陈述...
在开始之前我想先推荐几个来源对於写 Rails 测试颇有帮助的几个网站也是我个人在开始写之前有看过的...
前言 在这个蔬菜是有机的、水果都会甜、衣服耐洗不缩水、满街百年创始老店、车子很省油、房…的社会当中,...