Producer是讯息发送方, 他会对nsqd发送讯息, nsqd支援TCP(port:4150) & HTTP(port:4151),
本机启动nsq环境时要记得把相关address指向本机
./nsqd --broadcast-address=127.0.0.1 --lookupd-tcp-address=127.0.0.1:4160
./nsqlookupd
./nsqadmin --lookupd-http-address=127.0.0.1:4161
环境跑起来之後可以透过curl测试创建topic&channel, 详细支援内容可以参考官网
使用NSQ时, nsq Config 参数设定可参考配置参数, 这边示范调整max_in_flight
加大允许处理中的消息数
func newNSQProducer(addr string) (r *nsq.Producer, err error) {
NSQconfig := nsq.NewConfig()
err = NSQconfig.Set("max_in_flight", config.NsqdMaxInFlight)
if err != nil {
return
}
r, err = nsq.NewProducer(addr, NSQconfig)
if err != nil {
return
}
err = r.Ping()
if err != nil {
return
}
return r, err
}
接着在 cmd.go 加入 nsqProducer, 并放在server结构里面, 方便调用
// nsq producer
nsqProducer, err := newNSQProducer(config.NsqdAddr)
if err != nil {
return
}
defer func() {
nsqProducer.Stop()
}()
sv := &server{
ScyllaSession: session,
RedisClient: redisClient,
NsqProducer: nsqProducer,
}
NSQ发送讯息只需要producer.Publish
到指定的TOPIC(COCONUT_UPDATE_POINT)就完成了。
TOPIC由负责发送的服务定义好之後就很少会去修改, 所以後来我们就不设定为参数了;
发送讯息的内容会采用JSON格式, 方便接收到资料的人转出结构,
如果是要判断接收讯息先後顺序的资料就需要加上发送时间。
err = s.NsqProducer.Publish("COCONUT_UPDATE_POINT", []byte(str))
if err != nil {
return nil, coconutError.ParseError(coconutError.ErrServer, err)
}
起一个最精简的consumer就可以测试收到的讯息效果
COCONUT_UPDATE_POINT 收到的讯息是:[{"name":"A","points":100},{"name":"B","points":100},{"name":"C","points":100}]
Producer发送讯息之後, 如果没有Consumer接走, 就会堆在queue里面
此时可以透过nsqadmin查看nsq讯息状况
#ephemeral
, 让channel断线之後就自动消失, 这样可以避免讯息累积。参考资料
<<: [ 卡卡 DAY 14 ] - React Native 页面导览 Navigation (中)
网路功能虚拟化 (NFV) 通常使用专有服务器来运行网路服务以提高性能。 根据Wikipedia的说...
根据FIPS 199,“确定信息系统的安全类别需要进行更多的分析,并且必须考虑驻留在信息系统上的所有...
认识过网站的基础後,接下来可以多观察生活中的范例做练习,选择几个成效不错的网站去做比较与分析,以下整...
我们常在说如何节省时间减少重工, Reuse就是一种好方法。 我们今天写的程序,明後天都会写到同一段...
加以控管 vs. 给予资讯 控管式 老板核准及指挥所有提案 行动和团队决策 有时候会直接监督 来控制...