k8s
  • Initial page
  • 序言
  • 前言
    • 发展历史
    • CNCF - 云原生计算基金会简介
    • Kubernetes与云原生应用的概念
  • 概念与原理
    • 基本概念总结
    • 开放接口
      • CRI - Container Runtime Interface
      • CNI - Container Network Interface
      • CSI - Container Storage Interface
    • 核心概念与原理
      • Kubernetes简介
      • Kubernetes架构与原理
      • 核心组件
      • 设计理念
      • 核心组件原理
        • etcd概念与原理
          • Etcd基于RAFT的一致性
          • Etcd v2 与 v3存储
        • kube-apiserver
        • kube-scheduler
        • kube-Controller Manager
        • Kubelet
        • kubectl常用命令
      • kubectl
      • kube-proxy
      • IPVS负载均衡
      • kube-dns
      • Federation-集群联邦
      • kubeadm
    • 资源对象与基本概念解析
    • 资源对象
      • Pod
        • Pod概述
        • Pod解析
        • Pod 的生命周期
        • 探针
        • Init 容器
        • Pause容器
        • Pod 安全策略
        • Pod hook
        • Pod Preset
        • pod其他设置
        • Pod中断与PDB
    • Kubernetes中的网络
      • 图解Kubernetes网络(一)
      • 图解Kubernetes网络(二)
      • 图解Kubernetes网络(三)
      • calico
      • flannel
    • 转发K8S后端服务的四种方式
    • 集群资源对象
      • Node
      • Namespace
      • Label
      • Annotation
      • Taint和Toleration(污点和容忍
      • 垃圾收集
      • Autoscaling
      • Horizontal Pod Autoscaling
        • Metrics-Server
        • Heapster
      • ReplicationController和ReplicaSet
    • 控制器资源对象
      • CronJob
      • Job
      • DaemonSet
      • Deployment
      • StatefulSet
    • 服务发现-资源对象
      • DNS原理讲解
      • Ingress 选型
      • Service
      • Ingress
    • 存储对象
      • ConfigMap
      • Volume
      • Persistent Volume(持久化卷)
      • StorageClass
      • 本地持久化存储
      • Secret
    • 策略对象
      • Resource Quota
      • SecurityContext
    • 身份对象
      • 认证
      • Service Account
      • RBAC——基于角色的访问控制
      • 准入控制
      • Network Policy
    • 资源调度
      • QoS(服务质量等级)
  • 插件扩展
    • Kubernetes的CI/CD
    • Dashboard
    • CoreDNS
    • 监控
      • 概述
      • 第1章 采集
        • Probes
        • Docker Stats
        • cAdvisor
        • Heapster
          • HPA
        • metrics-server
        • custom metrics自定义指标
        • kube-state-metrics
        • node-exporter
        • Prometheus
          • go 自定义metric
          • 本地存储
          • Prometheus概述
          • Prometheus基本架构
          • Prometheus部署方案
          • Prometheus的配置与服务发现
          • PromQL查询解析
          • Prometheus数据可视化
          • Prometheus存储机制
        • Sysdig
        • Untitled
      • 自定义监控
      • Custom-Metrics及Prometheus监控系统
      • grafana各种类型监控-实用
    • 日志
    • 存储
      • Kubernetes Ceph 工作原理详解
    • Metrics
    • GPU
    • Cluster AutoScaler
    • CI/CD
      • 基于DOCKER的CI工具—DRONE
      • DRONE安装指南
      • 如何使用DRONE
      • Drone
      • Jenkins
        • jenkins 集成 keycloak 认证
    • 50个免费的Kubernetes工具盘点
      • Kube集群部署工具
      • 监控工具
      • 测试工具
      • 安全工具
      • 实用的CLI工具
      • 开发工具
      • 无服务器/函数工具
      • 原生服务发现
      • 原生可视化与控制
    • Untitled
  • 领域应用
    • Istio
      • Helm安装
      • 安装并试用Istio service mesh
      • 示例应用部署
      • Bookinfo 应用-
      • 配置请求的路由规则
      • 故障注入
      • 流量转移
      • Istio流量管理实现机制深度解析
      • istio:监控能力介绍
      • Istio 04:Istio性能及扩展性介绍
      • Untitled
  • 实践
    • 大规模集群
    • 高可用
  • k8s运维排查
    • 常用命令
    • Kubernetes之YAML文件
      • yaml文件例子--pod
      • yaml文件例子--rc
    • Kubernetes运维
      • 集群管理
      • 集群与应用监控
      • 日志收集与管理
      • 常见问题定位
      • 权限管理RBAC
    • 排错概览
    • 集群排错
      • kubernetes集群管理常用命令一
    • Pod 排错
    • 网络排错
      • 容器内抓包定位网络问题
    • PV 排错
    • Windows 排错
    • 云平台排错
    • 集群安装脚本
    • 排错工具
    • 常见问题
      • k8s故障解决干货文档链接
      • 记一次Docker/Kubernetes上无法解释的连接超时原因探寻之旅
      • service没有负载均衡
      • kubernetes集群etcd空间配额2G的坑优化
    • K8S--100问
      • 解决 Docker 日志文件太大的问题
      • Kubernetes集群里容器之间的通讯方式
      • k8s 优化
      • lxcfs 在容器内显示容器的 CPU、内存状态
      • kubectl 创建 Pod流程
      • k8s网络-iptables
      • k8s底层网络原理
      • 网络排查
      • kubectl top 和 cadvisor metric ,docker state不一致的问题
      • 容器挂载数据卷的几种情况
      • 容器的终止流程
      • Kubernetes 中如何保证优雅地停止 Pod
      • K8S的apiVersion
      • 如何在Pod中执行宿主机上的命令
      • 创建 Pod 流程
      • k8s主要组件说明
      • 节点网络规划
      • Deployment管理方式
      • pod的分配方式
  • 深入浅出k8s
    • 说明
    • k8s发布策略介绍
    • oom kill原理讲解
    • Kubernetes 的架构设计与实现原理
  • 附录
    • CKA认证
    • 生态圈
    • 资讯快报
      • 2018态势回顾与2019年前景展望
      • Untitled
    • 学习资源
    • 参考文档
    • Kubernetes版本更新日志
      • Kubernetes 1.14 更新日志
      • Kubernetes 1.13 更新日志
      • Kubernetes1.12更新日志
      • Kubernetes1.10更新日志
      • Kubernetes1.11更新日志
  • 思维导图
    • k8s
    • DEVOPS
  • DEVOPS
    • 开源仓库-nexus
      • 一,nexus的安装
      • 二,使用nexus3配置docker私有仓库
      • 三,使用nexus3配置maven私有仓库
      • 四,nexus-3.14.0升级到3.15.2
      • 五,nexus3搭建golang私服
    • vpn
      • openvpn
    • Tcpdump 示例教程
    • Ipsec VPN-centos7使用strangwang搭建vpn
    • yum安装redis及常用指令
    • 数据库
      • mysql表操作
      • mysql 库常用操作及备份还原
      • MySQL 优化实施方案
    • NSQ
      • nsq问题解答
      • 选型
      • docker-compose部署 简单nsq 集群
    • 部署Redis集群
    • zookeeper安装及使用
    • Etcd
      • Untitled
      • Etcd配置
  • k8s系统完整部署
    • CentOS7.5 使用二进制程序部署Kubernetes1.12.2
    • 二进制的方式部署 K8S-1.16 高可用集群
    • CoreOS部署Kubernetes集群
    • EFK
      • 日志-kafka
      • logstash的部署、整合ELK+Filebeat
      • 应用日志收集
      • ES搭建
      • es集群部署
      • ElasticSearch技术原理
      • Elasticsearch操作
      • kibana
      • kibana简单使用
      • 非K8S主机部署Filebat
    • 镜像仓库-Harbor
    • Harbor 2.6.2安装
    • cURL 命令获取本机外网 IP
    • Shell 解析 JSON
    • 制作 gitbook 文档镜像,运行在 K8S 上
    • Kubernetes 之 MySQL 持久存储和故障转移
    • 如何删除etcd上的旧数据
    • Git 实战教程
  • 生活
    • 信合.阳光城
Powered by GitBook
On this page
  • 一,简介
  • (1)单点
  • (2)集群
  • 1 消息队列的作用
  • 2 NSQ介绍
  • 二,安装
  • 配置nsq文件(以nsq3 10.1.194.245为例)
  • 配置nsqlookupd.cfg文件
  • *如果nsqadmin在此节点上,配置nsqadmin文件nsqadmin.cfg
  • Systemd 管理启动 配置
  • 三,使用
  • 四.排查
  • 参考文档
  1. DEVOPS

NSQ

PreviousMySQL 优化实施方案Nextnsq问题解答

Last updated 5 years ago

一,简介

nsq原始代码进行学习

分布式消息队列 NSQ,NSQ是google开发,用go实现的消息队列服务,

NSQ可用于大规模系统中的实时消息服务,并且每天能够处理数亿级别的消息,其设计目标是为在分布式环境下运行的去中心化服务提供一个强大的基础架构。NSQ具有分布式、去中心化的拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。NSQ非常容易配置和部署,且具有最大的灵活性,支持众多消息协议

两种比较常用的消息队列——NSQ和Kafka

下面上一张Nsq与其他mq的对比图

图片来自golang2017开发者大会

示意图清晰的展示了整个NSQ架构的端口关系:

生产消费过程

(1) 生产者连接nsqd,通过http接口,写入消息到topic

(2) topic把消息分发到所有的channel

(3) 消费者通过nsqdlookupd发现topic对应的nsqd,连接到nsqd上,订阅指定的channel,当有消息到达时进行消费

(4) 同一个channel下多个消费者协作消费,channel会自动做负载均衡把消息发给不同的消费者

在 news_data_consumer 这个项目中,我们的配置中指定了一个topic和一个channel,而项目本身是单线程、多机器(多进程)运行的,所以不同机器构成了不同的consumer,然后同一个channel会自动做负载均衡交给多台机器消费

消费者有两种方式与nsqd建立连接

1.消费者直连nsqd,这是最简单的方式,缺点是nsqd服务无法实现动态伸缩了(当然,自己去实现一个也是可以的)

2.消费者通过http查询nsqlookupd获取该nsqlookupd上所有nsqd的连接地址,然后再分别和这些nsqd建立连接(官方推荐的做法),但是客户端会不停的向nsqlookupd查询最新的nsqd地址目录

consumer如果通过 nsqlookupd 节点自动发现,会默认自动订阅所有包含该topic的nsqd节点;会消费所有nsqd节点的消息,消息的顺序不保证有序;如果指定连接某一个nsqd,那么只会订阅这一个nsqd,只会消费该nsqd下的消息。官方推荐的是通过 nsqlookupd 发现

生产者

生产者必须直连nsqd去投递message

这里有一个问题就是如果生产者所连接的nsqd炸了,那么message就会投递失败,所以在客户端必须自己实现相应的备用方案

生产者创建时要(1)指定ip+端口号(默认为4150),(2)publish消息的时候要指定topic

(1)单点

流程说明:

①Consumer1首先使用HTTP连接nsqlookupd的4161端口,获取Topic1相关nsqd的TCP4150端口。

②使用TCP连接到nsqd的4150端口,并生成对应的Channel1;

特别需要注意的是,当所需Topic不变的情况下,就算nsqlookupd和nsqadmin进程都杀掉,也不影响nsqd的生产和消费。

(2)集群

说明:

①当开启多个nsqd才存在集群的意义。

②尽量避免多个nsqd存在相同的Topic。如果多个nsqd真的存在相同的Topic的情况下,通过nsqlookupd将会返回所有这个Topic的IP并都能进行读取处理。(使用nsq-j进行过测试验证。)

1 消息队列的作用

  1. 解耦,将一个流程加入一层数据接口拆分成两个部分,上游专注通知,下游专注处理

  2. 缓冲,应对流量的突然上涨变更,消息队列有很好的缓冲削峰作用

  3. 异步,上游发送消息以后可以马上返回,处理工作交给下游进行

  4. 广播,让一个消息被多个下游进行处理

  5. 冗余,保存处理的消息,防止消息处理失败导致的数据丢失

2 NSQ介绍

2.1 组件

NSQ主要包含3个组件:

  • nsqd:在服务端运行的守护进程,负责接收,排队,投递消息给客户端。能够独立运行,不过通常是由 nsqlookupd 实例所在集群配置的

  • nsqlookup:为守护进程,负责管理拓扑信息并提供发现服务。客户端通过查询 nsqlookupd 来发现指定话题(topic)的生产者,并且 nsqd 节点广播话题(topic)和通道(channel)信息

  • nsqadmin:一套WEB UI,用来汇集集群的实时统计,并执行不同的管理任务

2.2 特性

  1. 消息默认不可持久化,虽然系统支持消息持久化存储在磁盘中(通过 –mem-queue-size ),不过默认情况下消息都在内存中

  2. 消息最少会被投递一次,假设成立于 nsqd 节点没有错误

  3. 消息无序,是由重新队列(requeues),内存和磁盘存储的混合导致的,实际上,节点间不会共享任何信息。它是相对的简单完成疏松队列

  4. 支持无 SPOF 的分布式拓扑,nsqd 和 nsqadmin 有一个节点故障不会影响到整个系统的正常运行

  5. 支持requeue,延迟消费机制

  6. 消息push给消费者

内存限制

NSQ通过提供–mem-queue-size的配置选项来设置内存中的消息队列的大小。如果超过消息队列的大小,消息将写入硬盘。细心的读者会发现,通过设置内存消息队列的大小低到某个值时(如1或0)可以提高消息投递的可靠性。后端的硬盘队列用于非正常退出时恢复消息投递。

对于消息投递的可靠性,正常退出时消息会安全的被持久化到硬盘中,包括内存队列、投递中队列、延迟队列以及各种内部缓存。

注意,每个topic和channel的名称后面如果是以#ephemeral结尾,那么缓存中的消息不被持久化到硬盘中而且超过内存消息队列大小时,消息将被丢弃。这使消费者在订阅一个channel时可以无需消息投递可靠性。这种临时的channel在无客户端连接时会自动消失。对于一个临时的topic,意味着至少有一个channel被创建、消费和删除(通常也是临时的channel)。

消息投递的可靠性

NSQ保证消息投递至少一次,有可能重复投递。消费者应该自行保证消息处理的幂等性。

这个可靠性也作为消息协议的一部分,处理流程如下:

1.客户端声明准备好接收消息

2.NSQ发送一个消息并临时保存消息到本地

3.客户端发送FIN或者REQ表明处理成功或者失败。如果客户端未在规定的时间内作出响应,NSQ会根据设置的超时时间来自动把消息重新入队列。

这就保证了消息只有在NSQ非正常关闭时会发生丢失。这种情况下,所有内存的信息都会丢失。

如果防止消息丢失是非常重要的,即使是非正常退出,也是可以减少影响度的。一种方式是增加nsqd节点(部署在不同host),来备份消息。因为客户端处理消息是幂等的所以多次消息投递不影响下游系统,以保证任何单个节点故障不至于引起消息丢失。

关键是NSQ要提供构建的模块去支持多种生产用例和可配置程度的耐久性

消除单点故障(SPOF)

NSQ采用的是分布式的设计方式。客户端通过长连接,连接所有指定topic的所有nsqd实例,这里没有中间人,没有单点故障。

只要有足够的客户端消费者连接到所有的生产者以保障大量的消息处理,就能保证所有的信息最终都可以被处理。

对于nsqlookupd,可以通过多个实例来实现高可用。他们之间不需要之间通信,数据是会最终一致的。消费者通过配置的nsqlookupd实例来获取所有信息并将其汇总

2.3 流程

单个nsqd实例一次可以处理多个消息流。消息流被称为“topics”,一个topic有一个或者多个“channels”。每个channel会接收来自topic的所有消息。实际上,一个channel对应下游个服务消费的一个topic

topics和channels并不是预先配置的。topics是在首次发布或者订阅的时候创建的,channels是在首次订阅的时候创建的。

topics和channels的所有缓存数据都相互独立,目的是为了防止一个“慢”消费者造成消息积压而影响其他topic或channel。

一个channel通常有多个消费者连接,假如所有消费者都是在准备接收消息状态,每个消息会被随机投递到消费者中。

综述,消息是从topic广播到channel,然后从channel投递到消费者中。

NSQ还有一个辅助工具nsqlookupd,它提供了服务发现功能,使消费者能够订阅感兴趣的topic所在的所有nsqd的地址。同时在配置方面,使消费者和生产者解耦,他们不需要知道彼此,只需要通过nsqlookupd来建立联系,降低复杂性。

在底层实现中,每个nsqd和nsqlookupd都建立了长连接,定期推送自己的状态信息。nsqlookupd通过这些信息来判断哪个nsqd地址应该返回给消费者。

当添加一个新的消费者时,只要给nsqd客户端初始化nsqlookupd的实例地址。所以在新增消费者或者生产者时都无需修改任何配置,大大减少了复杂度。

需要重点强调的是,nsqd和nsqlookupd的守护进程是相互独立的,在兄弟节点之间没有通信和协作。

我们认为通过某种方式去观察、管理这些节点是非常重要的。我们构建了nsqadmin来处理这件事情,它提供了web界面来浏览topics/channels/consumers的层级、检查深度,以及其他的信息。而且还提供了一些管理员命令,比如移除和清空channel。

单个nsqd可以有多个Topic,每个Topic又可以有多个Channel。Channel能够接收Topic所有消息的副本,从而实现了消息多播分发;而Channel上的每个消息被分发给它的订阅者,从而实现负载均衡,所有这些就组成了一个可以表示各种简单和复杂拓扑结构的强大框架。

核心概念

topic: topic是nsq的消息发布的逻辑关键词。当程序初次发布带topic的消息时,如果topic不存在,则会被创建。

channels: 当生产者每次发布消息的时候,消息会采用多播的方式被拷贝到各个channel中,channel起到队列的作用。

messages: 数据流的形式。

二,安装

#设置主机名
[root@nsq3 ~] hostnamectl set-hostname nsq3
#创建安装目录
[root@nsq3 ~]#mkdir /home/nsq
[root@nsq3 ~]#cd /home/nsq/
[root@nsq3 ~]#mkdir config log data
#下载文件
[root@nsq3 ~]#wget https://s3.amazonaws.com/bitly-downloads/nsq/nsq-1.0.0-compat.linux-amd64.go1.8.tar.gz
#解压
[root@nsq3 ~]#tar xzvf nsq-1.0.0-compat.linux-amd64.go1.8.tar.gz
[root@nsq3 ~]#ln -s nsq-1.0.0-compat.linux-amd64.go1.8 nsq

配置nsq文件(以nsq3 10.1.194.245为例)

vim /home/nsq/config/nsqd.cfg
内容如下
## log verbosity level: debug, info, warn, error, or fatal
log-level = "info"

## unique identifier (int) for this worker (will default to a hash of hostname)
# id = 5150

## <addr>:<port> to listen on for TCP clients
tcp_address = "0.0.0.0:4150"

## <addr>:<port> to listen on for HTTP clients
http_address = "0.0.0.0:4151"

## <addr>:<port> to listen on for HTTPS clients
# https_address = "0.0.0.0:4152"

## address that will be registered with lookupd (defaults to the OS hostname)
broadcast_address = "10.1.194.245"

## cluster of nsqlookupd TCP addresses
nsqlookupd_tcp_addresses = [
    "10.1.194.243:4160",
    "10.1.194.244:4160",
    "10.1.194.245:4160"
]

## duration to wait before HTTP client connection timeout
http_client_connect_timeout = "2s"

## duration to wait before HTTP client request timeout
http_client_request_timeout = "5s"

## path to store disk-backed messages
data_path = "/home/nsq/data"

## number of messages to keep in memory (per topic/channel)
mem_queue_size = 10000

## number of bytes per diskqueue file before rolling 101m
max_bytes_per_file = 104857600

## number of messages per diskqueue fsync
sync_every = 2500

## duration of time per diskqueue fsync (time.Duration)
sync_timeout = "2s"


## duration to wait before auto-requeing a message
msg_timeout = "60s"

## maximum duration before a message will timeout
max_msg_timeout = "15m"

## maximum size of a single message in bytes
max_msg_size = 1024768

## maximum requeuing timeout for a message
max_req_timeout = "1h"

## maximum size of a single command body
max_body_size = 5123840


## maximum client configurable duration of time between client heartbeats
max_heartbeat_interval = "60s"

## maximum RDY count for a client
max_rdy_count = 2500

## maximum client configurable size (in bytes) for a client output buffer
max_output_buffer_size = 65536

## maximum client configurable duration of time between flushing to a client (time.Duration)
max_output_buffer_timeout = "1s"


## UDP <addr>:<port> of a statsd daemon for pushing stats
# statsd_address = "127.0.0.1:8125"

## prefix used for keys sent to statsd (%s for host replacement)
statsd_prefix = "nsq.%s"

## duration between pushing to statsd (time.Duration)
statsd_interval = "60s"

## toggle sending memory and GC stats to statsd
statsd_mem_stats = true


## message processing time percentiles to keep track of (float)
e2e_processing_latency_percentiles = [
    100.0,
    99.0,
    95.0
]

## calculate end to end latency quantiles for this duration of time (time.Duration)
e2e_processing_latency_window_time = "10m"


## path to certificate file
tls_cert = ""

## path to private key file
tls_key = ""

## set policy on client certificate (require - client must provide certificate,
##  require-verify - client must provide verifiable signed certificate)
# tls_client_auth_policy = "require-verify"

## set custom root Certificate Authority
# tls_root_ca_file = ""

## require client TLS upgrades
tls_required = false

## minimum TLS version ("ssl3.0", "tls1.0," "tls1.1", "tls1.2")
tls_min_version = ""

## enable deflate feature negotiation (client compression)
deflate = true

## max deflate compression level a client can negotiate (> values == > nsqd CPU usage)
max_deflate_level = 6

## enable snappy feature negotiation (client compression)
snappy = true

配置nsqlookupd.cfg文件

vim /home/nsq/config/nsqlookupd.cfg
cat > /home/nsq/config/nsqlookupd.cfg << EOF  #也可以用这种方式导入,可删除此行
## log verbosity level: debug, info, warn, error, or fatal
log-level = "info"

## <addr>:<port> to listen on for TCP clients
tcp_address = "0.0.0.0:4160"

## <addr>:<port> to listen on for HTTP clients
http_address = "0.0.0.0:4161"

## address that will be registered with lookupd (defaults to the OS hostname)
broadcast_address = "10.1.194.245"


## duration of time a producer will remain in the active list since its last ping
inactive_producer_timeout = "300s"

## duration of time a producer will remain tombstoned if registration remains
tombstone_lifetime = "45s"

*如果nsqadmin在此节点上,配置nsqadmin文件nsqadmin.cfg

## log verbosity level: debug, info, warn, error, or fatal
log-level = "info"

## <addr>:<port> to listen on for HTTP clients
http_address = "0.0.0.0:4171"

## graphite HTTP address
graphite_url = ""

## proxy HTTP requests to graphite
proxy_graphite = false

## prefix used for keys sent to statsd (%s for host replacement, must match nsqd)
statsd_prefix = "nsq.%s"

## format of statsd counter stats
statsd_counter_format = "stats.counters.%s.count"

## format of statsd gauge stats
statsd_gauge_format = "stats.gauges.%s"

## time interval nsqd is configured to push to statsd (must match nsqd)
statsd_interval = "60s"

## HTTP endpoint (fully qualified) to which POST notifications of admin actions will be sent
notification_http_endpoint = ""


## nsqlookupd HTTP addresses
nsqlookupd_http_addresses = [
    "10.1.194.243:4161",
    "10.1.194.244:4161",
    "10.1.194.245:4161"
]

## nsqd HTTP addresses (optional)
#nsqd_http_addresses = [
#    "127.0.0.1:4151"
#]

Systemd 管理启动 配置

配置nsqd.service

cat > /etc/systemd/system/nsqd.service << EOF
[Unit]
Description=NSQD
After=network.target
[Service]
LimitCORE=infinity
LimitNOFILE=100000
LimitNPROC=100000
WorkingDirectory=/home/nsq
ExecStart=/home/nsq/nsq/bin/nsqd -config=/home/nsq/config/nsqd.cfg
ExecReload=/bin/kill -HUP $MAINPID
Type=simple
KillMode=process
Restart=on-failure
RestartSec=10s
User=root
[Install]
WantedBy=multi-user.target
EOF

配置nsqlookupd.service

cat > /etc/systemd/system/nsqlookupd.service << EOF
[Unit]
Description=NSQLookupD
After=network.target
[Service]
LimitCORE=infinity
LimitNOFILE=100000
LimitNPROC=100000
WorkingDirectory=/home/nsq
ExecStart=/home/nsq/nsq/bin/nsqlookupd -config=/home/nsq/config/nsqlookupd.cfg
ExecReload=/bin/kill -HUP $MAINPID
Type=simple
KillMode=process
Restart=on-failure
RestartSec=10s
User=root
[Install]
WantedBy=multi-user.target
EOF

配置nsqadmin.service文件

cat > /etc/systemd/system/nsqadmin.service << EOF
  171  [Unit]
  172  Description=NSQAdmin
  173  After=network.target
  174  [Service]
  175  LimitCORE=infinity
  176  LimitNOFILE=100000
  177  LimitNPROC=100000
  178  WorkingDirectory=/home/nsq
  179  ExecStart=/home/nsq/nsq/bin/nsqadmin -config=/home/nsq/config/nsqadmin.cfg
  180  ExecReload=/bin/kill -HUP $MAINPID
  181  Type=simple
  182  KillMode=process
  183  Restart=on-failure
  184  RestartSec=10s
  185  User=root
  186  [Install]
  187  WantedBy=multi-user.target
  188  EOF

开机启动

systemctl daemon-reload
systemctl start nsqd
systemctl enable nsqd;systemctl enable nsqlookupd;systemctl enable nsqadmin
systemctl start nsqlookupd
systemctl start nsqadmin
#查看状态
systemctl status nsqd
systemctl status nsqlookupd

三,使用

域名地址:

NSQ  做好域名解析
nsqlookup的地址:

http:
nsq1.darren.com:4161
nsq2.darren.com:4161
nsq3.darren.com:4161
tcp:
nsq1.darren.com:4160
nsq2.darren.com:4160
nsq3.darren.com:4160
nsqd的地址:

http:
nsq1.darren.com:4151
nsq2.darren.com:4151
nsq3.darren.com:4151
tcp:
nsq1.darren.com:4150
nsq2.darren.com:4150
nsq3.darren.com:4150
nsqadmin的地址

http://nsqadmin.darren.com:4171

由于nsq提供了一个unix-like的工具,所以我们可以在终端使用以下命令进行消息的发送测试:

$ curl -d 'hello world 1' 'http://127.0.0.1:4151/put?topic=test'
$ curl -d 'hello world 2' 'http://127.0.0.1:4251/put?topic=test'

发送后, 可以在监控面板观察页面数据的变化

NSQ Topic和Channel命名规范

Channel命名规范:{环境} _ {产品线英文/拼音简写} _ {终端类型} {业务模块英文/拼音简写} Channel命名示例:PRE_SDYXMALL_APP_ORDER

Topic命名规范:{环境} _ {产品线英文/拼音简写} _ {业务模块英文/拼音简写} _ {消息主题} _ {事件类型} Topic命名示例:PRE_SDYXMALL_ORDER_STATUS_CHANGE

使用go编写NSQ消费端的例子

consumer.go

package queue

import (
    "fmt"
    "github.com/nsqio/go-nsq"
    "log"
    "os"
)

type logger interface {
    Output(int, string) error
}

type Consumer struct {
    client      *nsq.Consumer
    config      *nsq.Config
    nsqds       []string
    nsqlookupds []string
    concurrency int
    channel     string
    topic       string
    level       nsq.LogLevel
    log         logger
    err         error
}

//初始化消费端
func NewConsumer(topic, channel string) *Consumer {
    return &Consumer{
        log:         log.New(os.Stderr, "", log.LstdFlags),
        config:      nsq.NewConfig(),
        level:       nsq.LogLevelInfo,
        channel:     channel,
        topic:       topic,
        concurrency: 1,
    }
}

func (c *Consumer) SetLogger(log logger, level nsq.LogLevel) {
    c.level = level
    c.log = log
}

func (c *Consumer) SetMap(options map[string]interface{}) {
    for k, v := range options {
        c.Set(k, v)
    }
}

func (c *Consumer) Set(option string, value interface{}) {
    switch option {
    case "topic":
        c.topic = value.(string)
    case "channel":
        c.channel = value.(string)
    case "concurrency":
        c.concurrency = value.(int)
    case "nsqd":
        c.nsqds = []string{value.(string)}
    case "nsqlookupd":
        c.nsqlookupds = []string{value.(string)}
    case "nsqds":
        s, err := strings(value)
        if err != nil {
            c.err = fmt.Errorf("%q: %v", option, err)
            return
        }
        c.nsqds = s
    case "nsqlookupds":
        s, err := strings(value)
        if err != nil {
            c.err = fmt.Errorf("%q: %v", option, err)
            return
        }
        c.nsqlookupds = s
    default:
        err := c.config.Set(option, value)
        if err != nil {
            c.err = err
        }
    }
}

func (c *Consumer) Start(handler nsq.Handler) error {

    if c.err != nil {
        return c.err
    }

    client, err := nsq.NewConsumer(c.topic, c.channel, c.config)
    if err != nil {
        return err
    }
    c.client = client
    client.SetLogger(c.log, c.level)
    client.AddConcurrentHandlers(handler, c.concurrency)
    return c.connect()
}

//连接到nsqd
func (c *Consumer) connect() error {

    if len(c.nsqds) == 0 && len(c.nsqlookupds) == 0 {
        return fmt.Errorf(`at least one "nsqd" or "nsqlookupd" address must be configured`)
    }

    if len(c.nsqds) > 0 {
        err := c.client.ConnectToNSQDs(c.nsqds)
        if err != nil {
            return err
        }
    }
    if len(c.nsqlookupds) > 0 {
        err := c.client.ConnectToNSQLookupds(c.nsqlookupds)
        if err != nil {
            return err
        }
    }
    return nil
}

//stop and wait
func (c *Consumer) Stop() error {
    c.client.Stop()
    <-c.client.StopChan
    return nil
}

func strings(v interface{}) ([]string, error) {
    switch v.(type) {
    case []string:
        return v.([]string), nil
    case []interface{}:
        var ret []string
        for _, e := range v.([]interface{}) {
            s, ok := e.(string)
            if !ok {
                return nil, fmt.Errorf("string expected")
            }
            ret = append(ret, s)
        }
        return ret, nil
    default:
        return nil, fmt.Errorf("strings expected")
    }
}

Go

main.go


package main

import (
    "fmt"
    "github.com/nsqio/go-nsq"
    "your_go_path/project/queue"
)

func main() {
    done := make(chan bool)
    c := queue.NewConsumer("test", "testchan2")
    c.Set("nsqds", []string{"192.168.139.134:4150", "192.168.139.134:4250"})
    c.Set("concurrency", 15)
    c.Set("max_attempts", 10)
    c.Set("max_in_flight", 150)
    err := c.Start(nsq.HandlerFunc(func(msg *nsq.Message) error {
        fmt.Println("customer2:", string(msg.Body))
        return nil
    }))
    if err != nil {
        fmt.Errorf(err.Error())
    }
    <-done
}

Go

使用总结

nsq大部分情况基本能满足我们作为消息队列的要求,而且性能与单点故障处理能力也比较出色.

但它不适用的地方主要有:

有顺序要求的消息

不支持副本集的集群方式

举例说明
[Topic里的AURAYOU随具体的业务线而确定]

订单消息通知
1、消息Topic:AURAYOU_ORDER_STATUS_CHANGE
更新订单状态的消息格式

{
      "head":{
         "msgId":"2017082115231500000001",   //消息ID(22位)    全局唯一  当前时间格式化为年月日时分秒+8位随机数
         "sourceIp":"225.0.0.1",    //消息发布系统的来源IP
         "createdAt":143000000   //消息发布时间戳(秒)
      },
      "businessData":{
        "salesChannelId":3,
        "payOrderId":"111111",
        "orderId":"222222",
        "userId":11111,
        "orderStatus":8,
        "orderType":0,
        "appStoreId":"1"
      }
 }
2、消息Topic:AURAYOU_PAYORDER_STATUS_CHANGE
更新订单支付状态的消息格式

{
      "head":{
         "msgId":"2017082115231500000001",   //消息ID(22位)    全局唯一  当前时间格式化为年月日时分秒+8位随机数
         "sourceIp":"225.0.0.1",    //消息发布系统的来源IP
         "createdAt":143000000   //消息发布时间戳(秒)
      },
      "businessData":{
        "salesChannelId":3,
        "payOrderId":"111111",
        "orderIdList":["222222","3333333"],
        "userId":123434,
        "orderStatus":8,
        "orderType":0,
        "appStoreId":"1"
      }
 }

四.排查

/home/nsq/nsq/bin/nsq_stat -nsqd-http-address=127.0.0.1:4151 --topic=AURAYOU_ORDER_EXPRESS_SIGN --channel=AURA_EXPRESS

返回内部的统计信息

参考文档

n

以上摘抄于

官方文档网址:

下载地址: 目前的稳定版是V1.0.0,根据自己实际情况选择安装方式(写于2019/3/8)

默认情况下, 可以在浏览器输入: 打开监控面板

curl

极客学院

配置文件解读:

sq详细介绍
分布式消息队列-NSQ-和-Kafka-对比
http://nsq.io/overview/quick_start.html
http://nsq.io/deployment/installing.html
http://nsqadmin.darren.com:4171
http://127.0.0.1:4151/stats
nsq指南
NSQ消息发送机制
https://github.com/nsqio/nsq/blob/master/contrib/nsqd.cfg.example
https://nercoeus.github.io
NSQ端口关系以及注意事项 - pcwen.top - 博客园
Logo