介绍
写于2021.6.30
官网:http://kafka.apache.org/
Apache Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。
Apache Kafka与传统消息系统相比,有以下不同:
它将消息持久化到磁盘,因此可用于批量消费,例如ETL,以及实时应用程序。
一、 误区澄清与概念明确
1 Kafka的版本
很多人在Kafka中国社区(替群主做个宣传,QQ号:162272557)提问时的开头经常是这样的:“我使用的kafka版本是2.10/2.11, 现在碰到一个奇怪的问题。。。。” 无意冒犯,但这里的2.10/2.11不是kafka的版本,而是编译kafka的Scala版本。Kafka的server端代码是由Scala语言编写的,目前Scala主流的3个版本分别是2.10、2.11和2.12。实际上Kafka现在每个PULL request都已经自动增加了这三个版本的检查。下图是我的一个PULL request,可以看到这个fix会同时使用3个scala版本做编译检查:
目前广泛使用kafka的版本应该是这三个大版本:0.8.x, 0.9.x和0.10.* 。 这三个版本对于consumer和consumer group来说都有很大的变化,我们后面会详谈。
2 新版本 VS 老版本
“我的kafkaoffsetmonitor为什么无法监控到offset了?”——这是我在Kafka中国社区见到最多的问题,没有之一!实际上,Kafka 0.9开始提供了新版本的consumer及consumer group,位移的管理与保存机制发生了很大的变化——新版本consumer默认将不再保存位移到zookeeper中,而目前kafkaoffsetmonitor还没有应对这种变化(虽然已经有很多人在要求他们改了,详见https://github.com/quantifind/KafkaOffsetMonitor/issues/79 ),所以很有可能是因为你使用了新版本的consumer才无法看到的。关于新旧版本,这里统一说明一下:kafka0.9以前的consumer是使用Scala编写的,包名结构是kafka.consumer.*,分为high-level consumer和low-level consumer两种。我们熟知的ConsumerConnector、ZookeeperConsumerConnector以及SimpleConsumer就是这个版本提供的;自0.9版本开始,Kafka提供了java版本的consumer,包名结构是o.a.k.clients.consumer.*,熟知的类包括KafkaConsumer和ConsumerRecord等。新版本的consumer可以单独部署,不再需要依赖server端的代码。
原文:https://www.cnblogs.com/huxi2b/p/6223228.html
二,名词介绍
Broker :Kafka 集群包含一个或多个服务器,这种服务器被称为 broker。
Topic :每条发布到 Kafka 集群的消息都有一个类别,这个类别被称为 Topic。(物理上不同 Topic 的消息分开存储,逻辑上一个 Topic 的消息虽然保存于一个或多个 broker 上,但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处)。
Partition :Partition 是物理上的概念,每个 Topic 包含一个或多个 Partition。
Producer :负责发布消息到 Kafka broker。
Consumer :消息消费者,向 Kafka broker 读取消息的客户端。
Consumer Group :每个 Consumer 属于一个特定的 Consumer Group(可为每个 Consumer 指定 group name,若不指定 group name 则属于默认的 group)。
对于消费者 ,kafka中有两个设置的地方:对于老的消费者,由--zookeeper参数 设置;对于新的消费者,由--bootstrap-server参数 设置
如果使用了--zookeeper参数,那么consumer的信息将会存放在zk之中
查看的方法是使用./zookeeper-client,然后 ls /consumers/[group_id]/offsets/[topic]/[broker_id-part_id],这个是查看某个group_id的某个topic的offset
如果使用了--bootstrap-server参数,那么consumer的信息将会存放在kafka之中
对于console生产者 ,--broker-list参数 指定了所使用的broker
参考 Kafka bootstrap-servers vs zookeeper in kafka-console-consumer 中说建议使用新版(新版本指的是kafka 0.8.0之后的版本)的 --bootstrap-server
三,关于kafka和zookeeper
原因: 主要有两个目的/动机:一是优化元数据管理,原来的zk方案,极端情况下可能会造成数据不一致;二是简化部署和配置
Kafka2.8版本开始,可以不用Apache Zookeeper来作为Kafka的依赖组件了,官网把这种称之为KRaft模式。目前,Kafka使用Zookeeper来存储有关分区和Broker的元数据,并选择一个Broker作为Kafka的Controller。现在官网打算删除对Zookeeper的依赖,让Kafka能够以更具扩展性和更加强大的方式管理元数据,从而支持更多分区。
KRaft目前在Kafka2.8版本是一个测试版本,KRaft模式不推荐使用到生产环境。当Kafka集群处理KRaft模式时,它不会将其元数据存储在Zookeeper中,实际上根本不需要运行Zookeeper,因为它将元数据存储在Controller节点中。
目前官网退出的KRaft模式仅用于测试,不推荐使用到生产环境,因为官方还不支持将现有的基于Zookeeper的Kafka集群升级到KRaft模式。实际上,当Kafka3.0发布时,无法将Kafka集群从2.8升级到3.0,目前该模式会有些BUG,如果尝试KRaft使用到生产环境,会存在数据丢失的风险。
可参考:https://zhuanlan.zhihu.com/p/369398193
以及https://www.cnblogs.com/smartloli/p/14722529.html
本文以Kafka 单节点单Broker部署及使用 做为演示
一,下载及安装
需要先安装zookeeper
安装教程参考:1.https://darren.gitbook.io/project/devops/zookeeper-an-zhuang-ji-shi-yong
2.https://www.cnblogs.com/wang-hongwei/p/14700178.html
下载 http://kafka.apache.org/downloads.html
Copy yum install java-1.8.0-openjdk java-1.8.0-openjdk-devel
通过jps查看启动
步骤如下:
Copy 下载地址:https://kafka.apache.org/down...
wget http://mirrors.shuosc.org/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz
解压
tar -zxvf kafka_2.11-1.0.0.tgz
cd /usr/local/kafka_2.11-1.0.0/
二,修改配置
修改server.properties(/usr/local/kafka/config/server.properties)
Copy #broker的全局唯一编号,不能重复
broker.id=1
#用来监听链接的端口,producer或consumer将在此端口建立连接
listeners=PLAINTEXT:http://192.168.40.148:9092
#kafka消息存放的路径
log.dirs=/home/servers-kafka/logs/kafka
#broker需要使用zookeeper保存meta数据
zookeeper.connect=192.168.40.148:2181
如果是生产使用,如下
############################# Server Basics #############################
# broker 的全局唯一编号,不能重复
broker.id=0
############################# Socket Server Settings #############################
# 配置监听,,默认
listeners=PLAINTEXT://:9092
# 用来监听连接的端口,producer和consumer将在此端口建立连接,,默认
port=9092
# 处理网络请求的线程数量,默认
num.network.threads=3
# 用来处理磁盘IO的线程数量,默认
num.io.threads=8
# 发送套接字的缓冲区大小,默认
socket.send.buffer.bytes=102400
# 接收套接字的缓冲区大小,默认
socket.receive.buffer.bytes=102400
# 请求套接字的缓冲区大小,默认
socket.request.max.bytes=104857600
############################# Log Basics #############################
# kafka 运行日志存放路径
log.dirs=/root/export/servers/logs/kafka
# topic 在当前broker上的分片个数,默认为1
num.partitions=2
# 用来恢复和清理data下数据的线程数量,默认
num.recovery.threads.per.data.dir=1
############################# Log Retention Policy #############################
# segment文件保留的最长时间,超时将被删除,默认
log.retention.hours=168
# 滚动生成新的segment文件的最大时间,默认
log.roll.hours=168
————————————————
版权声明:本文为CSDN博主「harveybd」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/HG_Harvey/article/details/79174104
三,启动
Copy #启动ZK
/app/kafka/kafka_2.12-2.1.0/bin/zookeeper-server-start.sh /app/kafka/kafka_2.12-2.1.0/config/zookeeper.properties
或者单节点启动zk:bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
#启动Kafka
#调试启动
/app/kafka/kafka_2.12-2.1.0/bin/kafka-server-start.sh /app/kafka/kafka_2.12-2.1.0/config/server.properties
开机启动
vim /etc/systemd/system/zookeeper.service
[Unit]
Description=Apache Kafka server (broker)
After=network.target zookeeper.service
[Service]
Type=simple
Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/java/jdk-11.0.1/bin"
User=root
Group=root
ExecStart=/opt/kafka/kafka_2.11-2.1.0/bin/kafka-server-start.sh /opt/kafka/kafka_2.11-2.1.0/config/server.properties
ExecStop=/opt/kafka/kafka_2.11-2.1.0/bin/kafka-server-stop.sh
Restart=on-failure
[Install]
WantedBy=multi-user.target
原文链接:https://blog.csdn.net/csdnlihai/article/details/87787236
systemctl daemon-reload
systemctl start kafka
systemctl status kafka
测试及使用
创建 topic
使用 kafka-topics.sh
创建单分区单副本的 topic test:
Copy bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看 topic 列表:
Copy bin/kafka-topics.sh --list --zookeeper localhost:2181
查看分片情况
/app/kafka/kafka_2.12-2.1.0/bin/kafka-topics.sh --describe --zookeeper 192.168.40.148:2181
对应topic存储的数据:
生产消费测试 Kafka系列:生产消费消息测试Demo ———————————————— 版权声明:本文为CSDN博主「NIO4444」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
一、Topic管理
1、创建topic
Copy kafka-topics.sh --zookeeper 47.52.199.52:2181 --create --topic test-15 --replication-factor 1 --partitions 3
2、新增partition
Copy kafka-topics.sh --zookeeper zk.server --alter --topic test --replication-factor 1 --partitions 3
注:topic一旦创建,partition只能增加,不能减少
3、删除topic
Copy kafka-topics.sh --zookeeper zk.server --delete --topic test
4、查看topic列表
Copy kafka-topics.sh --zookeeper zk.server --list
5、查看topic详细信息
Copy kafka-topics.sh --zookeeper zk.server --topic test --describe
Partition是分区编号,Leader,Replicats和Isr里是broker_id,而broker_id是在$KAFKA_HOME/config/server.properties里配置的
6、查看某个topic的message数量
Copy ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 47.52.199.51:9092 --topic consumer-send
回到顶部
二、consumer管理
1、查看consumer Group列表
Copy ./bin/kafka-consumer-groups.sh --list --bootstrap-server 192.168.88.108:9092
2、查看指定group.id的消费情况
Copy ./bin/kafka-consumer-groups.sh --bootstrap-server 47.52.199.51:9092 --group test-1 --describe
结果:
Copy TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
consumer-send 0 5697 5697 0 consumer-3-77500c76-ef09-4047-9ae0-89c20cc379fc /183.17.238.48 consumer-3
producer-syn 0 4125 4125 0 - - -
CURRENT-OFFSET:当前消费偏移量
LOG-END-OFFSET:末尾偏移量
3、删除group
Copy ./bin/kafka-consumer-groups.sh --bootstrap-server 47.52.199.51:9092 --group test-1 --delete
4、重置offset
1、要求修改的group不能active,查看是否activ
Copy [root@izj6c46svwddzpu0evy0vbz kafka_2.11-2.0.1]# ./bin/kafka-consumer-groups.sh --bootstrap-server 47.52.199.51:9092 --group test_4 --describe
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Consumer group 'test_4' has no active members.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
consumer-send 0 5697 5697 0 - - -
producer-syn 0 4125 4125 0 - -
2、重置命令
Copy ./bin/kafka-consumer-groups.sh --bootstrap-server 47.52.199.51:9092 --group test_4 --reset-offsets -to-offset 100 --topic consumer-send --execute
3、导出offset
Copy ./bin/kafka-consumer-groups.sh --bootstrap-server 47.52.199.51:9092 --group test_4 --reset-offsets -to-offset 100 --topic consumer-send --export > 1.txt
三、动态配置
1、再平衡
Copy ./bin/kafka-preferred-replica-election.sh --zookeeper 47.52.199.51:2181/chroot
四、生产消费者
1、启动kafka
Copy nohup bin/kafka-server-start.sh config/server.properties > /dev/null 2>&1 &
2、创建消费者
Copy bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic middleware --from-beginning
3、创建生产者
Copy bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
五,模拟测试
Copy 1.创建topic
bin/kafka-topics.sh --create --bootstrap-server 10.1.196.25:9092 --replication-factor 1 --partitions 2 --topic pordlog
2.创建groupid 不一定用上
bin/kafka-console-consumer.sh --bootstrap-server 10.1.196.25:9092 --topic pordlog --from-beginning --consumer-property group.id=es-prod
3.模拟测试
模拟生产者
bin/kafka-console-producer.sh --broker-list 10.1.196.25:9092 --topic pordlog
模拟消费者
bin/kafka-console-consumer.sh --bootstrap-server 10.1.196.25:9092 --topic pordlog --from-beginning --consumer-property group.id=es-prod
4.查看topic
bin/kafka-topics.sh --bootstrap-server 10.1.196.25:9092 --list
5.查看groupid
bin/kafka-consumer-groups.sh --bootstrap-server 10.1.196.25:9092 --list
kafka-manager
kafka-manager,现在叫做CMAK,后来发现滴滴开源的kafka-manager,功能更丰富。看了下它的架构图和源码,还是很值得参考和借鉴的。
让我们来看看didi kafka-manager官网介绍的功能:
一站式 ApacheKafka
集群指标监控与运维管控平台
didi参考文档 https://github.com/didi/Logi-KafkaManager/blob/master/docs/install_guide/install_guide_cn.md
CMAK 参考文档 https://www.cnblogs.com/frankdeng/p/9584870.html
kafka-manager 项目地址:https://github.com/yahoo/kafka-manager
参考文章
原文链接:https://blog.csdn.net/VIP099/article/details/105647966
更多安装文章;https://segmentfault.com/a/1190000012730949
配置文件讲解:https://www.cnblogs.com/sandea/p/12078442.html
思路讲解:https://juejin.cn/post/6844903847844904973
基于Kubernetes部署Kafka集群 https://blog.csdn.net/l1028386804/article/details/106773932
阿里云官方安装k8s-kafka https://blog.51cto.com/u_14783669/2666234
kafkamanager管理工具 https://www.cnblogs.com/kevingrace/p/14412024.html
kafka https://javinjunfeng.gitbooks.io/kafka/content/chu-ji/kafkadan-jie-dian-zhi-ji-qun-de-an-zhuang-bu-shu-ji-zhu-yi-shi-xiang.html
调优: https://segmentfault.com/a/1190000022763403
kafka 2.8 版本部署