Loading... @[TOC](nsq高性能消息队列) # 前言 > tips:如果本文对你有用,请爱心点个赞,提高排名,让这篇文章帮助更多的人。谢谢大家!比心❤~ > 如果解决不了,可以在文末加我微信,进群交流。 NSQ 是实时的分布式消息处理平台,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。 NSQ 具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。 NSQ 非常容易配置和部署,且具有最大的灵活性,支持众多消息协议。另外,官方还提供了拆箱即用 Go 和 Python 库。如果读者有兴趣构建自己的客户端的话,还可以参考官方提供的协议规范。 网上有人翻译了国外的一篇文章:[我们是如何使用NSQ处理7500亿消息的](http://www.jfh.com/jfperiodical/article/1949?) ![在这里插入图片描述](https://img-blog.csdnimg.cn/20200506141018577.gif) ## 安装和部署 官网文档:[https://nsq.io/overview/quick_start.html](https://nsq.io/overview/quick_start.html) 中文文档:[http://wiki.jikexueyuan.com/project/nsq-guide/](http://wiki.jikexueyuan.com/project/nsq-guide/) 我是在ubuntu系统中按照官方操作进行部署测试。 1. **安装nsq启动服务** 在[https://nsq.io/deployment/installing.html](https://nsq.io/deployment/installing.html)选择对应的版本,并解压。 ```bash $ tar -zxvf nsq-1.2.0.linux-amd64.go1.12.9.tar.gz $ cd nsq-1.2.0.linux-amd64.go1.12.9/bin $ sudo cp ~/Downloads/nsq-1.2.0.linux-amd64.go1.12.9/bin/ -r /usr/local/nsq/bin $ sudo vim /etc/profile $ source ``` 2. **后台启动三个服务** ```bash $ ./nsqlookupd > /dev/null 2>&1 & [1] 20076 $ ./nsqd --lookupd-tcp-address=127.0.0.1:4160 > /dev/null 2>&1 & [2] 20420 $ ./nsqadmin --lookupd-http-address=127.0.0.1:4161 > /dev/null 2>&1 & [3] 20620 ``` > `-lookupd-tcp-address` 为上面nsqlookupd的IP和tcp的端口4160 > `-lookupd-http-address` 是http的端口也就是4161因为admin通过http请求来查询相关信息 3. **基本概念** nsqd:基本的节点 nsqlookupd:汇总节点信息,提供查询和管理topic等服务 nsqadmin:管理端展示UI界面,能有一个web页面去查看和操作 4. **简单使用** * 执行:`curl -d 'hello world' 'http://127.0.0.1:4151/pub?topic=test'`会创建一个test主题,并发送一个hello world消息 * 外部通过:`http://127.0.0.1:4171/`进行访问可以看到NSQ的管理界面,非常的简洁,其中`127.0.0.1`为服务器IP ![在这里插入图片描述](https://img-blog.csdnimg.cn/20200506140249796.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MjY2MTMyMQ==,size_16,color_FFFFFF,t_70) ![在这里插入图片描述](https://img-blog.csdnimg.cn/20200506140503119.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MjY2MTMyMQ==,size_16,color_FFFFFF,t_70) * 使用`./nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161`消费test中刚才的消息,并输出到服务器/tmp目录中 ## 特性 默认一开始消息不是持久化的 nsq采用的方式时内存+硬盘的模式,当内存到达一定程度时就会将数据持久化到硬盘 1. 如果将 `--mem-queue-size` 设置为 `0`,所有的消息将会存储到磁盘。 2. 是即使服务器重启也会将当时在内存中的消息持久化 3. 消息是没有顺序的 这一点很关键,由于nsq使用内存+磁盘的模式,而且还有requeue的操作,所以发送消息的顺序和接收的顺序可能不一样 4. 官方不推荐使用客户端发消息 官方提供相应的客户端发送消息,但是HTTP可能更方便一些 5. 没有复制 nsq节点相对独立,节点与节点之间没有复制或者集群的关系。 6. 没有鉴权相关模块 当前release版本的nsq没有鉴权模块,只有版本v0.2.29+高于这个的才有 7. 几个小点 topic名称有长度限制,命名建议用下划线连接 消息体大小有限制 ## nsq优点&缺点 优点: 1. 部署极其方便,没有任何环境依赖,直接启动就行 2. 轻量没有过多的配置参数,只需要简单的配置就可以直接使用 3. 性能高 4. 消息不存在丢失的情况 缺点: 1. 消息无顺序 2. 节点之间没有消息复制 3. 没有鉴权 ## 客户端 官方提供了很多语言接入的客户端 [https://nsq.io/clients/client_libraries.html](https://nsq.io/clients/client_libraries.html) 针对消息生产者的客户端,官方还推荐直接使用post请求发送消息,如: `curl -d 'hello world' 'http://127.0.0.1:4151/pub?topic=test'` 表示向test主题发送hello world这个消息 ## Golang的客户端 deb安装 ```bash $ curl https://raw.githubusercontent.com/golang/dep/master/install.sh | sh ``` ![在这里插入图片描述](https://img-blog.csdnimg.cn/20200506153703415.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MjY2MTMyMQ==,size_16,color_FFFFFF,t_70) 依赖包下载: ```bash $ go get github.com/nsqio/go-nsq ``` 生产者: ```go package main // 生产者 import ( "fmt" "github.com/nsqio/go-nsq" ) var tcpNsqdAddr = "127.0.0.1:4150" func main() { // 初始化配置 config := nsq.NewConfig() for i := 0; i < 100; i++ { // 创建100个生产者 tPro, err := nsq.NewProducer(tcpNsqdAddr, config) if err != nil { fmt.Printf("tPro new failed:%s", err) } // 主题 topic := "Insert" // 主题内容 tCommand := "New data!" // 发布消息 err = tPro.Publish(topic, []byte(tCommand)) if err != nil { fmt.Printf("Publish failed:%s", err) } } } ``` > 其中`127.0.0.1:4150`为发送消息的地址,消费者里面写的也是相同的地址就可以了。 消费者: ```go package main // 消费者 import ( "fmt" "sync" "time" "github.com/nsqio/go-nsq" ) var tcpNsqdAddr = "127.0.0.1:4150" type NsqHandler struct { // 消息数 msqCount int // 标识id nsqHandlerID string } func main() { // 初始化配置 config := nsq.NewConfig() // 创造消费者,参数一是订阅的主题,参数二是使用的通道 com, err := nsq.NewConsumer("Insert", "channel1", config) if err != nil { fmt.Println(err) } // 添加处理回调 com.AddHandler(&NsqHandler{nsqHandlerID: "One"}) // 连接对应的nsqd err = com.ConnectToNSQD(tcpNsqdAddr) if err != nil { fmt.Println(err) } // 只是为了不结束进程,这里没有意义 var wg = &sync.WaitGroup{} wg.Add(1) wg.Wait() } // HandleMessage 实现HandleMessage方法 // message是接收到的消息 func (s *NsqHandler) HandleMessage(message *nsq.Message) error { // 每接收到一条消息]+1 s.msqCount ++ // 打印输出信息和ID fmt.Println(s.msqCount,s.nsqHandlerID) // 打印消息的一些基本信息 fmt.Printf("msg.Timestamp=]%v,msg.nsqaddress=%s,msg.body=]%s", time.Unix(0,message.Timestamp).Format("2006-01-02 03:04:05"),message.NSQDAddress,string(message.Body)) return nil } ``` > 有问题请添加个人微信:【mengyilingjian】,进群一起技术讨论。添加时请备注来意,谢谢! 最后修改:2021 年 03 月 16 日 © 允许规范转载 打赏 赞赏作者 支付宝微信 赞 0 感谢赏赐的coffee~
1 条评论
博主真是太厉害了!!!