NSQ

一,简介

nsq原始代码进行学习 https://nercoeus.github.io

分布式消息队列 NSQ,NSQ是google开发,用go实现的消息队列服务,

NSQ可用于大规模系统中的实时消息服务,并且每天能够处理数亿级别的消息,其设计目标是为在分布式环境下运行的去中心化服务提供一个强大的基础架构。NSQ具有分布式、去中心化的拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。NSQ非常容易配置和部署,且具有最大的灵活性,支持众多消息协议

两种比较常用的消息队列——NSQ和Kafka

下面上一张Nsq与其他mq的对比图

图片来自golang2017开发者大会

示意图清晰的展示了整个NSQ架构的端口关系:

生产消费过程

(1) 生产者连接nsqd,通过http接口,写入消息到topic

(2) topic把消息分发到所有的channel

(3) 消费者通过nsqdlookupd发现topic对应的nsqd,连接到nsqd上,订阅指定的channel,当有消息到达时进行消费

(4) 同一个channel下多个消费者协作消费,channel会自动做负载均衡把消息发给不同的消费者

在 news_data_consumer 这个项目中,我们的配置中指定了一个topic和一个channel,而项目本身是单线程、多机器(多进程)运行的,所以不同机器构成了不同的consumer,然后同一个channel会自动做负载均衡交给多台机器消费

消费者有两种方式与nsqd建立连接

1.消费者直连nsqd,这是最简单的方式,缺点是nsqd服务无法实现动态伸缩了(当然,自己去实现一个也是可以的)

2.消费者通过http查询nsqlookupd获取该nsqlookupd上所有nsqd的连接地址,然后再分别和这些nsqd建立连接(官方推荐的做法),但是客户端会不停的向nsqlookupd查询最新的nsqd地址目录

consumer如果通过 nsqlookupd 节点自动发现,会默认自动订阅所有包含该topic的nsqd节点;会消费所有nsqd节点的消息,消息的顺序不保证有序;如果指定连接某一个nsqd,那么只会订阅这一个nsqd,只会消费该nsqd下的消息。官方推荐的是通过 nsqlookupd 发现

生产者

生产者必须直连nsqd去投递message

这里有一个问题就是如果生产者所连接的nsqd炸了,那么message就会投递失败,所以在客户端必须自己实现相应的备用方案

生产者创建时要(1)指定ip+端口号(默认为4150),(2)publish消息的时候要指定topic

(1)单点

流程说明:

①Consumer1首先使用HTTP连接nsqlookupd的4161端口,获取Topic1相关nsqd的TCP4150端口。

②使用TCP连接到nsqd的4150端口,并生成对应的Channel1;

特别需要注意的是,当所需Topic不变的情况下,就算nsqlookupd和nsqadmin进程都杀掉,也不影响nsqd的生产和消费。

(2)集群

说明:

①当开启多个nsqd才存在集群的意义。

②尽量避免多个nsqd存在相同的Topic。如果多个nsqd真的存在相同的Topic的情况下,通过nsqlookupd将会返回所有这个Topic的IP并都能进行读取处理。(使用nsq-j进行过测试验证。)

1 消息队列的作用

  1. 解耦,将一个流程加入一层数据接口拆分成两个部分,上游专注通知,下游专注处理

  2. 缓冲,应对流量的突然上涨变更,消息队列有很好的缓冲削峰作用

  3. 异步,上游发送消息以后可以马上返回,处理工作交给下游进行

  4. 广播,让一个消息被多个下游进行处理

  5. 冗余,保存处理的消息,防止消息处理失败导致的数据丢失

2 NSQ介绍

2.1 组件

NSQ主要包含3个组件:

  • nsqd:在服务端运行的守护进程,负责接收,排队,投递消息给客户端。能够独立运行,不过通常是由 nsqlookupd 实例所在集群配置的

  • nsqlookup:为守护进程,负责管理拓扑信息并提供发现服务。客户端通过查询 nsqlookupd 来发现指定话题(topic)的生产者,并且 nsqd 节点广播话题(topic)和通道(channel)信息

  • nsqadmin:一套WEB UI,用来汇集集群的实时统计,并执行不同的管理任务

2.2 特性

  1. 消息默认不可持久化,虽然系统支持消息持久化存储在磁盘中(通过 –mem-queue-size ),不过默认情况下消息都在内存中

  2. 消息最少会被投递一次,假设成立于 nsqd 节点没有错误

  3. 消息无序,是由重新队列(requeues),内存和磁盘存储的混合导致的,实际上,节点间不会共享任何信息。它是相对的简单完成疏松队列

  4. 支持无 SPOF 的分布式拓扑,nsqd 和 nsqadmin 有一个节点故障不会影响到整个系统的正常运行

  5. 支持requeue,延迟消费机制

  6. 消息push给消费者

内存限制

NSQ通过提供–mem-queue-size的配置选项来设置内存中的消息队列的大小。如果超过消息队列的大小,消息将写入硬盘。细心的读者会发现,通过设置内存消息队列的大小低到某个值时(如1或0)可以提高消息投递的可靠性。后端的硬盘队列用于非正常退出时恢复消息投递。

对于消息投递的可靠性,正常退出时消息会安全的被持久化到硬盘中,包括内存队列、投递中队列、延迟队列以及各种内部缓存。

注意,每个topic和channel的名称后面如果是以#ephemeral结尾,那么缓存中的消息不被持久化到硬盘中而且超过内存消息队列大小时,消息将被丢弃。这使消费者在订阅一个channel时可以无需消息投递可靠性。这种临时的channel在无客户端连接时会自动消失。对于一个临时的topic,意味着至少有一个channel被创建、消费和删除(通常也是临时的channel)。

消息投递的可靠性

NSQ保证消息投递至少一次,有可能重复投递。消费者应该自行保证消息处理的幂等性。

这个可靠性也作为消息协议的一部分,处理流程如下:

1.客户端声明准备好接收消息

2.NSQ发送一个消息并临时保存消息到本地

3.客户端发送FIN或者REQ表明处理成功或者失败。如果客户端未在规定的时间内作出响应,NSQ会根据设置的超时时间来自动把消息重新入队列。

这就保证了消息只有在NSQ非正常关闭时会发生丢失。这种情况下,所有内存的信息都会丢失。

如果防止消息丢失是非常重要的,即使是非正常退出,也是可以减少影响度的。一种方式是增加nsqd节点(部署在不同host),来备份消息。因为客户端处理消息是幂等的所以多次消息投递不影响下游系统,以保证任何单个节点故障不至于引起消息丢失。

关键是NSQ要提供构建的模块去支持多种生产用例和可配置程度的耐久性

消除单点故障(SPOF)

NSQ采用的是分布式的设计方式。客户端通过长连接,连接所有指定topic的所有nsqd实例,这里没有中间人,没有单点故障。

只要有足够的客户端消费者连接到所有的生产者以保障大量的消息处理,就能保证所有的信息最终都可以被处理。

对于nsqlookupd,可以通过多个实例来实现高可用。他们之间不需要之间通信,数据是会最终一致的。消费者通过配置的nsqlookupd实例来获取所有信息并将其汇总

2.3 流程

单个nsqd实例一次可以处理多个消息流。消息流被称为“topics”,一个topic有一个或者多个“channels”。每个channel会接收来自topic的所有消息。实际上,一个channel对应下游个服务消费的一个topic

topics和channels并不是预先配置的。topics是在首次发布或者订阅的时候创建的,channels是在首次订阅的时候创建的。

topics和channels的所有缓存数据都相互独立,目的是为了防止一个“慢”消费者造成消息积压而影响其他topic或channel。

一个channel通常有多个消费者连接,假如所有消费者都是在准备接收消息状态,每个消息会被随机投递到消费者中。

综述,消息是从topic广播到channel,然后从channel投递到消费者中。

NSQ还有一个辅助工具nsqlookupd,它提供了服务发现功能,使消费者能够订阅感兴趣的topic所在的所有nsqd的地址。同时在配置方面,使消费者和生产者解耦,他们不需要知道彼此,只需要通过nsqlookupd来建立联系,降低复杂性。

在底层实现中,每个nsqd和nsqlookupd都建立了长连接,定期推送自己的状态信息。nsqlookupd通过这些信息来判断哪个nsqd地址应该返回给消费者。

当添加一个新的消费者时,只要给nsqd客户端初始化nsqlookupd的实例地址。所以在新增消费者或者生产者时都无需修改任何配置,大大减少了复杂度。

需要重点强调的是,nsqd和nsqlookupd的守护进程是相互独立的,在兄弟节点之间没有通信和协作。

我们认为通过某种方式去观察、管理这些节点是非常重要的。我们构建了nsqadmin来处理这件事情,它提供了web界面来浏览topics/channels/consumers的层级、检查深度,以及其他的信息。而且还提供了一些管理员命令,比如移除和清空channel。

nsq详细介绍

单个nsqd可以有多个Topic,每个Topic又可以有多个Channel。Channel能够接收Topic所有消息的副本,从而实现了消息多播分发;而Channel上的每个消息被分发给它的订阅者,从而实现负载均衡,所有这些就组成了一个可以表示各种简单和复杂拓扑结构的强大框架。

以上摘抄于分布式消息队列-NSQ-和-Kafka-对比

核心概念

topic: topic是nsq的消息发布的逻辑关键词。当程序初次发布带topic的消息时,如果topic不存在,则会被创建。

channels: 当生产者每次发布消息的时候,消息会采用多播的方式被拷贝到各个channel中,channel起到队列的作用。

messages: 数据流的形式。

二,安装

官方文档网址: http://nsq.io/overview/quick_start.html

下载地址: http://nsq.io/deployment/installing.html 目前的稳定版是V1.0.0,根据自己实际情况选择安装方式(写于2019/3/8)

配置nsq文件(以nsq3 10.1.194.245为例)

配置nsqlookupd.cfg文件

*如果nsqadmin在此节点上,配置nsqadmin文件nsqadmin.cfg

Systemd 管理启动 配置

配置nsqd.service

配置nsqlookupd.service

配置nsqadmin.service文件

开机启动

三,使用

域名地址:

默认情况下, 可以在浏览器输入:http://nsqadmin.darren.com:4171 打开监控面板

由于nsq提供了一个unix-like的工具,所以我们可以在终端使用以下命令进行消息的发送测试:

发送后, 可以在监控面板观察页面数据的变化

NSQ Topic和Channel命名规范

Channel命名规范:{环境} _ {产品线英文/拼音简写} _ {终端类型} {业务模块英文/拼音简写} Channel命名示例:PRE_SDYXMALL_APP_ORDER

Topic命名规范:{环境} _ {产品线英文/拼音简写} _ {业务模块英文/拼音简写} _ {消息主题} _ {事件类型} Topic命名示例:PRE_SDYXMALL_ORDER_STATUS_CHANGE

使用go编写NSQ消费端的例子

consumer.go

Go

main.go

Go

使用总结

nsq大部分情况基本能满足我们作为消息队列的要求,而且性能与单点故障处理能力也比较出色.

但它不适用的地方主要有:

有顺序要求的消息

不支持副本集的集群方式

四.排查

返回内部的统计信息

curl http://127.0.0.1:4151/stats

参考文档

极客学院 nsq指南

NSQ消息发送机制

配置文件解读:https://github.com/nsqio/nsq/blob/master/contrib/nsqd.cfg.example

Last updated