day 21 - NSQ Producer

Producer是讯息发送方, 他会对nsqd发送讯息, nsqd支援TCP(port:4150) & HTTP(port:4151),
本机启动nsq环境时要记得把相关address指向本机

  • nsqd
    ./nsqd --broadcast-address=127.0.0.1 --lookupd-tcp-address=127.0.0.1:4160
    
  • nsqlookupd
    ./nsqlookupd
    
  • nsqadmin
    ./nsqadmin --lookupd-http-address=127.0.0.1:4161
    

环境跑起来之後可以透过curl测试创建topic&channel, 详细支援内容可以参考官网

  • 使用NSQ时, nsq Config 参数设定可参考配置参数, 这边示范调整max_in_flight 加大允许处理中的消息数

    func newNSQProducer(addr string) (r *nsq.Producer, err error) {
        NSQconfig := nsq.NewConfig()
        err = NSQconfig.Set("max_in_flight", config.NsqdMaxInFlight)
        if err != nil {
            return
        }
    
        r, err = nsq.NewProducer(addr, NSQconfig)
        if err != nil {
            return
        }
    
        err = r.Ping()
        if err != nil {
            return
        }
    
        return r, err
    }
    
  • 接着在 cmd.go 加入 nsqProducer, 并放在server结构里面, 方便调用

        // nsq producer
        nsqProducer, err := newNSQProducer(config.NsqdAddr)
        if err != nil {
            return
        }
    
        defer func() {
            nsqProducer.Stop()
        }()
    
        sv := &server{
            ScyllaSession: session,
            RedisClient:   redisClient,
            NsqProducer:   nsqProducer,
        }
    
  • NSQ发送讯息只需要producer.Publish到指定的TOPIC(COCONUT_UPDATE_POINT)就完成了。
    TOPIC由负责发送的服务定义好之後就很少会去修改, 所以後来我们就不设定为参数了;
    发送讯息的内容会采用JSON格式, 方便接收到资料的人转出结构,
    如果是要判断接收讯息先後顺序的资料就需要加上发送时间。

        err = s.NsqProducer.Publish("COCONUT_UPDATE_POINT", []byte(str))
        if err != nil {
            return nil, coconutError.ParseError(coconutError.ErrServer, err)
        }
    
  • 起一个最精简的consumer就可以测试收到的讯息效果

    COCONUT_UPDATE_POINT 收到的讯息是:[{"name":"A","points":100},{"name":"B","points":100},{"name":"C","points":100}]
    

Producer发送讯息之後, 如果没有Consumer接走, 就会堆在queue里面
此时可以透过nsqadmin查看nsq讯息状况

  • 查看TOPIC列表
    https://i.imgur.com/FzR7H4Z.png
  • 查看每个TOPIC底下的监听channel & 讯息状况
    https://i.imgur.com/RQqSaap.png
  • 讯息如果一直堆着没有被接走, 长期下来会造成记忆体或硬碟空间不足, 可以在channel後面加上#ephemeral, 让channel断线之後就自动消失, 这样可以避免讯息累积。
    https://i.imgur.com/4HhZV36.png

参考资料


<<:  [ 卡卡 DAY 14 ] - React Native 页面导览 Navigation (中)

>>:  Day27-机器学习(1) SVM

网路功能虚拟化(NFV)、软件定义网路(SDN)、软件定义边界 (SDP) 和零信任(Zero Trust)

网路功能虚拟化 (NFV) 通常使用专有服务器来运行网路服务以提高性能。 根据Wikipedia的说...

NIST SDLC和RMF(续)

根据FIPS 199,“确定信息系统的安全类别需要进行更多的分析,并且必须考虑驻留在信息系统上的所有...

30天打造品牌特色电商网站 Day.2 电商网站比较分析

认识过网站的基础後,接下来可以多观察生活中的范例做练习,选择几个成效不错的网站去做比较与分析,以下整...

Day17. Blue Prism的匿踪行动-BP Page Stage 重新使用发布的流程页

我们常在说如何节省时间减少重工, Reuse就是一种好方法。 我们今天写的程序,明後天都会写到同一段...

[DAY-12] 除去大部分控制 充分资讯 放心授权

加以控管 vs. 给予资讯 控管式 老板核准及指挥所有提案 行动和团队决策 有时候会直接监督 来控制...