# 日志-kafka

## 介绍

写于2021.6.30

官网：<http://kafka.apache.org/>

Apache Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发，之后成为Apache项目的一部分。Kafka是一种快速、可扩展的、设计内在就是分布式的，分区的和可复制的提交日志服务。

Apache Kafka与传统消息系统相比，有以下不同：

* 它被设计为一个分布式系统，易于向外扩展；
* 它同时为发布和订阅提供高吞吐量；
* 它支持多订阅者，当失败时能自动平衡消费者；
* 它将消息持久化到磁盘，因此可用于批量消费，例如ETL，以及实时应用程序。

### 一、 误区澄清与概念明确

**1 Kafka的版本**

很多人在Kafka中国社区(替群主做个宣传，QQ号：162272557)提问时的开头经常是这样的：“我使用的kafka版本是2.10/2.11, 现在碰到一个奇怪的问题。。。。” 无意冒犯，但这里的2.10/2.11不是kafka的版本，而是编译kafka的Scala版本。Kafka的server端代码是由Scala语言编写的，目前Scala主流的3个版本分别是2.10、2.11和2.12。实际上Kafka现在每个PULL request都已经自动增加了这三个版本的检查。下图是我的一个PULL request，可以看到这个fix会同时使用3个scala版本做编译检查：

![](/files/-MdRKWJ2VUrfUyC69A4R)

目前广泛使用kafka的版本应该是这三个大版本：0.8.x， 0.9.x和0.10.\* 。 这三个版本对于consumer和consumer group来说都有很大的变化，我们后面会详谈。

**2 新版本 VS 老版本**

“我的kafkaoffsetmonitor为什么无法监控到offset了？”——这是我在Kafka中国社区见到最多的问题，没有之一！实际上，Kafka 0.9开始提供了新版本的consumer及consumer group，位移的管理与保存机制发生了很大的变化——新版本consumer默认将不再保存位移到zookeeper中，而目前kafkaoffsetmonitor还没有应对这种变化(虽然已经有很多人在要求他们改了，详见[https://github.com/quantifind/KafkaOffsetMonitor/issues/79](https://www.gitbook.com/book/huxi_2b/kafka-internals/edit#))，所以很有可能是因为你使用了新版本的consumer才无法看到的。关于新旧版本，这里统一说明一下：kafka0.9以前的consumer是使用Scala编写的，包名结构是kafka.consumer.\*，分为high-level consumer和low-level consumer两种。我们熟知的ConsumerConnector、ZookeeperConsumerConnector以及SimpleConsumer就是这个版本提供的；自0.9版本开始，Kafka提供了java版本的consumer，包名结构是o.a.k.clients.consumer.\*，熟知的类包括KafkaConsumer和ConsumerRecord等。新版本的consumer可以单独部署，不再需要依赖server端的代码。

原文：<https://www.cnblogs.com/huxi2b/p/6223228.html>

### 二，名词介绍

**Broker**：Kafka 集群包含一个或多个服务器，这种服务器被称为 broker。

**Topic**：每条发布到 Kafka 集群的消息都有一个类别，这个类别被称为 Topic。（物理上不同 Topic 的消息分开存储，逻辑上一个 Topic 的消息虽然保存于一个或多个 broker 上，但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处）。

**Partition**：Partition 是物理上的概念，每个 Topic 包含一个或多个 Partition。

**Producer**：负责发布消息到 Kafka broker。

**Consumer**：消息消费者，向 Kafka broker 读取消息的客户端。

**Consumer Group**：每个 Consumer 属于一个特定的 Consumer Group（可为每个 Consumer 指定 group name，若不指定 group name 则属于默认的 group）。

对于**消费者**，kafka中有两个设置的地方：对于老的消费者，&#x7531;**--zookeeper参数**设置；对于新的消费者，&#x7531;**--bootstrap-server参数**设置

如果使用了--zookeeper参数,那么consumer的信息将会存放在zk之中

查看的方法是使用./zookeeper-client,然后 ls /consumers/\[group\_id]/offsets/\[topic]/\[broker\_id-part\_id],这个是查看某个group\_id的某个topic的offset

如果使用了--bootstrap-server参数,那么consumer的信息将会存放在kafka之中

&#x20;对于**console生产者**，**--broker-list参数**指定了所使用的broker

&#x20;参考 [Kafka bootstrap-servers vs zookeeper in kafka-console-consumer](https://stackoverflow.com/questions/41774446/kafka-bootstrap-servers-vs-zookeeper-in-kafka-console-consumer)  中说建议使用新版(新版本指的是kafka 0.8.0之后的版本)的 --bootstrap-server

### **三，关于kafka和zookeeper**

**原因：**&#x4E3B;要有两个目的/动机：一是优化元数据管理，原来的zk方案，极端情况下可能会造成数据不一致；二是简化部署和配置

Kafka2.8版本开始，可以不用Apache Zookeeper来作为Kafka的依赖组件了，官网把这种称之为KRaft模式。目前，Kafka使用Zookeeper来存储有关分区和Broker的元数据，并选择一个Broker作为Kafka的Controller。现在官网打算删除对Zookeeper的依赖，让Kafka能够以更具扩展性和更加强大的方式管理元数据，从而支持更多分区。

![](/files/-MdVmPc9DLGnAsA73qwe)

KRaft目前在Kafka2.8版本是一个测试版本，KRaft模式不推荐使用到生产环境。当Kafka集群处理KRaft模式时，它不会将其元数据存储在Zookeeper中，实际上根本不需要运行Zookeeper，因为它将元数据存储在Controller节点中。

目前官网退出的KRaft模式仅用于测试，不推荐使用到生产环境，因为官方还不支持将现有的基于Zookeeper的Kafka集群升级到KRaft模式。实际上，当Kafka3.0发布时，无法将Kafka集群从2.8升级到3.0，目前该模式会有些BUG，如果尝试KRaft使用到生产环境，会存在数据丢失的风险。

可参考：<https://zhuanlan.zhihu.com/p/369398193>

以及<https://www.cnblogs.com/smartloli/p/14722529.html>

本文以Kafka 单节点单Broker部署及使用 做为演示

![](/files/-MdRFFJhiKHiYvqWZIXB)

## 一，下载及安装

需要先安装zookeeper

安装教程参考：1.<https://darren.gitbook.io/project/devops/zookeeper-an-zhuang-ji-shi-yong>

2.<https://www.cnblogs.com/wang-hongwei/p/14700178.html>

下载 <http://kafka.apache.org/downloads.html>

```
yum install java-1.8.0-openjdk  java-1.8.0-openjdk-devel
通过jps查看启动
```

步骤如下：

```
下载地址：https://kafka.apache.org/down...

wget http://mirrors.shuosc.org/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz

解压
tar -zxvf kafka_2.11-1.0.0.tgz

cd /usr/local/kafka_2.11-1.0.0/
```

## 二，修改配置

修改server.properties（/usr/local/kafka/config/server.properties）

```
#broker的全局唯一编号，不能重复

broker.id=1

 

#用来监听链接的端口，producer或consumer将在此端口建立连接

listeners=PLAINTEXT:http://192.168.40.148:9092

 

#kafka消息存放的路径

log.dirs=/home/servers-kafka/logs/kafka

 

#broker需要使用zookeeper保存meta数据

zookeeper.connect=192.168.40.148:2181 


如果是生产使用，如下
############################# Server Basics #############################
# broker 的全局唯一编号，不能重复
broker.id=0

############################# Socket Server Settings #############################
# 配置监听,，默认
listeners=PLAINTEXT://:9092

# 用来监听连接的端口，producer和consumer将在此端口建立连接,，默认
port=9092

# 处理网络请求的线程数量，默认
num.network.threads=3

# 用来处理磁盘IO的线程数量，默认
num.io.threads=8

# 发送套接字的缓冲区大小，默认
socket.send.buffer.bytes=102400

# 接收套接字的缓冲区大小，默认
socket.receive.buffer.bytes=102400

# 请求套接字的缓冲区大小，默认
socket.request.max.bytes=104857600

############################# Log Basics #############################
# kafka 运行日志存放路径
log.dirs=/root/export/servers/logs/kafka

# topic 在当前broker上的分片个数，默认为1
num.partitions=2

# 用来恢复和清理data下数据的线程数量，默认
num.recovery.threads.per.data.dir=1

############################# Log Retention Policy #############################
# segment文件保留的最长时间，超时将被删除，默认
log.retention.hours=168

# 滚动生成新的segment文件的最大时间，默认
log.roll.hours=168
————————————————
版权声明：本文为CSDN博主「harveybd」的原创文章，遵循CC 4.0 BY-SA版权协议，转载请附上原文出处链接及本声明。
原文链接：https://blog.csdn.net/HG_Harvey/article/details/79174104

 
```

## 三，启动

```
#启动ZK
/app/kafka/kafka_2.12-2.1.0/bin/zookeeper-server-start.sh  /app/kafka/kafka_2.12-2.1.0/config/zookeeper.properties
或者单节点启动zk：bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
#启动Kafka
#调试启动

/app/kafka/kafka_2.12-2.1.0/bin/kafka-server-start.sh /app/kafka/kafka_2.12-2.1.0/config/server.properties


开机启动
vim /etc/systemd/system/zookeeper.service
[Unit]
Description=Apache Kafka server (broker)
After=network.target  zookeeper.service

[Service]
Type=simple
Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/java/jdk-11.0.1/bin"
User=root
Group=root
ExecStart=/opt/kafka/kafka_2.11-2.1.0/bin/kafka-server-start.sh /opt/kafka/kafka_2.11-2.1.0/config/server.properties
ExecStop=/opt/kafka/kafka_2.11-2.1.0/bin/kafka-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target

原文链接：https://blog.csdn.net/csdnlihai/article/details/87787236

systemctl daemon-reload
systemctl start kafka
systemctl status kafka
```

## 测试及使用

**创建 topic**

使用 `kafka-topics.sh` 创建单分区单副本的 topic test：

```
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
```

查看 topic 列表：

```
bin/kafka-topics.sh --list --zookeeper localhost:2181
```

## 查看分片情况

/app/kafka/kafka\_2.12-2.1.0/bin/kafka-topics.sh --describe --zookeeper 192.168.40.148:2181

对应topic存储的数据：

生产消费测试 Kafka系列：生产消费消息测试Demo ———————————————— 版权声明：本文为CSDN博主「NIO4444」的原创文章，遵循CC 4.0 BY-SA版权协议，转载请附上原文出处链接及本声明。

## [kafka常用管理命令](https://www.cnblogs.com/wangzhuxing/p/10127497.html)

### 一、Topic管理

#### 1、创建topic

```
kafka-topics.sh --zookeeper 47.52.199.52:2181 --create --topic test-15 --replication-factor 1 --partitions 3
```

#### 2、新增partition

```
 kafka-topics.sh --zookeeper zk.server --alter --topic test --replication-factor 1 --partitions 3
```

注：topic一旦创建，partition只能增加，不能减少

#### 3、删除topic

```
kafka-topics.sh --zookeeper zk.server --delete --topic test 
```

#### 4、查看topic列表

```
kafka-topics.sh --zookeeper zk.server --list
```

#### 5、查看topic详细信息

```
kafka-topics.sh --zookeeper zk.server --topic test --describe
```

Partition是分区编号，Leader，Replicats和Isr里是broker\_id，而broker\_id是在$KAFKA\_HOME/config/server.properties里配置的

#### 6、查看某个topic的message数量

```
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 47.52.199.51:9092 --topic consumer-send
```

[回到顶部](https://www.cnblogs.com/wangzhuxing/p/10127497.html#_labelTop)

### 二、consumer管理

#### 1、查看consumer Group列表

```
./bin/kafka-consumer-groups.sh  --list  --bootstrap-server 192.168.88.108:9092
```

#### 2、查看指定group.id的消费情况

```
./bin/kafka-consumer-groups.sh --bootstrap-server 47.52.199.51:9092 --group test-1 --describe
```

结果：

```
TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
consumer-send   0          5697            5697            0               consumer-3-77500c76-ef09-4047-9ae0-89c20cc379fc /183.17.238.48  consumer-3
producer-syn    0          4125            4125            0               -                                               -               -
```

CURRENT-OFFSET:当前消费偏移量

LOG-END-OFFSET：末尾偏移量

#### 3、删除group

```
./bin/kafka-consumer-groups.sh --bootstrap-server 47.52.199.51:9092 --group test-1 --delete
```

#### 4、重置offset

**1、要求修改的group不能active,查看是否activ**

```
[root@izj6c46svwddzpu0evy0vbz kafka_2.11-2.0.1]# ./bin/kafka-consumer-groups.sh --bootstrap-server 47.52.199.51:9092 --group test_4 --describe
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Consumer group 'test_4' has no active members.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
consumer-send   0          5697            5697            0               -               -               -
producer-syn    0          4125            4125            0               -               -               
```

**2、重置命令**

```
./bin/kafka-consumer-groups.sh --bootstrap-server 47.52.199.51:9092 --group test_4 --reset-offsets -to-offset 100 --topic consumer-send --execute
```

**3、导出offset**

```
./bin/kafka-consumer-groups.sh --bootstrap-server 47.52.199.51:9092 --group test_4 --reset-offsets -to-offset 100 --topic consumer-send --export > 1.txt
```

### 三、动态配置

#### 1、再平衡

```
./bin/kafka-preferred-replica-election.sh --zookeeper 47.52.199.51:2181/chroot
```

### 四、生产消费者

#### 1、启动kafka

```
nohup bin/kafka-server-start.sh config/server.properties > /dev/null 2>&1 &
```

#### 2、创建消费者

```
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic middleware --from-beginning
```

#### 3、创建生产者

```
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
```

### 五，模拟测试

```
1.创建topic
bin/kafka-topics.sh --create --bootstrap-server 10.1.196.25:9092 --replication-factor 1 --partitions 2 --topic pordlog

2.创建groupid 不一定用上
bin/kafka-console-consumer.sh --bootstrap-server 10.1.196.25:9092 --topic pordlog --from-beginning --consumer-property group.id=es-prod

3.模拟测试
模拟生产者
bin/kafka-console-producer.sh --broker-list 10.1.196.25:9092 --topic pordlog
模拟消费者
bin/kafka-console-consumer.sh --bootstrap-server 10.1.196.25:9092 --topic pordlog --from-beginning --consumer-property group.id=es-prod

4.查看topic
bin/kafka-topics.sh --bootstrap-server 10.1.196.25:9092 --list

5.查看groupid
bin/kafka-consumer-groups.sh --bootstrap-server 10.1.196.25:9092 --list
```

## kafka-manager

kafka-manager，现在叫做CMAK，后来发现滴滴开源的kafka-manager，功能更丰富。看了下它的架构图和源码，还是很值得参考和借鉴的。

让我们来看看didi kafka-manager官网介绍的功能：

**一站式 `ApacheKafka`集群指标监控与运维管控平台**

**didi参考文档** [**https://github.com/didi/Logi-KafkaManager/blob/master/docs/install\_guide/install\_guide\_cn.md**](https://github.com/didi/Logi-KafkaManager/blob/master/docs/install_guide/install_guide_cn.md)

CMAK 参考文档 <https://www.cnblogs.com/frankdeng/p/9584870.html>

&#x20;kafka-manager 项目地址：<https://github.com/yahoo/kafka-manager>

## 参考文章

&#x20;原文链接：<https://blog.csdn.net/VIP099/article/details/105647966>

更多安装文章;<https://segmentfault.com/a/1190000012730949>

配置文件讲解：<https://www.cnblogs.com/sandea/p/12078442.html>

思路讲解：<https://juejin.cn/post/6844903847844904973>

基于Kubernetes部署Kafka集群 <https://blog.csdn.net/l1028386804/article/details/106773932>

阿里云官方安装k8s-kafka <https://blog.51cto.com/u_14783669/2666234>

&#x20;**kafkamanager管理工具** [**https://www.cnblogs.com/kevingrace/p/14412024.html**](https://www.cnblogs.com/kevingrace/p/14412024.html)

**kafka** [**https://javinjunfeng.gitbooks.io/kafka/content/chu-ji/kafkadan-jie-dian-zhi-ji-qun-de-an-zhuang-bu-shu-ji-zhu-yi-shi-xiang.html**](https://javinjunfeng.gitbooks.io/kafka/content/chu-ji/kafkadan-jie-dian-zhi-ji-qun-de-an-zhuang-bu-shu-ji-zhu-yi-shi-xiang.html)

**调优：**[**https://segmentfault.com/a/1190000022763403**](https://segmentfault.com/a/1190000022763403)

[kafka 2.8 版本部署](https://www.cnblogs.com/wang-hongwei/p/14700178.html)


---

# Agent Instructions
This documentation is published with GitBook. GitBook is the documentation platform designed so that both humans and AI agents can read, navigate, and reason over technical content effectively. Learn more at gitbook.com.

## Querying This Documentation
If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://darren.gitbook.io/project/k8s-xi-tong-wan-zheng-bu-shu/efk/ri-zhi-kafka.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
