Preface

Environment

Virtualization software: VirtualBox

Linux distribution: Ubuntu 20.04.4

VM cores: 1

VM memory: 2 GB

JDK version: 1.8.0_202

ZooKeeper version: 3.8.0

Kafka version: 3.2.0

Kafka - ZK Mode

Before Kafka 2.8.0, all cluster metadata was stored in ZooKeeper.

Over time ZooKeeper became a bottleneck for Kafka. Starting with 2.8.0, the metadata can instead be stored in Kafka itself, removing the dependency on ZooKeeper.
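
As a concrete illustration, once the ZK-mode cluster below is running, the broker registrations can be inspected directly in ZooKeeper. A minimal sketch, assuming zkCli.sh is on the PATH and the /kafka chroot configured later in this article:

# connect to any ZK node, then list the registered broker ids inside the ZK shell
$ zkCli.sh -server node01:2181
ls /kafka/brokers/ids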

Cluster Plan

node01    node02    node03
zk        zk        zk
kafka     kafka     kafka

ZooKeeper Cluster Deployment

You can refer to the ZK cluster setup in 《Hadoop HA 搭建》 (Hadoop HA Setup).
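
Before moving on, it is worth confirming that the ensemble is healthy. A quick sketch, assuming zkServer.sh is on the PATH and xcall runs a command on every node:

# each node should report Mode: leader or Mode: follower
$ xcall zkServer.sh status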

Kafka Cluster Deployment

Kafka Environment Variables

$ vim /etc/profile
# append the following at the end of the file
export KAFKA_HOME=/opt/kafka-3.2.0
export PATH=$PATH:$KAFKA_HOME/bin

$ xsync $KAFKA_HOME
$ xsync /etc/profile
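# the exported variables only take effect in new shells; reload the profile on each node (or log in again)
$ source /etc/profile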

Configure server.properties

$ vim $KAFKA_HOME/config/server.properties

############################# Server Basics #############################
# globally unique broker id; must not be duplicated
broker.id=0

############################# Socket Server Settings #############################
# number of network request threads
num.network.threads=3

# number of I/O threads
num.io.threads=8

# send socket buffer size
socket.send.buffer.bytes=102400

# receive socket buffer size
socket.receive.buffer.bytes=102400

# maximum size of a request the socket server will accept (protects against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################
# kafka data (log) directory
log.dirs=/tmp/kafka-logs

# default number of partitions per new topic on this broker
num.partitions=1

# number of threads per data dir used for recovering and cleaning up data
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################
# replication factor for the internal offsets and transaction topics
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Retention Policy #############################
# maximum time a log file is retained
log.retention.hours=168

# maximum size of a single log segment file
log.segment.bytes=1073741824

# interval for checking whether data has expired
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# ZK connection string; the /kafka chroot is where the metadata is stored
zookeeper.connect=node01:2181,node02:2181,node03:2181/kafka

# ZK connection timeout
zookeeper.connection.timeout.ms=18000


############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0


$ xsync $KAFKA_HOME/config/server.properties

Note: broker.id must be different on every machine; set node01 ~ node03 to 1, 2 and 3 respectively (the file above shows the default 0).
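
A small sketch for making that change after distribution, assuming KAFKA_HOME is set in the current shell, password-free ssh between the nodes, and the same install path everywhere:

for i in 1 2 3; do
  ssh node0$i "sed -i 's/^broker.id=.*/broker.id=$i/' $KAFKA_HOME/config/server.properties"
done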

Start the Kafka Cluster

# start
$ xcall kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
# stop
$ xcall kafka-server-stop.sh

Note: when shutting down, wait until every Kafka broker process has fully stopped before stopping the ZooKeeper cluster. ZooKeeper holds the information about the Kafka cluster; if ZooKeeper is stopped first, Kafka can no longer obtain the information it needs to shut down cleanly, and the only option left is to kill the Kafka processes manually.
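
A quick way to confirm it is safe to stop ZooKeeper, assuming xcall runs the command on every node:

# should print nothing once all broker processes have exited
$ xcall jps | grep Kafka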

Verification

# create a topic
$ kafka-topics.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --create --partitions 3 --replication-factor 3 --topic hello
# start a console producer
$ kafka-console-producer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic hello
# start a console consumer
$ kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic hello

If messages sent by the producer show up on the consumer side, the deployment works.
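
Optionally, describe the topic to confirm that its partitions and replicas are spread across all three brokers:

$ kafka-topics.sh --bootstrap-server node01:9092 --describe --topic hello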

Kafka - KRaft Mode

Starting with Kafka 2.8.0, the cluster can run without ZK. The benefits are:

  • Kafka no longer depends on an external framework and can run on its own
  • When managing the cluster, the controller no longer has to read data from ZooKeeper first, so cluster performance improves
  • Since ZooKeeper is no longer a dependency, scaling the cluster is no longer limited by ZooKeeper's read/write capacity
  • The controller is no longer elected dynamically; it is fixed by the configuration file, so we can deliberately strengthen the controller nodes instead of being unable to do anything about the load landing on a randomly elected controller

Cluster Plan

node01              node02              node03
kafka-controller    kafka-controller    kafka-controller
kafka-broker        kafka-broker        kafka-broker

Kafka Cluster Deployment

Kafka Environment Variables

Same as above.

Configure server.properties

Note: the server.properties in this section is not the one used above; this one lives in the kraft directory.

$ vim $KAFKA_HOME/config/kraft/server.properties

############################# Server Basics #############################

# roles of this node
process.roles=broker,controller

# globally unique node id, must not be duplicated; remember to change it after distributing the file
node.id=1

# controller quorum list, which replaces the former ZK list
controller.quorum.voters=1@node01:9093,2@node02:9093,3@node03:9093

############################# Socket Server Settings #############################

# The address the socket server listens on.
# Combined nodes (i.e. those with `process.roles=broker,controller`) must list the controller listener here at a minimum.
# If the broker listener is not defined, the default listener will use a host name that is equal to the value of java.net.InetAddress.getCanonicalHostName(),
# with PLAINTEXT listener name, and port 9092.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092,CONTROLLER://:9093

# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT

# address the broker advertises to clients; remember to change it after distributing the file
advertised.listeners=PLAINTEXT://node01:9092

# A comma-separated list of the names of the listeners used by the controller.
# If no explicit mapping set in `listener.security.protocol.map`, default will be using PLAINTEXT protocol
# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# kafka data (log) directory
log.dirs=/tmp/kraft-combined-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1


############################# Log Retention Policy #############################

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000


$ xsync $KAFKA_HOME/config/kraft/server.properties

Notes

  1. node.id must be different on every machine; set node01 ~ node03 to 1, 2 and 3 respectively

  2. advertised.listeners must be different on every machine; set node01 ~ node03 to PLAINTEXT://node01:9092, PLAINTEXT://node02:9092 and PLAINTEXT://node03:9092 respectively (a sketch covering both adjustments follows this list)
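
A sketch for making both per-node changes after distribution, under the same assumptions as before (KAFKA_HOME set in the current shell, password-free ssh, identical install paths):

for i in 1 2 3; do
  ssh node0$i "sed -i 's/^node.id=.*/node.id=$i/' $KAFKA_HOME/config/kraft/server.properties"
  ssh node0$i "sed -i 's#^advertised.listeners=.*#advertised.listeners=PLAINTEXT://node0$i:9092#' $KAFKA_HOME/config/kraft/server.properties"
done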

Initialize Kafka

# generate a unique ID for the storage directories
$ kafka-storage.sh random-uuid
YE6iIE-zT4m45ZdkY3nz1A
# format the storage directories on every machine with the ID generated above
$ xcall kafka-storage.sh format -t YE6iIE-zT4m45ZdkY3nz1A -c $KAFKA_HOME/config/kraft/server.properties
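# (optional check) each node's log.dirs should now contain a meta.properties with the shared cluster.id and its own node.id
$ xcall cat /tmp/kraft-combined-logs/meta.properties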

Start the Kafka Cluster

$ xcall kafka-server-start.sh -daemon $KAFKA_HOME/config/kraft/server.properties

Verification

Same as above.

Kafka - Eagle Monitoring

Kafka Eagle can be deployed on any one of the machines. It needs a database; MySQL is used here.

In this article both Eagle and MySQL are deployed on node01.

Eagle version: 2.1.0

MySQL version: 8.0.28

MySQL Deployment

You can refer to the MySQL deployment in 《基于 Hadoop HA 的 Hive 搭建》 (Hive on Hadoop HA Setup).
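
A quick sanity check that MySQL is reachable on node01 before configuring EFAK, assuming the root account used in the configuration below:

$ mysql -uroot -p -e "SELECT VERSION();"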

Kafka Preparation

Modify kafka-server-start.sh

$ vim $KAFKA_HOME/bin/kafka-server-start.sh

Replace the following content (at line 28)

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

with

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
export JMX_PORT="9999"
fi

Distribute it to the other nodes

$ xsync $KAFKA_HOME/bin/kafka-server-start.sh

Restart Kafka

$ xcall kafka-server-stop.sh
$ xcall kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
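
After the restart, each broker should expose JMX on port 9999. A quick check on any node:

$ ss -tlnp | grep 9999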

KE Deployment

KE Environment Variables

$ vim /etc/profile
# append the following at the end of the file
export KE_HOME=/opt/efak-web-2.1.0
export PATH=$PATH:$KE_HOME/bin
$ source /etc/profile

Configure system-config.properties

$ vim $KE_HOME/conf/system-config.properties

######################################
# multi zookeeper & kafka cluster list
# Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.' instead
# configure ZK
######################################
efak.zk.cluster.alias=cluster1
cluster1.zk.list=node01:2181,node02:2181,node03:2181/kafka

######################################
# zookeeper enable acl
######################################
cluster1.zk.acl.enable=false
cluster1.zk.acl.schema=digest
cluster1.zk.acl.username=test
cluster1.zk.acl.password=test123

######################################
# broker size online list
######################################
cluster1.efak.broker.size=20

######################################
# zk client thread limit
######################################
kafka.zk.limit.size=16

######################################
# EFAK webui port
######################################
efak.webui.port=8048

######################################
# EFAK enable distributed
######################################
efak.distributed.enable=false
efak.cluster.mode.status=master
efak.worknode.master.host=localhost
efak.worknode.port=8085

######################################
# kafka jmx acl and ssl authenticate
######################################
cluster1.efak.jmx.acl=false
cluster1.efak.jmx.user=keadmin
cluster1.efak.jmx.password=keadmin123
cluster1.efak.jmx.ssl=false
cluster1.efak.jmx.truststore.location=/data/ssl/certificates/kafka.truststore
cluster1.efak.jmx.truststore.password=ke123456

######################################
# kafka offset storage
# where kafka offsets are stored
# before 0.9 they lived in ZK, from 0.9 on they live in Kafka
# choose according to your Kafka version; the Kafka deployed above is 3.2.0, so they are stored in Kafka
######################################
cluster1.efak.offset.storage=kafka
# cluster1.efak.offset.storage=zk

######################################
# kafka jmx uri
######################################
cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi

######################################
# kafka metrics, 15 days by default
######################################
efak.metrics.charts=true
efak.metrics.retain=15

######################################
# kafka sql topic records max
######################################
efak.sql.topic.records.max=5000
efak.sql.topic.preview.records.max=10

######################################
# delete kafka topic token
######################################
efak.topic.token=keadmin

######################################
# kafka sasl authenticate
######################################
cluster1.efak.sasl.enable=false
cluster1.efak.sasl.protocol=SASL_PLAINTEXT
cluster1.efak.sasl.mechanism=SCRAM-SHA-256
cluster1.efak.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle";
cluster1.efak.sasl.client.id=
cluster1.efak.blacklist.topics=
cluster1.efak.sasl.cgroup.enable=false
cluster1.efak.sasl.cgroup.topics=
cluster2.efak.sasl.enable=false
cluster2.efak.sasl.protocol=SASL_PLAINTEXT
cluster2.efak.sasl.mechanism=PLAIN
cluster2.efak.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-eagle";
cluster2.efak.sasl.client.id=
cluster2.efak.blacklist.topics=
cluster2.efak.sasl.cgroup.enable=false
cluster2.efak.sasl.cgroup.topics=

######################################
# kafka ssl authenticate
######################################
cluster3.efak.ssl.enable=false
cluster3.efak.ssl.protocol=SSL
cluster3.efak.ssl.truststore.location=
cluster3.efak.ssl.truststore.password=
cluster3.efak.ssl.keystore.location=
cluster3.efak.ssl.keystore.password=
cluster3.efak.ssl.key.password=
cluster3.efak.ssl.endpoint.identification.algorithm=https
cluster3.efak.blacklist.topics=
cluster3.efak.ssl.cgroup.enable=false
cluster3.efak.ssl.cgroup.topics=

######################################
# kafka mysql jdbc driver address
# JDBC connection settings; MySQL was installed above
######################################
efak.driver=com.mysql.cj.jdbc.Driver
efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=123456
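
The connection above assumes a database named ke on node01's MySQL. If it does not exist yet and your EFAK version does not create it automatically, create it before the first start (a sketch):

$ mysql -uroot -p -e "CREATE DATABASE IF NOT EXISTS ke DEFAULT CHARACTER SET utf8mb4;"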

Start KE

# start
$ ke.sh start # the following output indicates a successful start

[2022-05-21 22:22:21] INFO: [Job done!]
Welcome to
______ ______ ___ __ __
/ ____/ / ____/ / | / //_/
/ __/ / /_ / /| | / ,<
/ /___ / __/ / ___ | / /| |
/_____/ /_/ /_/ |_|/_/ |_|
( Eagle For Apache Kafka® )

Version 2.1.0 -- Copyright 2016-2022
*******************************************************************
* EFAK Service has started success.
* Welcome, Now you can visit 'http://192.168.128.101:8048'
* Account:admin ,Password:123456
*******************************************************************
* <Usage> ke.sh [start|status|stop|restart|stats] </Usage>
* <Usage> https://www.kafka-eagle.org/ </Usage>
*******************************************************************

# stop
$ ke.sh stop