# 日志-kafka

## 介绍

写于2021.6.30

官网：<http://kafka.apache.org/>

Apache Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发，之后成为Apache项目的一部分。Kafka是一种快速、可扩展的、设计内在就是分布式的，分区的和可复制的提交日志服务。

Apache Kafka与传统消息系统相比，有以下不同：

* 它被设计为一个分布式系统，易于向外扩展；
* 它同时为发布和订阅提供高吞吐量；
* 它支持多订阅者，当失败时能自动平衡消费者；
* 它将消息持久化到磁盘，因此可用于批量消费，例如ETL，以及实时应用程序。

### 一、 误区澄清与概念明确

**1 Kafka的版本**

很多人在Kafka中国社区(替群主做个宣传，QQ号：162272557)提问时的开头经常是这样的：“我使用的kafka版本是2.10/2.11, 现在碰到一个奇怪的问题。。。。” 无意冒犯，但这里的2.10/2.11不是kafka的版本，而是编译kafka的Scala版本。Kafka的server端代码是由Scala语言编写的，目前Scala主流的3个版本分别是2.10、2.11和2.12。实际上Kafka现在每个PULL request都已经自动增加了这三个版本的检查。下图是我的一个PULL request，可以看到这个fix会同时使用3个scala版本做编译检查：

![](https://55104664-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-LFq8o_9x6ob05ww-sNi%2F-MdRK4Dghc1GWIYNeHGM%2F-MdRKWJ2VUrfUyC69A4R%2Fimage.png?alt=media\&token=08556ab0-a1c8-4506-9e7a-be234628ee69)

目前广泛使用kafka的版本应该是这三个大版本：0.8.x， 0.9.x和0.10.\* 。 这三个版本对于consumer和consumer group来说都有很大的变化，我们后面会详谈。

**2 新版本 VS 老版本**

“我的kafkaoffsetmonitor为什么无法监控到offset了？”——这是我在Kafka中国社区见到最多的问题，没有之一！实际上，Kafka 0.9开始提供了新版本的consumer及consumer group，位移的管理与保存机制发生了很大的变化——新版本consumer默认将不再保存位移到zookeeper中，而目前kafkaoffsetmonitor还没有应对这种变化(虽然已经有很多人在要求他们改了，详见[https://github.com/quantifind/KafkaOffsetMonitor/issues/79](https://www.gitbook.com/book/huxi_2b/kafka-internals/edit))，所以很有可能是因为你使用了新版本的consumer才无法看到的。关于新旧版本，这里统一说明一下：kafka0.9以前的consumer是使用Scala编写的，包名结构是kafka.consumer.\*，分为high-level consumer和low-level consumer两种。我们熟知的ConsumerConnector、ZookeeperConsumerConnector以及SimpleConsumer就是这个版本提供的；自0.9版本开始，Kafka提供了java版本的consumer，包名结构是o.a.k.clients.consumer.\*，熟知的类包括KafkaConsumer和ConsumerRecord等。新版本的consumer可以单独部署，不再需要依赖server端的代码。

原文：<https://www.cnblogs.com/huxi2b/p/6223228.html>

### 二，名词介绍

**Broker**：Kafka 集群包含一个或多个服务器，这种服务器被称为 broker。

**Topic**：每条发布到 Kafka 集群的消息都有一个类别，这个类别被称为 Topic。（物理上不同 Topic 的消息分开存储，逻辑上一个 Topic 的消息虽然保存于一个或多个 broker 上，但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处）。

**Partition**：Partition 是物理上的概念，每个 Topic 包含一个或多个 Partition。

**Producer**：负责发布消息到 Kafka broker。

**Consumer**：消息消费者，向 Kafka broker 读取消息的客户端。

**Consumer Group**：每个 Consumer 属于一个特定的 Consumer Group（可为每个 Consumer 指定 group name，若不指定 group name 则属于默认的 group）。

对于**消费者**，kafka中有两个设置的地方：对于老的消费者，&#x7531;**--zookeeper参数**设置；对于新的消费者，&#x7531;**--bootstrap-server参数**设置

如果使用了--zookeeper参数,那么consumer的信息将会存放在zk之中

查看的方法是使用./zookeeper-client,然后 ls /consumers/\[group\_id]/offsets/\[topic]/\[broker\_id-part\_id],这个是查看某个group\_id的某个topic的offset

如果使用了--bootstrap-server参数,那么consumer的信息将会存放在kafka之中

&#x20;对于**console生产者**，**--broker-list参数**指定了所使用的broker

&#x20;参考 [Kafka bootstrap-servers vs zookeeper in kafka-console-consumer](https://stackoverflow.com/questions/41774446/kafka-bootstrap-servers-vs-zookeeper-in-kafka-console-consumer)  中说建议使用新版(新版本指的是kafka 0.8.0之后的版本)的 --bootstrap-server

### **三，关于kafka和zookeeper**

**原因：**&#x4E3B;要有两个目的/动机：一是优化元数据管理，原来的zk方案，极端情况下可能会造成数据不一致；二是简化部署和配置

Kafka2.8版本开始，可以不用Apache Zookeeper来作为Kafka的依赖组件了，官网把这种称之为KRaft模式。目前，Kafka使用Zookeeper来存储有关分区和Broker的元数据，并选择一个Broker作为Kafka的Controller。现在官网打算删除对Zookeeper的依赖，让Kafka能够以更具扩展性和更加强大的方式管理元数据，从而支持更多分区。

![](https://55104664-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-LFq8o_9x6ob05ww-sNi%2F-MdVj5NPkdngyZtKV5E_%2F-MdVmPc9DLGnAsA73qwe%2Fimage.png?alt=media\&token=fa5aa313-90d0-4d80-b83b-27b3623cb68c)

KRaft目前在Kafka2.8版本是一个测试版本，KRaft模式不推荐使用到生产环境。当Kafka集群处理KRaft模式时，它不会将其元数据存储在Zookeeper中，实际上根本不需要运行Zookeeper，因为它将元数据存储在Controller节点中。

目前官网退出的KRaft模式仅用于测试，不推荐使用到生产环境，因为官方还不支持将现有的基于Zookeeper的Kafka集群升级到KRaft模式。实际上，当Kafka3.0发布时，无法将Kafka集群从2.8升级到3.0，目前该模式会有些BUG，如果尝试KRaft使用到生产环境，会存在数据丢失的风险。

可参考：<https://zhuanlan.zhihu.com/p/369398193>

以及<https://www.cnblogs.com/smartloli/p/14722529.html>

本文以Kafka 单节点单Broker部署及使用 做为演示

![](https://55104664-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-LFq8o_9x6ob05ww-sNi%2F-MdRBggYqyCdlnIc9zCx%2F-MdRFFJhiKHiYvqWZIXB%2Fimage.png?alt=media\&token=12ed2f3b-1b63-45e8-bfc2-771200a8f873)

## 一，下载及安装

需要先安装zookeeper

安装教程参考：1.<https://darren.gitbook.io/project/devops/zookeeper-an-zhuang-ji-shi-yong>

2.<https://www.cnblogs.com/wang-hongwei/p/14700178.html>

下载 <http://kafka.apache.org/downloads.html>

```
yum install java-1.8.0-openjdk  java-1.8.0-openjdk-devel
通过jps查看启动
```

步骤如下：

```
下载地址：https://kafka.apache.org/down...

wget http://mirrors.shuosc.org/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz

解压
tar -zxvf kafka_2.11-1.0.0.tgz

cd /usr/local/kafka_2.11-1.0.0/
```

## 二，修改配置

修改server.properties（/usr/local/kafka/config/server.properties）

```
#broker的全局唯一编号，不能重复

broker.id=1

 

#用来监听链接的端口，producer或consumer将在此端口建立连接

listeners=PLAINTEXT:http://192.168.40.148:9092

 

#kafka消息存放的路径

log.dirs=/home/servers-kafka/logs/kafka

 

#broker需要使用zookeeper保存meta数据

zookeeper.connect=192.168.40.148:2181 


如果是生产使用，如下
############################# Server Basics #############################
# broker 的全局唯一编号，不能重复
broker.id=0

############################# Socket Server Settings #############################
# 配置监听,，默认
listeners=PLAINTEXT://:9092

# 用来监听连接的端口，producer和consumer将在此端口建立连接,，默认
port=9092

# 处理网络请求的线程数量，默认
num.network.threads=3

# 用来处理磁盘IO的线程数量，默认
num.io.threads=8

# 发送套接字的缓冲区大小，默认
socket.send.buffer.bytes=102400

# 接收套接字的缓冲区大小，默认
socket.receive.buffer.bytes=102400

# 请求套接字的缓冲区大小，默认
socket.request.max.bytes=104857600

############################# Log Basics #############################
# kafka 运行日志存放路径
log.dirs=/root/export/servers/logs/kafka

# topic 在当前broker上的分片个数，默认为1
num.partitions=2

# 用来恢复和清理data下数据的线程数量，默认
num.recovery.threads.per.data.dir=1

############################# Log Retention Policy #############################
# segment文件保留的最长时间，超时将被删除，默认
log.retention.hours=168

# 滚动生成新的segment文件的最大时间，默认
log.roll.hours=168
————————————————
版权声明：本文为CSDN博主「harveybd」的原创文章，遵循CC 4.0 BY-SA版权协议，转载请附上原文出处链接及本声明。
原文链接：https://blog.csdn.net/HG_Harvey/article/details/79174104

 
```

## 三，启动

```
#启动ZK
/app/kafka/kafka_2.12-2.1.0/bin/zookeeper-server-start.sh  /app/kafka/kafka_2.12-2.1.0/config/zookeeper.properties
或者单节点启动zk：bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
#启动Kafka
#调试启动

/app/kafka/kafka_2.12-2.1.0/bin/kafka-server-start.sh /app/kafka/kafka_2.12-2.1.0/config/server.properties


开机启动
vim /etc/systemd/system/zookeeper.service
[Unit]
Description=Apache Kafka server (broker)
After=network.target  zookeeper.service

[Service]
Type=simple
Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/java/jdk-11.0.1/bin"
User=root
Group=root
ExecStart=/opt/kafka/kafka_2.11-2.1.0/bin/kafka-server-start.sh /opt/kafka/kafka_2.11-2.1.0/config/server.properties
ExecStop=/opt/kafka/kafka_2.11-2.1.0/bin/kafka-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target

原文链接：https://blog.csdn.net/csdnlihai/article/details/87787236

systemctl daemon-reload
systemctl start kafka
systemctl status kafka
```

## 测试及使用

**创建 topic**

使用 `kafka-topics.sh` 创建单分区单副本的 topic test：

```
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
```

查看 topic 列表：

```
bin/kafka-topics.sh --list --zookeeper localhost:2181
```

## 查看分片情况

/app/kafka/kafka\_2.12-2.1.0/bin/kafka-topics.sh --describe --zookeeper 192.168.40.148:2181

对应topic存储的数据：

生产消费测试 Kafka系列：生产消费消息测试Demo ———————————————— 版权声明：本文为CSDN博主「NIO4444」的原创文章，遵循CC 4.0 BY-SA版权协议，转载请附上原文出处链接及本声明。

## [kafka常用管理命令](https://www.cnblogs.com/wangzhuxing/p/10127497.html)

### 一、Topic管理

#### 1、创建topic

```
kafka-topics.sh --zookeeper 47.52.199.52:2181 --create --topic test-15 --replication-factor 1 --partitions 3
```

#### 2、新增partition

```
 kafka-topics.sh --zookeeper zk.server --alter --topic test --replication-factor 1 --partitions 3
```

注：topic一旦创建，partition只能增加，不能减少

#### 3、删除topic

```
kafka-topics.sh --zookeeper zk.server --delete --topic test 
```

#### 4、查看topic列表

```
kafka-topics.sh --zookeeper zk.server --list
```

#### 5、查看topic详细信息

```
kafka-topics.sh --zookeeper zk.server --topic test --describe
```

Partition是分区编号，Leader，Replicats和Isr里是broker\_id，而broker\_id是在$KAFKA\_HOME/config/server.properties里配置的

#### 6、查看某个topic的message数量

```
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 47.52.199.51:9092 --topic consumer-send
```

[回到顶部](https://www.cnblogs.com/wangzhuxing/p/10127497.html#_labelTop)

### 二、consumer管理

#### 1、查看consumer Group列表

```
./bin/kafka-consumer-groups.sh  --list  --bootstrap-server 192.168.88.108:9092
```

#### 2、查看指定group.id的消费情况

```
./bin/kafka-consumer-groups.sh --bootstrap-server 47.52.199.51:9092 --group test-1 --describe
```

结果：

```
TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
consumer-send   0          5697            5697            0               consumer-3-77500c76-ef09-4047-9ae0-89c20cc379fc /183.17.238.48  consumer-3
producer-syn    0          4125            4125            0               -                                               -               -
```

CURRENT-OFFSET:当前消费偏移量

LOG-END-OFFSET：末尾偏移量

#### 3、删除group

```
./bin/kafka-consumer-groups.sh --bootstrap-server 47.52.199.51:9092 --group test-1 --delete
```

#### 4、重置offset

**1、要求修改的group不能active,查看是否activ**

```
[root@izj6c46svwddzpu0evy0vbz kafka_2.11-2.0.1]# ./bin/kafka-consumer-groups.sh --bootstrap-server 47.52.199.51:9092 --group test_4 --describe
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Consumer group 'test_4' has no active members.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
consumer-send   0          5697            5697            0               -               -               -
producer-syn    0          4125            4125            0               -               -               
```

**2、重置命令**

```
./bin/kafka-consumer-groups.sh --bootstrap-server 47.52.199.51:9092 --group test_4 --reset-offsets -to-offset 100 --topic consumer-send --execute
```

**3、导出offset**

```
./bin/kafka-consumer-groups.sh --bootstrap-server 47.52.199.51:9092 --group test_4 --reset-offsets -to-offset 100 --topic consumer-send --export > 1.txt
```

### 三、动态配置

#### 1、再平衡

```
./bin/kafka-preferred-replica-election.sh --zookeeper 47.52.199.51:2181/chroot
```

### 四、生产消费者

#### 1、启动kafka

```
nohup bin/kafka-server-start.sh config/server.properties > /dev/null 2>&1 &
```

#### 2、创建消费者

```
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic middleware --from-beginning
```

#### 3、创建生产者

```
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
```

### 五，模拟测试

```
1.创建topic
bin/kafka-topics.sh --create --bootstrap-server 10.1.196.25:9092 --replication-factor 1 --partitions 2 --topic pordlog

2.创建groupid 不一定用上
bin/kafka-console-consumer.sh --bootstrap-server 10.1.196.25:9092 --topic pordlog --from-beginning --consumer-property group.id=es-prod

3.模拟测试
模拟生产者
bin/kafka-console-producer.sh --broker-list 10.1.196.25:9092 --topic pordlog
模拟消费者
bin/kafka-console-consumer.sh --bootstrap-server 10.1.196.25:9092 --topic pordlog --from-beginning --consumer-property group.id=es-prod

4.查看topic
bin/kafka-topics.sh --bootstrap-server 10.1.196.25:9092 --list

5.查看groupid
bin/kafka-consumer-groups.sh --bootstrap-server 10.1.196.25:9092 --list
```

## kafka-manager

kafka-manager，现在叫做CMAK，后来发现滴滴开源的kafka-manager，功能更丰富。看了下它的架构图和源码，还是很值得参考和借鉴的。

让我们来看看didi kafka-manager官网介绍的功能：

**一站式 `ApacheKafka`集群指标监控与运维管控平台**

**didi参考文档** [**https://github.com/didi/Logi-KafkaManager/blob/master/docs/install\_guide/install\_guide\_cn.md**](https://github.com/didi/Logi-KafkaManager/blob/master/docs/install_guide/install_guide_cn.md)

CMAK 参考文档 <https://www.cnblogs.com/frankdeng/p/9584870.html>

&#x20;kafka-manager 项目地址：<https://github.com/yahoo/kafka-manager>

## 参考文章

&#x20;原文链接：<https://blog.csdn.net/VIP099/article/details/105647966>

更多安装文章;<https://segmentfault.com/a/1190000012730949>

配置文件讲解：<https://www.cnblogs.com/sandea/p/12078442.html>

思路讲解：<https://juejin.cn/post/6844903847844904973>

基于Kubernetes部署Kafka集群 <https://blog.csdn.net/l1028386804/article/details/106773932>

阿里云官方安装k8s-kafka <https://blog.51cto.com/u_14783669/2666234>

&#x20;**kafkamanager管理工具** [**https://www.cnblogs.com/kevingrace/p/14412024.html**](https://www.cnblogs.com/kevingrace/p/14412024.html)

**kafka** [**https://javinjunfeng.gitbooks.io/kafka/content/chu-ji/kafkadan-jie-dian-zhi-ji-qun-de-an-zhuang-bu-shu-ji-zhu-yi-shi-xiang.html**](https://javinjunfeng.gitbooks.io/kafka/content/chu-ji/kafkadan-jie-dian-zhi-ji-qun-de-an-zhuang-bu-shu-ji-zhu-yi-shi-xiang.html)

**调优：**[**https://segmentfault.com/a/1190000022763403**](https://segmentfault.com/a/1190000022763403)

[kafka 2.8 版本部署](https://www.cnblogs.com/wang-hongwei/p/14700178.html)
