Kafka概述

定义

Kafka传统定义:Kafka是一个分布式的基于发布/订阅模式的消息队列(MessageQueue),主要应用于大数据实时处理领域。

发布/订阅:消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息。

Kafka最新定义:Kafka是一个开源的分布式事件流平台(Event StreamingPlatform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。

消息队列

目前企业中比较常见的消息队列产品主要有 Kafka、ActiveMQ 、RabbitMQ 、RocketMQ 等。

在大数据场景主要采用 Kafka 作为消息队列。在 JavaEE 开发中主要采用 ActiveMQ、RabbitMQ、RocketMQ。

传统消息队列的应用场景

传统的消息队列的主要应用场景包括:缓存消峰解耦异步通信。

消息队列的两种模式

消息队列的两种模式

区别: 点对点消费 -> 消息只能发布到一个主题, 消费完成就删除消息,且只有一个消费者

发布订阅模式 -> 消息可以发布到多个主题, 消息一般保留七天,且有多个消费者

基础架构

在Kafka2.8版本前,Zookeeper的Consumer文件中存放消息被消费的记录(offset)

在Kafka2.8版本走,消息被消费的记录(offset)存放在Kafka中。

(1)Producer:消息生产者,就是向 Kafka broker 发消息的客户端。

(2)Consumer:消息消费者,向 Kafka broker 取消息的客户端。

(3)Consumer Group(CG):消费者组,由多个 consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

(4)Broker:一台 Kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个broker 可以容纳多个 topic。

(5)Topic:可以理解为一个队列,生产者和消费者面向的都是一个 topic。

(6)Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。

(7)Replica:副本。一个 topic 的每个分区都有若干个副本,一个 Leader 和若干个Follower。

(8)Leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 Leader。

(9)Follower:每个分区多个副本中的“从”,实时从 Leader 中同步数据,保持和Leader 数据的同步。Leader 发生故障时,某个 Follower 会成为新的 Leader。

Kafka 快速入门

安装部署

集群规划

2.1.2 集群部署

0)官方下载地址:http://kafka.apache.org/downloads.html

1)解压安装包

1
[atguigu@hadoop102 software]$ tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/module/

2)修改解压后的文件名称

1
[atguigu@hadoop102 module]$ mv kafka_2.12-3.0.0/ kafka

3)进入到/opt/module/kafka 目录,修改配置文件

1
2
[atguigu@hadoop102 kafka]$ cd config/
[atguigu@hadoop102 config]$ vim server.properties

输入以下内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#broker 的全局唯一编号,不能重复,只能是数字。
broker.id=0
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘 IO 的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/opt/module/kafka/datas
#topic 在当前 broker 上的分区个数
num.partitions=1
#用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
# 每个 topic 创建时的副本数,默认时 1 个副本
offsets.topic.replication.factor=1
#segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
#每个 segment 文件的大小,默认最大 1G
log.segment.bytes=1073741824

# 检查过期数据的时间,默认 5 分钟检查一次是否数据过期
log.retention.check.interval.ms=300000
#配置连接 Zookeeper 集群地址(在 zk 根目录下创建/kafka,方便管理)
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka

4)分发安装包

1
[atguigu@hadoop102 module]$ xsync kafka/

5)分别在 hadoop103 和 hadoop104 上修改配置文件/opt/module/kafka/config/server.properties中的 broker.id=1、broker.id=2
注:broker.id 不得重复,整个集群中唯一。

1
2
3
4
[atguigu@hadoop103 module]$ vim kafka/config/server.properties
修改:
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
1
2
3
4
[atguigu@hadoop104 module]$ vim kafka/config/server.properties
修改:
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2

6)配置环境变量
(1)在/etc/profile.d/my_env.sh 文件中增加 kafka 环境变量配置

1
2
3
4
5
sudo vim /etc/profile.d/my_env.sh

#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin

刷新环境变量

1
source /etc/profile

启动

(1) Zookeeper启动 (默认守护进程)

1
./zkServer.sh start

Zookeeper状态

1
./zkServer.sh status

Zookeeper停止

1
2
3
4
5
./zkServer.sh stop

Zookeeper客户端-常⽤命令
./zkCli.sh –server ip:port //连接ZooKeeper服务端
quit //断开连接

(2) 启动Kafka

Kafka 守护方式 (环境变量配置前提下)

1
kafka-server-start.sh -daemon /home/environment/kafka/config/server.properties

Kafka关闭

1
kafka-server-stop.sh 

注意:停止 Kafka 集群时,一定要等 Kafka 所有节点进程全部停止后再停止 Zookeeper集群。因为 Zookeeper 集群当中记录着 Kafka 集群相关信息,Zookeeper 集群一旦先停止,Kafka 集群就没有办法再获取停止进程的信息,只能手动杀死 Kafka 进程了。

Kafka命令行操作

基础结构

主题命令行操作

1)查看操作主题命令参数

1
bin/kafka-topics.sh

2)查看当前服务器中的所有 topic

1
kafka-topics.sh --bootstrap-server 47.106.86.64:9092 --list

3)创建 first topic

1
kafka-topics.sh --bootstrap-server 47.106.86.64:9092 --create --partitions 1 --replication-factor 3 --topic first

选项说明:
--topic 定义 topic 名
--replication-factor 定义副本数
--partitions 定义分区数

4)查看 first 主题的详情

1
kafka-topics.sh --bootstrap-server 47.106.86.64:9092 --alter --topic first --partitions 3

5)修改分区数(注意:分区数只能增加,不能减少)

1
kafka-topics.sh --bootstrap-server 47.106.86.64:9092 --alter --topic first --partitions 3

6)再次查看 first 主题的详情

1
kafka-topics.sh --bootstrap-server 47.106.86.64:9092 --describe --topic first

7)删除 topic(需要配置信息)

1
kafka-topics.sh --bootstrap-server 47.106.86.64:9092 --delete --topic first

生产者命令行操作

1)查看操作生产者命令参数

连接kafka生产者

1
kafka-console-producer.sh --bootstrap-server 47.106.86.64:9092 --topic first

参数 描述
--bootstrap-server <String: server toconnect to> 连接的 Kafka Broker 主机名称和端口号。
--topic <String: topic> 操作的 topic 名称。

2)发送消息

1
2
3
[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh -- bootstrap-server hadoop102:9092 --topic first
>hello world
>Hi HI

消费者命令行操作

1)查看操作消费者命令参数

连接kafka消费者

1
kafka-console-consumer.sh

参数 描述
–bootstrap-server <String: server toconnect to> 连接的 Kafka Broker 主机名称和端口号。
–topic <String: topic> 操作的 topic 名称。
–from-beginning 从头开始消费。
–group <String: consumer group id> 指定消费者组名称。

2)消费消息
(1)消费 first 主题中的数据。

1
kafka-console-consumer.sh --bootstrap-server 47.106.86.64:9092 --topic first

(2)把主题中所有的数据都读取出来(包括历史数据)。

1
kafka-console-consumer.sh --bootstrap-server 47.106.86.64:9092 --from-beginning --topic first

Kafka 生产者

生产者消息发送流程

发送原理

在消息发送的过程中,涉及到两个线程,main线程sender线程,其中main线程是消息的生产线程,而sender线程是jvm单例的线程,专门用于消息的发送。

在jvm的内存中开辟了一块缓存空间叫RecordAccumulator(消息累加器),用于将多条消息合并成一个批次,然后由sender线程发送给kafka集群。

我们的一条消息在生产过程会调用send方法然后经过拦截器经过序列化器,再经过分区器确定消息发送在具体topic下的哪个分区,然后发送到对应的消息累加器中,消息累加器是多个双端队列。并且每个队列和主题分区都具有一一映射关系。消息在累加器中,进行合并,达到了对应的size(batch.size)或者等待超过对应的等待时间(linger.ms),都会触发sender线程的发送。sender线程有一个请求池,默认缓存五个请求( max.in.flight.requests.per.connection ),发送消息后,会等待服务端的ack,如果没收到ack就会重试默认重试int最大值( retries )。如果ack成功就会删除累加器中的消息批次,并相应到生产端。

当双端队列中的DQueue满足 batch.size 或者 linger.ms 条件时触发sender线程。

生产者重要参数列表

发送

普通异步发送

1)需求:创建 Kafka 生产者,采用异步的方式发送到 Kafka Broker

2)代码编写
(1)创建工程 kafka
(2)导入依赖

1
2
3
4
5
6
7
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>

(4)编写不带回调函数的 API 代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class CustomProducer {

public static void main(String[] args) {

// 1. 给 kafka 配置对象添加配置信息:bootstrap.servers
Properties properties = new Properties();
//服务信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"47.106.86.64:9092");
//配置序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 2. 创建 kafka 生产者的配置对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String,String>(properties);

// 3. 创建 kafka 生产者对象
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord("first", "one" + i));
}
kafkaProducer.close();
}
}

测试:在Linux上开启Kafka验证

1
kafka-console-consumer.sh --bootstrap-server 47.106.86.64:9092 --topic first

带回调函数的异步发送

回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息(Record Metadata)和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class CustomProducer {

public static void main(String[] args) {

// 1. 给 kafka 配置对象添加配置信息:bootstrap.servers
Properties properties = new Properties();
//服务信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"47.106.86.64:9092");
//配置序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 2. 创建 kafka 生产者的配置对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String,String>(properties);

// 3. 创建 kafka 生产者对象
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord("first", "one" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
System.out.println( "分区 : " + recordMetadata.partition() + " 主题: " + recordMetadata.topic() );
}
}
});
}
kafkaProducer.close();
}
}

同步发送API

  1. 先处理已经堆积在DQueue中的数据。
  2. RecordAccumulator再处理外部数据。

只需在异步发送的基础上,再调用一下 get()方法即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class CustomProducerSync {
public static void main(String[] args) throws ExecutionException, InterruptedException {

// 1. 给 kafka 配置对象添加配置信息:bootstrap.servers
Properties properties = new Properties();
//服务信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"47.106.86.64:9092");
//配置序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 2. 创建 kafka 生产者的配置对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String,String>(properties);

// 3. 创建 kafka 生产者对象
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord("first", "one" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
System.out.println( "分区 : " + recordMetadata.partition() + " 主题: " + recordMetadata.topic() );
}

}
}).get();
Thread.sleep(100);
}
kafkaProducer.close();
}
}

生产者拦截器

生产者拦截器 (ProducerInterceptor)

拦截器接口一共有三个方法。三个方法内的实现如果抛出异常,会被ProducerInterceptors内部捕获,并不会抛到上层。

1
2
3
4
5
public interface ProducerInterceptor<K, V> extends Configurable {
ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
void onAcknowledgement(RecordMetadata metadata, Exception exception);
void close();
}

onSend 方法在消息分区之前,可以对消息进行一定的修改,比如给key添加前缀,甚至可以修改我们的topic,如果需要使用kafka实现延时队列高级应用,我们就可以通过拦截器对消息进行判断,并修改,暂时放入我们的延时主题中,等时间达到再放回普通主题队列。

onAcknowledgement该方法是在我们服务端对sender线程进行消息确认,或消息发送失败后的一个回调。优先于我们send方法的callback回调。我们可以对发送情况做一个统计。但是该方法在我们的sender线程也就是唯一的IO线程执行,逻辑越少越好。

close该方法可以在关闭拦截器时,进行一些资源的释放。

(1) 实现自定义拦截器

1
2
3
4
5
public MyInterceptor implements ProducerInterceptor {
ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
void onAcknowledgement(RecordMetadata metadata, Exception exception);
void close();
}

(2)将自定义拦截器加入设置中

1
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,MyInterceptor.getClass.getName());

生产者分区

分区的好处

从存储的角度 -> 合理使用存储资源,实现负载均衡

从计算的角度 -> 提高并行计算的可行性

生产者发送消息分区策略

1)默认的分区器 DefaultPartitioner
在 IDEA 中 ctrl +n,全局查找 DefaultPartitioner。

Kafka支持三种分区策略 1) 指定分区; 2)指定key,计算hash得分区; 3)指定随机粘性分区;

自定义分区器

如果研发人员可以根据企业需求,自己重新实现分区器。

1)需求
例如我们实现一个分区器实现,发送过来的数据中如果包含 Hi,就发往 0 号分区,不包含 Hi,就发往 1 号分区。
2)实现步骤
(1)定义类实现 Partitioner 接口。
(2)重写 partition()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class MyPartitioner implements Partitioner {
/**
* @param topic 主题
* @param key 消息的 key
* @param keyBytes 消息的 key 序列化后的字节数组
* @param value 消息的 value
* @param valueBytes 消息的 value 序列化后的字节数组
* @param cluster 集群元数据可以查看分区信息
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
String string = value.toString();
if (string.contains("vi")){
return 2;
}else{
return 1;
}
}
}

(3)使用分区器的方法,在生产者的配置中添加分区器参数。

1
2
//自定义分区规则 
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyPartitioner.class.getName());

(4)开启测试

生产者提高吞吐量

通过提高吞吐量达到低延迟的效果

Batch.size 与 linger.ms 配合使用,根据生成数据的大小指定。

RecordAccumlator:在异步发送并且分区很多的情况下,32M的数据量容易被满足,进程交互加大,可以适当提高到64M。

1
2
3
4
5
6
7
8
9
 // batch.size:批次大小,默认 16K
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
// linger.ms:等待时间,默认 0
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// RecordAccumulator:缓冲区大小,默认 32M:buffer.memory
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);

// compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

消息累加器

消息累加器(RecordAccumulator)

为了提高生产者的吞吐量,我们通过累加器将多条消息合并成一批统一发送。在broker中将消息批量存入。减少多次的网络IO。

消息累加器默认32m,如果生产者的发送速率大于sender发送的速率,消息就会堆满累加器。生产者就会阻塞,或者报错,报错取决于阻塞时间的配置。

累加器的存储形式为ConcurrentMap<TopicPartition, Deque<ProducerBatch>>,可以看出来就是一个分区对应一个双端队列,队列中存储的是ProducerBatch一般大小是16k根据batch.size配置,新的消息会append到ProducerBatch中,满16k就会创建新的ProducerBatch,并且触发sender线程进行发送。

如果消息量非常大,生成了大量的ProducerBatch,在发送后,又需要JVM通过GC回收这些ProducerBatch就变得非常影响性能,所以kafka通过 BufferPool作为内存池来管理ProducerBatch的创建和回收,需要申请一个新的ProducerBatch空间时,调用 free.allocate(size, maxTimeToBlock)找内存池申请空间。

如果单条消息大于16k,那么就不会复用内存池了,会生成一个更大的ProducerBatch专门存放大消息,发送完后GC回收该内存空间。

1
为了进一步减小网络中消息传输的带宽。我们也可以通过**消息压缩**的方式,在生产端将消息追加进`ProducerBatch`就对每一条消息进行压缩了。常用的有Gzip、Snappy、Lz4 和 Zstd,这是时间换空间的手段。压缩的消息会在消费端进行解压。

消息发送线程(Sender)

消息保存在内存后,Sender线程就会把符合条件的消息按照批次进行发送。除了发送消息,元数据的加载也是通过Sender线程来处理的。

Sender线程发送消息以及接收消息,都是基于java NIO的Selector。通过Selector把消息发出去,并通过Selector接收消息。

Sender线程默认容纳5个未确认的消息,消息发送失败后会进行重试。

生产经验—数据可靠性

消息确认机制-ACK

producer提供了三种消息确认的模式,通过配置acks来实现

acks为0时, 表示生产者将数据发送出去就不管了,不等待任何返回。这种情况下数据传输效率最高,但是数据可靠性最低,当 server挂掉的时候就会丢数据;

acks为1时(默认),表示数据发送到Kafka后,经过leader成功接收消息的的确认,才算发送成功,如果leader宕机了,就会丢失数据。

acks为-1/all时,表示生产者需要等待ISR中的所有follower都确认接收到数据后才算发送完成,这样数据不会丢失,因此可靠性最高,性能最低。

  • 数据完全可靠条件 = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

AR = ISR + ORS

正常情况下,如果所有的follower副本都应该与leader副本保持一定程度的同步,则AR = ISR,OSR = null。

ISR 表示在指定时间内和leader保存数据同步的集合;

ORS表示不能在指定的时间内和leader保持数据同步集合,称为OSR(Out-Sync Relipca set)。

1
2
3
4
5
// Ack 设置,默认是1
properties.put(ProducerConfig.ACKS_CONFIG,"1");

// 重试次数, 默认的重试次数是 Max.Integer
properties.put(ProducerConfig.RETRIES_CONFIG,3);

数据去重-幂等性

1)幂等性原理

在一般的MQ模型中,常有以下的消息通信概念

  • 至少一次(At Least Once): ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量>=2。可以保证数据不丢失,但是不能保证数据不重复。
  • 最多一次(At Most Once):ACK级别设置为0 。可以保证数据不重复,但是不能保证数据不丢失。•
  • 精确一次(Exactly Once):至少一次 + 幂等性 。 Kafka 0.11版本引入一项重大特性:幂等性和事务

幂等性,简单地说就是对接口的多次调用所产生的结果和调用一次是一致的。生产者在进行重试的时候有可能会重复写入消息,而使用Kafka 的幂等性功能之后就可以避免这种情况。(不产生重复数据)

重复数据的判断标准:具有<PID, Partition, SeqNumber>相同主键的消息提交时,Broker只会持久化一条。其

中ProducerId(pid)是Kafka每次重启都会分配一个新的;Partition 表示分区号;Sequence Number 序列化号,是单调自增的。

broker中会在内存维护一个pid+分区对应的序列号。如果收到的序列号正好比内存序列号大一,才存储消息,如果小于内存序列号,意味着消息重复,那么会丢弃消息,并应答。如果远大于内存序列号,意味着消息丢失,会抛出异常。

所以幂等解决的是sender到broker间,由于网络波动可能造成的重发问题。用幂等来标识唯一消息。

并且幂等性只能保证的是在单分区单会话内不重复。

2)如何使用幂等性

开启幂等性功能的方式很简单,只需要显式地将生产者客户端参数enable.idempotence设置为true即可(这个参数的默认值为true),并且还需要确保生产者客户端的retries、acks、max.in.filght.request.per.connection参数不被配置错,默认值就是对的。

消息事务

由于幂等性不能跨分区运作,为了保证同时发的多条消息,要么全成功,要么全失败。kafka引入了事务的概念。

开启事务需要producer设置transactional.id的值并同时开启幂等性。

通过事务协调器,来实现事务,工作流程如下:

1
2
3
4
5
6
7
8
9
10
11
12
// 1 初始化事务
void initTransactions();
// 2 开启事务
void beginTransaction() throws ProducerFencedException;
// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws
ProducerFencedException;
// 4 提交事务
void commitTransaction() throws ProducerFencedException;
// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

消息顺序

消息在单分区内有序,多分区内无序(如果对多分区进行排序,造成分区无法工作需要等待排序,浪费性能)

kafka只能保证单分区下的消息顺序性,为了保证消息的顺序性,需要做到如下几点。

如果未开启幂等性,需要 max.in.flight.requests.per.connection 设置为1。(缓冲队列最多放置1个请求)

如果开启幂等性,需要 max.in.flight.requests.per.connection 设置为小于5。

这是因为broker端会缓存producer主题分区下的五个request,保证最近5个request是有序的。

如果Request3在失败重试后才发往到集群中,必然会导致乱序,但是集群会重新按照序列号进行排序(最对一次排序5个)。

Kafka Broker

Broker设计

我们都知道kafka能堆积非常大的数据,一台服务器,肯定是放不下的。由此出现的集群的概念,集群不仅可以让消息负载均衡,还能提高消息存取的吞吐量。kafka集群中,会有多台broker,每台broker分别在不同的机器上。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-TKCzpF2O-1662356150180)(http://mk-images.tagao.top/img/640?imageslim)]

为了提高吞吐量,每个topic也会都多个分区,同时为了保持可靠性,每个分区还会有多个副本。这些分区副本被均匀的散落在每个broker上,其中每个分区副本中有一个副本为leader,其他的为follower。

Zookeeper

Zookeeper作用

Zookeeper在Kafka中扮演了重要的角色,kafka使用zookeeper进行元数据管理,保存broker注册信息,包括主题(Topic)、分区(Partition)信息等,选择分区leader。

Broker选举Leader

这里需要先明确一个概念leader选举,因为kafka中涉及多处选举机制,容易搞混,Kafka由三个方面会涉及到选举:

  • broker(控制器)选leader
  • 分区多副本选leader
  • 消费者选Leader

在kafka集群中由很多的broker(也叫做控制器),但是他们之间需要选举出一个leader,其他的都是follower。broker的leader有很重要的作用,诸如:创建、删除主题、增加分区并分配leader分区;集群broker管理,包括新增、关闭和故障处理;分区重分配(auto.leader.rebalance.enable=true,后面会介绍),分区leader选举。

每个broker都有唯一的brokerId,他们在启动后会去竞争注册zookeeper上的Controller结点,谁先抢到,谁就是broker leader。而其他broker会监听该结点事件,以便后续leader下线后触发重新选举。

简图:

  • broker(控制器)选leader

详细图:

  • broker(控制器)选leader
  • 分区多副本选leader

模拟 Kafka 上下线,Zookeeper 中数据变化

(1)查看/kafka/brokers/ids 路径上的节点。

1
2
[zk: localhost:2181(CONNECTED) 2] ls /kafka/brokers/ids
[0, 1, 2]

(2)查看/kafka/controller 路径上的数据。

1
2
[zk: localhost:2181(CONNECTED) 15] get /kafka/controller
{"version":1,"brokerid":0,"timestamp":"1637292471777"}

(3)查看/kafka/brokers/topics/first/partitions/0/state 路径上的数据。

1
2
[zk: localhost:2181(CONNECTED) 16] get  /kafka/brokers/topics/first/partitions/0/state
{"controller_epoch":24,"leader":0,"version":1,"leader_epoch":18," isr":[0,1,2]}

(4)停止 hadoop104 上的 kafka。

1
kafka-server-stop.sh

(5)再次查看/kafka/brokers/ids 路径上的节点。

1
2
[zk: localhost:2181(CONNECTED) 3] ls /kafka/brokers/ids
[0, 1]

(6)再次查看/kafka/controller 路径上的数据。

1
2
[zk: localhost:2181(CONNECTED) 15] get /kafka/controller
{"version":1,"brokerid":0,"timestamp":"1637292471777"}

(7)再次查看/kafka/brokers/topics/first/partitions/0/state 路径上的数据。

1
2
[zk: localhost:2181(CONNECTED) 16] get  /kafka/brokers/topics/first/partitions/0/state
{"controller_epoch":24,"leader":0,"version":1,"leader_epoch":18," isr":[0,1]}

(8)启动 hadoop104 上的 kafka。

1
kafka-server-start.sh -daemon ./config/server.properties

(9)再次观察(1)、(2)、(3)步骤中的内容。

Broker重要参数

节点服役和退役

服役新节点

(1) 启动一台新的KafKa服务端(加入原有的Zookeeper集群)

(2) 查看原有的 分区信息 describe

1
2
3
4
5
6
$ kafka-topics.sh --bootstrap-server 47.106.86.64:9092 --topic first --describe

Topic: first TopicId: 4DtkHPe4R1KyXNF7QyVqBA PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: first Partition: 0 Leader: 1 Replicas: 2,1,0 Isr: 1,0
Topic: first Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1
Topic: first Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,0

(3) 指定需要均衡的主题

1
2
3
4
5
6
7
8
$ vim topics-to-move.json

{
"topics": [
{"topic": "first"}
],
"version": 1
}

(4) 生成负载均衡计划(只是生成计划)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
bin/kafka-reassign-partitions.sh --bootstrap-server 47.106.86.64:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate

Current partition replica assignment
{"version":1,"partitions":[{"topic":"first","partition":0,"replic
as":[0,2,1],"log_dirs":["any","any","any"]},{"topic":"first","par
tition":1,"replicas":[2,1,0],"log_dirs":["any","any","any"]},{"to
pic":"first","partition":2,"replicas":[1,0,2],"log_dirs":["any","
any","any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"first","partition":0,"replic
as":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"first","par
tition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"]},{"to
pic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any","
any","any"]}]}

(3)创建副本存储计划(所有副本存储在 broker0、broker1、broker2、broker3 中)。

1
2
3
4
5
6
7
vim increase-replication-factor.json

{"version":1,"partitions":[{"topic":"first","partition":0,"replic
as":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"first","par
tition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"]},{"to
pic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any","
any","any"]}]}

(5) 执行副本计划

1
kafka-reassign-partitions.sh --bootstrap-server 47.106.86.64:9092 --reassignment-json-file increase-replication-factor.json --execute

(6) 验证计划

1
2
3
4
5
6
7
8
kafka-reassign-partitions.sh --bootstrap-server 47.106.86.64:9092 --reassignment-json-file increase-replication-factor.json --verify

Status of partition reassignment:
Reassignment of partition first-0 is complete.
Reassignment of partition first-1 is complete.
Reassignment of partition first-2 is complete.
Clearing broker-level throttles on brokers 0,1,2,3
Clearing topic-level throttles on topic first

退役旧节点

1)执行负载均衡操作
先按照退役一台节点,生成执行计划,然后按照服役时操作流程执行负载均衡。

不同于服役计划的 --broker-list "0,1,2" 退役了 Broker3 ;

1
kafka-reassign-partitions.sh --bootstrap-server 47.106.86.64:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --generate

副本机制

副本基本信息

  • Replica :副本,同一分区的不同副本保存的是相同的消息,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失 ,提高副本可靠性,且 kafka 仍然能够继续工作,kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower。
  • Leader :每个分区的多个副本中的"主副本",生产者以及消费者只与 Leader 交互
  • Follower :每个分区的多个副本中的"从副本",负责实时从 Leader 中同步数据,保持和 Leader 数据的同步。Leader 发生故障时,从 Follower 副本中重新选举新的 Leader 副本对外提供服务。
  • AR:分区中的所有 Replica 统称为 AR = ISR +OSR
  • ISR:所有与 Leader 副本保持一定程度同步的Replica(包括 Leader 副本在内)组成 ISR
  • OSR:与 Leader 副本同步滞后过多的 Replica 组成了 OSR
  • LEO:每个副本都有内部的LEO,代表当前队列消息的最后一条偏移量offset + 1。
  • HW:高水位,代表所有ISR中的LEO最低的那个offset,也是消费者可见的最大消息offset。

副本选举Leader

Kafka 集群中有一个 broker 的 Controller 会被选举为 Controller Leader (4.2.2) ,负责管理集群Broker 的上下线,所有 topic 的分区副本分配和 Leader 选举等工作。

Broker中Controller 的信息同步工作是依赖于 Zookeeper 的 ./broker/topic 目录下的信息。

结论先行: 如果leader副本下线, 会在ISR队列中存活为前提,按照Replicas队列中前面优先的原则。

↓↓↓

(1)创建一个新的 topic,4 个分区,4 个副本

1
kafka-topics.sh --bootstrap-server 47.106.86.64:9092 --create --topic atguigu1 --partitions 4 --replication-factor 4

(2)查看 Leader 分布情况

1
2
3
4
5
6
7
8
kafka-topics.sh --bootstrap-server 47.106.86.64:9092 --describe --topic atguigu1

Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes=1073741824
Topic: atguigu1 Partition: 0 Leader: 3 Replicas: 3,0,2,1 Isr: 3,0,2,1
Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,2,3,0
Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,3,1,2
Topic: atguigu1 Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 2,1,0,3

(3)停止掉 hadoop105 的 kafka 进程,并查看 Leader 分区情况

1
2
3
4
5
6
7
8
9
10
kafka-server-stop.sh

kafka-topics.sh --bootstrap-server 47.106.86.64:9092 --describe --topic atguigu1

Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes=1073741824
Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,2,1
Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,2,0
Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1,2
Topic: atguigu1 Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 2,1,0

(4)停止掉 hadoop104 的 kafka 进程,并查看 Leader 分区情况

1
2
3
4
5
6
7
8
9
kafka-server-stop.sh
kafka-topics.sh --bootstrap-server 47.106.86.64:9092 --describe --topic atguigu1

Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes=1073741824
Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,1
Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,0
Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1
Topic: atguigu1 Partition: 3 Leader: 1 Replicas: 2,1,0,3 Isr: 1,0

副本故障处理

follower故障流程

如果follower落后leader过多,体现在落后时间 repca.lag.time.max.ms ,或者落后偏移量repca.lag.max.messages(由于kafka生成速度不好界定,后面取消了该参数),follower就会被移除ISR队列,等待该队列LEO追上HW,才会重新加入ISR中。

leader故障流程

旧Leader先被从ISR队列中踢出,然后从ISR中选出一个新的Leader来;此时为了保证多个副本之间的数据一致性,其他的follower会先将各自的log文件中高于HW的部分截取掉,然后从新的leader同步数据(由此可知这只能保证副本之间数据一致性,并不能保证数据不丢失或者不重复)。体现了设置ACK-all的重要性。

分区副本分配

如果 kafka 服务器只有 4 个节点,那么设置 kafka 的分区数大于服务器台数,在 kafka底层如何分配存储副本呢?
1)创建 16 分区,3 个副本
(1)创建一个新的 topic,名称为 second。

1
kafka-topics.sh --bootstrap-server 47.106.86.64:9092 --create --partitions 16 --replication-factor 3 --topic second

(2)查看分区和副本情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
kafka-topics.sh --bootstrap-server 47.106.86.64:9092  --describe --topic second

Topic: second4 Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: second4 Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: second4 Partition: 2 Leader: 2 Replicas: 2,3,0 Isr: 2,3,0
Topic: second4 Partition: 3 Leader: 3 Replicas: 3,0,1 Isr: 3,0,1
Topic: second4 Partition: 4 Leader: 0 Replicas: 0,2,3 Isr: 0,2,3
Topic: second4 Partition: 5 Leader: 1 Replicas: 1,3,0 Isr: 1,3,0
Topic: second4 Partition: 6 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: second4 Partition: 7 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: second4 Partition: 8 Leader: 0 Replicas: 0,3,1 Isr: 0,3,1
Topic: second4 Partition: 9 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: second4 Partition: 10 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
Topic: second4 Partition: 11 Leader: 3 Replicas: 3,2,0 Isr: 3,2,0
Topic: second4 Partition: 12 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: second4 Partition: 13 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: second4 Partition: 14 Leader: 2 Replicas: 2,3,0 Isr: 2,3,0
Topic: second4 Partition: 15 Leader: 3 Replicas: 3,0,1 Isr: 3,0,1

手动调整分区副本

手动调整分区副本存储的步骤如下:
(1)创建一个新的 topic,名称为 three。

1
kafka-topics.sh --bootstrap-server  47.106.86.64:9092  --create --partitions 4 --replication-factor 2 --topic three

(3)创建副本存储计划(所有副本都指定存储在 broker0、broker1 中)。

1
2
3
4
5
6
7
8
9
10
$ vim increase-replication-factor.json
输入如下内容:
{
"version":1,
"partitions":[
{"topic":"three","partition":0,"replicas":[0,1]},
{"topic":"three","partition":1,"replicas":[0,1]},
{"topic":"three","partition":2,"replicas":[1,0]},
{"topic":"three","partition":3,"replicas":[1,0]}]
}

(4)执行副本存储计划。

1
kafka-reassign-partitions.sh --bootstrap-server  47.106.86.64:9092  --reassignment-json-file increase-replication-factor.json --execute

(5)验证副本存储计划。

1
kafka-reassign-partitions.sh --bootstrap-server  47.106.86.64:9092  --reassignment-json-file increase-replication-factor.json --verify

分区自动调整

一般情况下,我们的分区都是平衡散落在broker的,随着一些broker故障,会慢慢出现leader集中在某台broker上的情况,造成集群负载不均衡,这时候就需要分区平衡。

为了解决上述问题kafka出现了自动平衡的机制。kafka提供了下面几个参数进行控制:

  • auto.leader.rebalance.enable:自动leader parition平衡,默认是true;
  • leader.imbalance.per.broker.percentage:每个broker允许的不平衡的leader的比率,默认是10%,如果超过这个值,控制器将会触发leader的平衡
  • leader.imbalance.check.interval.seconds:检查leader负载是否平衡的时间间隔,默认是300秒
  • 但是在生产环境中是不开启这个自动平衡,因为触发leader partition的自动平衡会损耗性能,或者可以将触发自动平和的参数leader.imbalance.per.broker.percentage的值调大点。

我们也可以通过修改配置,然后手动触发分区的再平衡。

增加副本因子

在生产环境当中,由于某个主题的重要等级需要提升,我们考虑增加副本。副本数的增加需要先制定计划,然后根据计划执行。
不能通过命令行的方法添加副本。
1)创建 topic

1
bin/kafka-topics.sh --bootstrap-server 47.106.86.64:9092 --create --partitions 3 --replication-factor 1 --topic four

2)手动增加副本存储
(1)创建副本存储计划(所有副本都指定存储在 broker0、broker1、broker2 中)。

1
2
3
4
5
6
vim increase-replication-factor.json

{"version":1,"partitions":[
{"topic":"four","partition":0,"replicas":[0,1,2]},
{"topic":"four","partition":1,"replicas":[0,1,2]},
{"topic":"four","partition":2,"replicas":[0,1,2]}]}

(2)执行副本存储计划。

1
kafka-reassign-partitions.sh --bootstrap-server 47.106.86.64:9092 --reassignment-json-file increase-replication-factor.json --execute

文件存储

存储结构

在Kafka中主题(Topic)是一个逻辑上的概念,分区(partition)是物理上的存在的。每个partition对应一个log文件,该log文件中存储的就是Producer生产的数据。Producer生产的数据会被不断追加到该log文件末端。为防止log文件过大导致数据定位效率低下,Kafka采用了分片和索引机制,将每个partition分为多个segment,每个segment默认1G( log.segment.bytes ), 每个segment包括.index文件、**.log文件和.timeindex**等文件。这些文件位于文件夹下,该文件命名规则为:topic名称+分区号。

Segment的三个文件需要通过特定工具打开才能看到信息

1
2
3
kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.index

kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.log

当log文件写入4k(这里可以通过log.index.interval.bytes设置)数据,就会写入一条索引信息到index文件中,这样的index索引文件就是一个稀疏索引,它并不会每条日志都建立索引信息。

当Kafka查询一条offset对应实际消息时,可以通过index进行二分查找,获取最近的低位offset,然后从低位offset对应的position开始,从实际的log文件中开始往后查找对应的消息。

时间戳索引文件,它的作用是可以查询某一个时间段内的消息,它的数据结构是:时间戳(8byte)+ 相对offset(4byte),如果要使用这个索引文件,先要通过时间范围找到对应的offset,然后再去找对应的index文件找到position信息,最后在遍历log文件,这个过程也是需要用到index索引文件的。

****文件清理策略

Kafka将消息存储在磁盘中,为了控制磁盘占用空间的不断增加就需要对消息做一定的清理操作。Kafka 中每一个分区副本都对应一个Log,而Log又可以分为多个日志分段,这样也便于日志的清理操作。Kafka提供了两种日志清理策略。

  1. 日志删除(delete) :按照一定的保留策略直接删除不符合条件的日志分段。
  2. 日志压缩(compact) :针对每个消息的key进行整合,对于有相同key的不同value值,只保留最后一个版本。

我们可以通过修改broker端参数 log.cleanup.policy 来进行配置

日志删除

kafka中默认的日志保存时间为7天,可以通过调整如下参数修改保存时间。

  • log.retention.hours:最低优先级小时,默认7天
  • log.retention.minutes:分钟
  • log.retention.ms:最高优先级毫秒
  • log.retention.check.interval.ms:负责设置检查周期,默认5分钟
  • file.delete.delay.ms:延迟执行删除时间
  • log.retention.bytes:当设置为-1时表示运行保留日志最大值(相当于关闭);当设置为1G时,表示日志文件最大值

具体的保留日志策略有三种:

基于时间策略

日志删除任务会周期检查当前日志文件中是否有保留时间超过设定的阈值来寻找可删除的日志段文件集合;这里需要注意log.retention参数的优先级:log.retention.ms > log.retention.minutes > log.retention.hours,默认只会配置log.retention.hours参数,值为168即为7天。

删除过期的日志段文件,并不是简单的根据日志段文件的修改时间计算,而是要根据该日志段中最大的时间戳来计算的,首先要查询该日志分段所对应的时间戳索引文件,查找该时间戳索引文件的最后一条索引数据,如果时间戳大于0就取值,否则才会使用最近修改时间。

在删除的时候先从Log对象所维护的日志段的跳跃表中移除要删除的日志段,用来确保已经没有线程来读取这些日志段;接着将日志段所对应的所有文件,包括索引文件都添加上**.deleted的后缀;最后交给一个以delete-file命名的延迟任务来删除这些以.deleted为后缀的文件,默认是1分钟执行一次,可以通过file.delete.delay.ms**来配置。

基于日志大小策略

日志删除任务会周期性检查当前日志大小是否超过设定的阈值(log.retention.bytes,默认是-1,表示无穷大),就从第一个日志分段中寻找可删除的日志段文件集合。如果超过阈值,

基于日志起始偏移量

该策略判断依据是日志段的下一个日志段的起始偏移量 baseOffset是否小于等于 logStartOffset,如果是,则可以删除此日志分段。这里说一下logStartOffset,一般情况下,日志文件的起始偏移量 logStartOffset等于第一个日志分段的 baseOffset,但这并不是绝对的,logStartOffset的值可以通过 DeleteRecordsRequest请求、使用 kafka-delete-records.sh 脚本、日志的清理和截断等操作进行修改。

日志压缩

日志压缩对于有相同key的不同value值,只保留最后一个版本。如果应用只关心 key对应的最新 value值,则可以开启 Kafka相应的日志清理功能,Kafka会定期将相同 key的消息进行合并,只保留最新的 value值。

  • log.cleanup.policy = compact 所有数据启用压缩策略

这种策略只适合特殊场景,比如消息的key是用户ID,value是用户的资料,通过这种压缩策略,整个消息集里就保存了所有用户最新的资料。

Kafka高效读数据

kafka之所以可以快速读写的原因如下:

  1. kafka是分布式集群,采用分区方式,并行操作
  2. 读取数据采用稀疏索引,可以快速定位消费数据
  3. 顺序写磁盘
  4. 页缓冲和零拷贝

顺序写磁盘

Kafka 的 producer 生产数据,要写入到 log 文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到 600M/s,而随机写只有 100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。

页缓存与零拷贝

kafka高效读写的原因很大一部分取决于页缓存零拷贝

页缓存

在 Kafka 中,大量使用了 PageCache, 这也是 Kafka 能实现高吞吐的重要因素之一。

首先看一下读操作,当一个进程要去读取磁盘上的文件内容时,操作系统会先查看要读取的数据页是否缓冲在PageCache 中,如果存在则直接返回要读取的数据,这就减少了对于磁盘 I/O的 操作;但是如果没有查到,操作系统会向磁盘发起读取请求并将读取的数据页存入 PageCache 中,之后再将数据返回给进程,就和使用redis缓冲是一个道理。

接着写操作和读操作是一样的,如果一个进程需要将数据写入磁盘,操作系统会检查数据页是否在PageCache 中已经存在,如果不存在就在 PageCache中添加相应的数据页,接着将数据写入对应的数据页。另外被修改过后的数据页也就变成了脏页,操作系统会在适当时间将脏页中的数据写入磁盘,以保持数据的一致性。

具体的刷盘机制可以通过 log.flush.interval messageslog.flush .interval .ms 等参数来控制。同步刷盘可以提高消息的可靠性,防止由于机器 掉电等异常造成处于页缓存而没有及时写入磁盘的消息丢失。一般并不建议这么做,刷盘任务就应交由操作系统去调配,消息的可靠性应该由多副本机制来保障,而不是由同步刷盘这 种严重影响性能的行为来保障 。

零拷贝

零拷贝并不是不需要拷贝,而是减少不必要的拷贝次数,通常使用在IO读写过程中。常规应用程序IO过程如下图,会经过四次拷贝:

  1. 数据从磁盘经过DMA(直接存储器访问)到内核的Read Buffer;
  2. 内核态的Read Buffer到用户态应用层的Buffer
  3. 用户态的Buffer到内核态的Socket Buffer
  4. Socket Buffer到网卡的NIC Buffer

从上面的流程可以知道内核态和用户态之间的拷贝相当于执行两次无用的操作,之间切换也会花费很多资源;当数据从磁盘经过DMA 拷贝到内核缓存(页缓存)后,为了减少CPU拷贝的性能损耗,操作系统会将该内核缓存与用户层进行共享,减少一次CPU copy过程,同时用户层的读写也会直接访问该共享存储,本身由用户层到Socket缓存的数据拷贝过程也变成了从内核到内核的CPU拷贝过程,更加的快速,这就是零拷贝,IO流程如下图。

甚至如果我们的消息存在页缓存PageCache中,还避免了硬盘到内核的拷贝过程,更加一步提升了消息的吞吐量。 (大概就理解成传输的数据只保存在内核空间,不需要再拷贝到用户态的应用层)


Java的JDK NIO中方法transferTo()方法就能够实现零拷贝操作,这个实现依赖于操作系统底层的sendFile()实现的

Kafka消费者

消费模式

常见的消费模式有两种:

poll(拉):消费者主动向服务端拉取消息。

push(推):服务端主动推送消息给消费者。

由于推模式很难考虑到每个客户端不同的消费速率,导致消费者无法消费消息而宕机,因此kafka采用的是poll的模式,该模式有个缺点,如果服务端没有消息,消费端就会一直空轮询。为了避免过多不必要的空轮询,kafka做了改进,如果没消息服务端就会暂时保持该请求,在一段时间内有消息再回应给客户端。

消费工作流程

消费者总体工作流程

消费者对消息进行消费,并且将已经消费的消息加入 _consumer_offsets 中。

消费者组原理

Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。

  • 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。
  • 消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

对于消息中间件而言,一般有两种消息投递模式:点对点(P2P, Point-to-Point)模式和发布/订阅(Pub/Sub)模式。点对点模式是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息。发布订阅模式定义了如何向一个内容节点发布和订阅消息,这个内容节点称为主题(Topic) , 主题可以认为是消息传递的中介,消息发布者将消息发布到某个主题, 而消息订阅者从主题中订阅消息。主题使得消息的订阅者和发布者互相保持独立,不需要进行 接触即可保证消息的传递,发布/订阅模式在消息的一对多广播时采用。Kafka同时支待两种消息投递模式,而这正是得益于消费者与消费组模型的契合:

  • 如果所有的消费者都隶属于同一个消费组,那么所有的消息都会被均衡地投递给每一个消费者,即每条消息只会被一个消费者处理,这就相当于点对点模式的应用。
  • 如果所有的消费者都隶属于不同的消费组,那么所有的消息都会被广播给所有的消费者,即每条消息会被所有的消费者处理,这就相当于发布/订阅模式的应用。

消费者组选举Leader

具体的消费者组初始化流程:

通过对GroupId进行Hash得到那台服务器的coordinator ,coordinator负责选出消费组中的Leader ,并且协调信息。真正存储消费记录的是 _consumer_offsets_partition 。

消费者API

消费组单消费者以及消费者组多消费者

注意:在消费者 API 代码中必须配置消费者组 id。命令行启动消费者不填写消费者组id 会被自动填写随机的消费者组 id。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class CustomConsumer {
public static void main(String[] args) {
//0.配置信息
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "47.106.86.64:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

//1.创建消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
ArrayList<String> topic = new ArrayList<>();
topic.add("first");
kafkaConsumer.subscribe(topic);


//2.消费信息
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
records.forEach(record -> {
System.out.println(record);
});
}
//3.关闭
}
}

分区平衡以及再平衡

参数名称 描述
heartbeat.interval.ms Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。该条目的值必须小于 session.timeout.ms,也不应该高于session.timeout.ms 的 1/3。

session.timeout.ms Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。超过该值,该消费者被移除,消费者组执行再平衡。

max.poll.interval.ms 消费者处理消息的最大时长,默认是 5 分钟。超过该值,该消费者被移除,消费者组执行再平衡。

partition.assignment.strategy 消 费 者 分 区 分 配 策 略 , 默 认 策 略 是 Range + CooperativeSticky。Kafka 可以同时使用多个分区分配策略。可 以 选 择 的 策 略 包 括 : Range 、 RoundRobin 、 Sticky 、CooperativeSticky (协作者粘性)

分区分配策略

我们知道一个 Consumer Group 中有多个 Consumer,一个 Topic 也有多个 Partition,所以必然会涉及到 Partition 的分配问题: 确定哪个 Partition 由哪个 Consumer 来消费的问题。

Kafka 客户端提供了3 种分区分配策略:RangeAssignorRoundRobinAssignorStickyAssignor,前两种 分配方案相对简单一些StickyAssignor分配方案相对复杂一些。

Range

Range 分区分配再平衡案例
(1)停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。
1 号消费者:消费到 3、4 号分区数据。
2 号消费者:消费到 5、6 号分区数据。
0 号消费者的任务会整体被分配到 1 号消费者或者 2 号消费者。 (被整体分配)
说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。

(2)再次重新发送消息观看结果(45s 以后)。
1 号消费者:消费到 0、1、2、3 号分区数据。
2 号消费者:消费到 4、5、6 号分区数据。
说明:消费者 0 已经被踢出消费者组,所以重新按照 range 方式分配。

RoundRobin

1
2
// 修改分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");

(1)停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。
1 号消费者:消费到 2、5 号分区数据
2 号消费者:消费到 4、1 号分区数据
0 号消费者的任务会按照 RoundRobin 的方式,把数据轮询分成 0 、6 和 3 号分区数据,分别由 1 号消费者或者 2 号消费者消费。(采用轮训)
说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。

(2)再次重新发送消息观看结果(45s 以后)。
1 号消费者:消费到 0、2、4、6 号分区数据
2 号消费者:消费到 1、3、5 号分区数据
说明:消费者 0 已经被踢出消费者组,所以重新按照 RoundRobin 方式分配。

Sticky:

StickyAssignor 分区分配算法是 Kafka 客户端提供的分配策略中最复杂的一种,可以通过 partition.assignment.strategy 参数去设置,从 0.11 版本开始引入,目的就是在执行新分配时,尽量在上一次分配结果上少做调整,其主要实现了以下2个目标:

1)、Topic Partition 的分配要尽量均衡。

2)、当 Rebalance(重分配,后面会详细分析) 发生时,尽量与上一次分配结果保持一致。

该算法的精髓在于,重分配后,还能尽量与上一次结果保持一致,进而达到消费者故障下线,故障恢复后的均衡问题,在此就不举例了。

1
2
// 修改分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.StickyAssignor");

offset位移提交

offset 的默认维护位置

Kafka 0.9 版本之前consumer默认将offset保存在Zookeeper中,从0.9版本之后consumer默认保存在Kafka一个内置的topic中,该topic为_consumer_offsets。

消费者提交的offset值维护在**__consumer_offsets这个Topic中,具体维护在哪个分区中,是由消费者所在的消费者组groupid**决定,计算方式是:groupid的hashCode值对50取余。当kafka环境正常而消费者不能消费时,有可能是对应的__consumer_offsets分区leader为none或-1,或者分区中的日志文件损坏导致。

__consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic+ 分区号,value 就是当前 offset 的值。每隔一段时间,kafka 内部会对这个 topic 进行 compact,也就是每个 group.id+topic+分区号就保留最新数据。

一般情况下, 当集群中第一次有消费者消费消息时会自动创建主题_ consumer_ offsets, 不过它的副本因子还受offsets.topic .replication.factor参数的约束,这个参数的默认值为3 (下载安装的包中此值可能为1),分区数可以通过offsets.topic.num.partitions参数设置,默认为50。

在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false,默认是 true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false。

1
2
3
4
5
6
7
8
kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server 47.106.86.64:9092 --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
1
[offset,atguigu,1]::OffsetAndMetadata(offset=7,
leaderEpoch=Optional[0], metadata=, commitTimestamp=1622442520203,
expireTimestamp=None)
[offset,atguigu,0]::OffsetAndMetadata(offset=8,
leaderEpoch=Optional[0], metadata=, commitTimestamp=1622442520203,
expireTimestamp=None)

消费者提交offset的方式有两种,自动提交手动提交

自动提交

为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能。

  • enable.auto.commit:是否开启自动提交offset功能,默认是true
  • auto.commit.interval.ms:自动提交offset的时间间隔,默认是5s

自动提交有可能出现消息消费失败,但是却提交了offset的情况,导致消息丢失。为了能够实现消息消费offset的精确控制,更推荐手动提交。

1
2
3
4
// 自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// 提交时间间隔
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);

手动提交

虽然自动提交offset十分简单便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因 此Kafka还提供了手动提交offset的API。手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是,都会将本次提交的一批数据最高的偏移量提交;不同点是,同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败

  • commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。 阻塞线程,一直到提交到成功,会进行失败重试
  • commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。没有失败重试机制,会提交失败

指定消费位置

在kafka中当消费者查找不到所记录的消费位移时,会根据auto.offset.reset的配置,决定从何处消费。

auto.offset.reset = earliest | latest | none 默认是 latest。

  • earliest:不管你的offset,始终从最早的位置开始拉取。
  • latest(默认值):自动将偏移量重置为最新偏移量,从最后提交的offset的位置拉取。
  • none:如果未找到消费者组的先前偏移量,则向消费者抛出异常。

Kafka中的消费位移是存储在一个内部主题中的, 而我们可以使用**seek()**方法可以突破这一限制:消费位移可以保存在任意的存储介质中, 例如数据库、 文件系统等。以数据库为例, 我们将消费位移保存在其中的一个表中, 在下次消费的时候可以读取存储在数据表中的消费位移并通过seek()方法指向这个具体的位置 。

1
2
3
//配置信息
Properties properties = new Properties();
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

指定位移消费

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 指定位置进行消费
Set<TopicPartition> assignment = kafkaConsumer.assignment();

// 保证分区分配方案已经制定完毕
while (assignment.size() == 0){
kafkaConsumer.poll(Duration.ofSeconds(1));
assignment = kafkaConsumer.assignment();
}

// 指定消费的offset
for (TopicPartition topicPartition : assignment) {
kafkaConsumer.seek(topicPartition,600);
}

// 3 消费数据
while (true){

ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}

指定时间消费

原理就是查到时间对应的offset再去指定位移消费,为了确保同步到分区信息,我们还需要确保能获取到分区,再去查询分区时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 指定位置进行消费
Set<TopicPartition> assignment = kafkaConsumer.assignment();

// 保证分区分配方案已经制定完毕
while (assignment.size() == 0){
kafkaConsumer.poll(Duration.ofSeconds(1));

assignment = kafkaConsumer.assignment();
}

// 希望把时间转换为对应的offset
HashMap<TopicPartition, Long> topicPartitionLongHashMap = new HashMap<>();

// 封装对应集合
for (TopicPartition topicPartition : assignment) {
topicPartitionLongHashMap.put(topicPartition,System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
}

Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(topicPartitionLongHashMap);

// 指定消费的offset
for (TopicPartition topicPartition : assignment) {

OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestampMap.get(topicPartition);

kafkaConsumer.seek(topicPartition,offsetAndTimestamp.offset());
}

// 3 消费数据
while (true){

ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {

System.out.println(consumerRecord);
}
}

漏消费和重复消费

重复消费:已经消费了数据,但是 offset 没提交。
漏消费:先提交 offset 后消费,有可能会造成数据的漏消费。

消费者事务

数据积压(提高吞吐量)

参数名称 描述
fetch.max.bytes 默认Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受 message.max.bytes (broker config)ormax.message.bytes (topic config)影响。

max.poll.records 一次 poll 拉取数据返回消息的最大条数,默认是 500 条

拦截器

与生产者对应,消费者也有拦截器。我们来看看拦截器具体的方法。

1
2
3
4
5
6
7
8
public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {

ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);

void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);

void close();
}

Kafka Consumer会在poll()**方法返回之前调用拦截器的**onConsume()**方法来对消息进行相应的定制化操作,比如**修改返回的消息内容按照某种规则过滤消息(可能会减少poll()方法返回 的消息的个数)。如果onConsume()方法中抛出异常, 那么会被捕获并记录到日志中, 但是异常不会再向上传递。

Kafka Consumer会在提交完消费位移之后调用拦截器的**onCommit()**方法, 可以使用这个方法来记录跟踪所提交的位移信息,比如当消费者使用commitSync的无参方法时,我们不知道提交的消费位移的具体细节, 而使用拦截器的onCommit()方法却可以做到这 一点。

Kafka整合Spring Boot

  1. 导包 -除了Spring Boot 之外还需要额外导入 Spring Web、Kafka

2, 编写配置文件

1
2
3
4
5
6
7
8
9
10
11
12
spring:
kafka:
bootstrap-servers: 47.106.86.64:9092
consumer:
group-id: test2
# #序列化器 以及反序列化器
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
# #序列化器 以及反序列化器
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer

3、定义简单生产者

1
2
3
4
5
6
7
8
9
10
11
@RestController
public class ProducerController {
@Autowired
KafkaTemplate<String, String> kafkaTemplate;

@RequestMapping("/hi")
public String data(String msg){
kafkaTemplate.send("first",msg);
return "ok";
}
}

编写具有回调函数的生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@GetMapping("/kafka/callbackOne/{message}")
public void sendMessage2(@PathVariable("message") String callbackMessage) {
kafkaTemplate.send("first", callbackMessage).addCallback(success -> {
// 消息发送到的topic
String topic = success.getRecordMetadata().topic();
// 消息发送到的分区
int partition = success.getRecordMetadata().partition();
// 消息在分区内的offset
long offset = success.getRecordMetadata().offset();
System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
}, failure -> {
System.out.println("发送消息失败:" + failure.getMessage());
});
}

@GetMapping("/kafka/callbackTwo/{message}")
public void sendMessage3(@PathVariable("message") String callbackMessage) {
kafkaTemplate.send("first", callbackMessage).addCallback(
(ListenableFutureCallback<? super SendResult<String, String>>) new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable ex) {
System.out.println("发送消息失败:"+ex.getMessage());
}

@Override
public void onSuccess(SendResult<String, Object> result) {
System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-"
+ result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
}
});
}

4、定义消费者 ->启动监听线程

1
2
3
4
5
6
7
8
@Configuration
public class KafkaConfiguration{
@KafkaListener(topics = {"first"})
public void message1(ConsumerRecord<?, ?> record){
// 消费的哪个topic、partition的消息,打印出消息内容
System.out.println("点对点消费1:"+record.topic()+"-"+record.partition()+"-"+record.value());
}
}
1
2
3
4
5
@RequestMapping("/hi")
public String data(String msg){
kafkaTemplate.send("first",msg);
return "ok";
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
编写具有回调函数的生产者

```java
@GetMapping("/kafka/callbackOne/{message}")
public void sendMessage2(@PathVariable("message") String callbackMessage) {
kafkaTemplate.send("first", callbackMessage).addCallback(success -> {
// 消息发送到的topic
String topic = success.getRecordMetadata().topic();
// 消息发送到的分区
int partition = success.getRecordMetadata().partition();
// 消息在分区内的offset
long offset = success.getRecordMetadata().offset();
System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
}, failure -> {
System.out.println("发送消息失败:" + failure.getMessage());
});
}

@GetMapping("/kafka/callbackTwo/{message}")
public void sendMessage3(@PathVariable("message") String callbackMessage) {
kafkaTemplate.send("first", callbackMessage).addCallback(
(ListenableFutureCallback<? super SendResult<String, String>>) new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable ex) {
System.out.println("发送消息失败:"+ex.getMessage());
}

@Override
public void onSuccess(SendResult<String, Object> result) {
System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-"
+ result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
}
});
}

4、定义消费者 ->启动监听线程

1
2
3
4
5
6
7
8
9
10
11
@Configuration
public class KafkaConfiguration{


@KafkaListener(topics = {"first"})
public void message1(ConsumerRecord<?, ?> record){
// 消费的哪个topic、partition的消息,打印出消息内容
System.out.println("点对点消费1:"+record.topic()+"-"+record.partition()+"-"+record.value());
}

}

参考 :https://blog.csdn.net/prague6695/article/details/123869202

Kafka Consumer配置

在0.9.0.0中,我们引入了新的Java消费者来替代早期基于Scala的简单和高级消费者。新老客户端的配置如下。

新消费者配置

新消费者配置:(注意,右面是可拖动的)

NAMEDESCRIPTIONTYPEDEFAULTVALID VALUESIMPORTANCE
bootstrap.servershost/port,用于和kafka集群建立初始化连接。因为这些服务器地址仅用于初始化连接,并通过现有配置的来发现全部的kafka集群成员(集群随时会变化),所以此列表不需要包含完整的集群地址(但尽量多配置几个,以防止配置的服务器宕机)。listhigh
key.deserializerkey的解析序列化接口实现类(Deserializer)。classhigh
value.deserializervalue的解析序列化接口实现类(Deserializer)classhigh
fetch.min.bytes服务器哦拉取请求返回的最小数据量,如果数据不足,请求将等待数据积累。默认设置为1字节,表示只要单个字节的数据可用或者读取等待请求超时,就会应答读取请求。将此值设置的越大将导致服务器等待数据累积的越长,这可能以一些额外延迟为代价提高服务器吞吐量。int1[0,...]high
group.id此消费者所属消费者组的唯一标识。如果消费者用于订阅或offset管理策略的组管理功能,则此属性是必须的。string""high
heartbeat.interval.ms当使用Kafka的分组管理功能时,心跳到消费者协调器之间的预计时间。心跳用于确保消费者的会话保持活动状态,并当有新消费者加入或离开组时方便重新平衡。该值必须必比session.timeout.ms小,通常不高于1/3。它可以调整的更低,以控制正常重新平衡的预期时间。int3000(3秒)high
max.partition.fetch.bytes服务器将返回每个分区的最大数据量。如果拉取的第一个非空分区中第一个消息大于此限制,则仍然会返回消息,以确保消费者可以正常的工作。broker接受的最大消息大小通过message.max.bytes(broker config)或max.message.bytes (topic config)定义。参阅fetch.max.bytes以限制消费者请求大小。int1048576[0,...]high
session.timeout.ms用于发现消费者故障的超时时间。消费者周期性的发送心跳到broker,表示其还活着。如果会话超时期满之前没有收到心跳,那么broker将从分组中移除消费者,并启动重新平衡。请注意,该值必须在broker配置的group.min.session.timeout.msgroup.max.session.timeout.ms允许的范围内。int45000(45s)high
ssl.key.password密钥存储文件中的私钥的密码。 客户端可选passwordnullhigh
ssl.keystore.location密钥存储文件的位置, 这对于客户端是可选的,并且可以用于客户端的双向认证。stringnullhigh
ssl.keystore.password密钥仓库文件的仓库密码。客户端可选,只有ssl.keystore.location配置了才需要。passwordnullhigh
ssl.truststore.location信任仓库文件的位置stringnullhigh
ssl.truststore.password信任仓库文件的密码passwordnullhigh
auto.offset.reset当Kafka中没有初始offset或如果当前的offset不存在时(例如,该数据被删除了),该怎么办。 *earliest:自动将偏移重置为最早的偏移 * latest:自动将偏移重置为最新偏移 *none:如果消费者组找到之前的offset,则向消费者抛出异常 * 其他:抛出异常给消费者。stringlatest[latest, earliest, none]medium
connections.max.idle.ms指定在多少毫秒之后关闭闲置的连接long540000medium
enable.auto.commit如果为true,消费者的offset将在后台周期性的提交booleantruemedium
exclude.internal.topics内部topic的记录(如偏移量)是否应向消费者公开。如果设置为true,则从内部topic接受记录的唯一方法是订阅它。booleantruemedium
fetch.max.bytes服务器为拉取请求返回的最大数据值。这不是绝对的最大值,如果在第一次非空分区拉取的第一条消息大于该值,该消息将仍然返回,以确保消费者继续工作。接收的最大消息大小通过message.max.bytes (broker config) 或 max.message.bytes (topic config)定义。注意,消费者是并行执行多个提取的。int52428800[0,...]medium
max.poll.interval.ms使用消费者组管理时poll()调用之间的最大延迟。消费者在获取更多记录之前可以空闲的时间量的上限。如果此超时时间期满之前poll()没有调用,则消费者被视为失败,并且分组将重新平衡,以便将分区重新分配给别的成员。int300000[1,...]medium
max.poll.records在单次调用poll()中返回的最大消息数。int500[1,...]medium
partition.assignment.strategy当使用组管理时,客户端将使用分区分配策略的类名来分配消费者实例之间的分区所有权listclass org.apache.kafka .clients.consumer .RangeAssignormedium
receive.buffer.bytes读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小。 如果值为-1,则将使用OS默认值。int65536[-1,...]medium
request.timeout.ms配置控制客户端等待请求响应的最长时间。 如果在超时之前未收到响应,客户端将在必要时重新发送请求,如果重试耗尽则客户端将重新发送请求。int305000[0,...]medium
sasl.jaas.configJAAS配置文件中SASL连接登录上下文参数。 这里描述JAAS配置文件格式。 该值的格式为: '(=)*;'passwordnullmedium
sasl.kerberos.service.nameKafka运行Kerberos principal名。可以在Kafka的JAAS配置文件或在Kafka的配置文件中定义。stringnullmedium
sasl.mechanism用于客户端连接的SASL机制。安全提供者可用的机制。GSSAPI是默认机制。stringGSSAPImedium
security.protocol用于与broker通讯的协议。 有效值为:PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL。stringPLAINTEXTmedium
send.buffer.bytes发送数据时要使用的TCP发送缓冲区(SO_SNDBUF)的大小。 如果值为-1,则将使用OS默认值。int131072[-1,...]medium
ssl.enabled.protocols启用SSL连接的协议列表。listTLSv1.2,TLSv1.1,TLSv1medium
ssl.keystore.typekey仓库文件的文件格式,客户端可选。stringJKSmedium
ssl.protocol用于生成SSLContext的SSL协议。 默认设置是TLS,这对大多数情况都是适用的。 最新的JVM中的允许值为TLS,TLSv1.1和TLSv1.2。 较旧的JVM可能支持SSL,SSLv2和SSLv3,但由于已知的安全漏洞,不建议使用SSL。stringTLSmedium
ssl.provider用于SSL连接的安全提供程序的名称。 默认值是JVM的默认安全提供程序。stringnullmedium
ssl.truststore.type信任存储文件的文件格式。stringJKSmedium
auto.commit.interval.ms如果enable.auto.commit设置为true,则消费者偏移量自动提交给Kafka的频率(以毫秒为单位)。int5000[0,...]low
check.crcs自动检查CRC32记录的消耗。 这样可以确保消息发生时不会在线或磁盘损坏。 此检查增加了一些开销,因此在寻求极致性能的情况下可能会被禁用。booleantruelow
client.id在发出请求时传递给服务器的id字符串。 这样做的目的是通过允许将逻辑应用程序名称包含在服务器端请求日志记录中,来跟踪ip/port的请求源。string""low
fetch.max.wait.ms如果没有足够的数据满足fetch.min.bytes,服务器将在接收到提取请求之前阻止的最大时间。int500[0,...]low
interceptor.classes用作拦截器的类的列表。 你可实现ConsumerInterceptor接口以允许拦截(也可能变化)消费者接收的消息。 默认情况下,没有拦截器。listnulllow
metadata.max.age.ms在一定时间段之后(以毫秒为单位的),强制更新元数据,即使没有任何分区领导变化,任何新的broker或分区。long300000[0,...]low
metric.reporters用作度量记录员类的列表。实现MetricReporter接口以允许插入通知新的度量创建的类。JmxReporter始终包含在注册JMX统计信息中。list""low
metrics.num.samples保持的样本数以计算度量。int2[1,...]low
metrics.recording.level最高的记录级别。stringINFO[INFO, DEBUG]low
metrics.sample.window.msThe window of time a metrics sample is computed over.long30000[0,...]low
reconnect.backoff.ms尝试重新连接指定主机之前等待的时间,避免频繁的连接主机,这种机制适用于消费者向broker发送的所有请求。long50[0,...]low
retry.backoff.ms尝试重新发送失败的请求到指定topic分区之前的等待时间。避免在某些故障情况下,频繁的重复发送。long100[0,...]low
sasl.kerberos.kinit.cmd Kerberoskinit命令路径。string/usr/bin/kinitlow
sasl.kerberos.min.time.before.relogin尝试/恢复之间的登录线程的休眠时间。long60000low
sasl.kerberos.ticket.renew.jitter添加到更新时间的随机抖动百分比。double0.05low
sasl.kerberos.ticket.renew.window.factor登录线程将休眠,直到从上次刷新到ticket的指定的时间窗口因子到期,此时将尝试续订ticket。double0.8low
ssl.cipher.suites密码套件列表,用于TLS或SSL网络协议的安全设置,认证,加密,MAC和密钥交换算法的明明组合。默认情况下,支持所有可用的密码套件。listnulllow
ssl.endpoint.identification.algorithm使用服务器证书验证服务器主机名的端点识别算法。stringnulllow
ssl.keymanager.algorithm密钥管理器工厂用于SSL连接的算法。 默认值是为Java虚拟机配置的密钥管理器工厂算法。stringSunX509low
ssl.secure.random.implementation用于SSL加密操作的SecureRandom PRNG实现。stringnulllow
ssl.trustmanager.algorithm信任管理器工厂用于SSL连接的算法。 默认值是为Java虚拟机配置的信任管理器工厂算法。stringPKIXlow

kafka >= 2.0.0

名称描述类型默认有效值重要程度
sasl.client.callback.handler.class实现AuthenticateCallbackHandler接口的SASL客户端回调处理程序类的全称。classnull中间
sasl.login.callback.handler.class实现AuthenticateCallbackHandler接口的SASL登录回调处理程序类的全称。对于broker来说,登录回调处理程序配置必须以监听器前缀和小写的SASL机制名称为前缀。例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandlerclassnull中间
sasl.login.class实现Login接口的类的全称。对于broker来说,login config必须以监听器前缀和SASL机制名称为前缀,并使用小写。例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLoginclassnull中间

kafka >= 2.1.0

名称描述类型默认有效值重要程度
client.dns.lookup控制客户端如何使用DNS查询。如果设置为 use_all_dns_ips,则依次连接到每个返回的IP地址,直到成功建立连接。断开连接后,使用下一个IP。一旦所有的IP都被使用过一次,客户端就会再次从主机名中解析IP(s)(然而,JVM和操作系统都会缓存DNS名称查询)。如果设置为 resolve_canonical_bootstrap_servers_only,则将每个引导地址解析成一个canonical名称列表。在bootstrap阶段之后,这和use_all_dns_ips的行为是一样的。如果设置为 default(已弃用),则尝试连接到查找返回的第一个IP地址,即使查找返回多个IP地址。stringuse_all_dns_ips[default, use_all_dns_ips, resolve_canonical_bootstrap_servers_only]中间

kafka >= 2.7

名称描述类型默认有效值重要程度
ssl.truststore.certificates可信证书的格式由'ssl.truststore.type'指定。默认的SSL引擎工厂只支持带X.509证书的PEM格式。passwordnull
socket.connection.setup.timeout.max.ms客户端等待建立socket连接的最大时间。连接设置超时时间将随着每一次连续的连接失败而成倍增加,直到这个最大值。为了避免连接风暴,超时时间将被应用一个0.2的随机因子,导致计算值在20%以下和20%以上的随机范围。long127000 (127 seconds)中间
socket.connection.setup.timeout.ms客户端等待建立socket连接的时间。如果在超时之前没有建立连接,客户端将关闭socket通道。long10000 (10 seconds)中间

旧消费者配置

旧消费者配置如下:

  • group.id
  • zookeeper.connect
PROPERTYDEFAULTDESCRIPTION
group.id标识消费者所属消费者组(独一的)。通过设置相同的组ID,多个消费者表明属于该消费者组的一部分。
zookeeper.connect指定ZooKeeper连接字符串,格式为hostname:port,其中host和port是ZooKeeper服务器的主机和端口。 为了使ZooKeeper宕机时连接到其他ZooKeeper节点,你还可以以hostname1:host1,hostname2:port2,hostname3:port3的形式指定多个主机。 还可以设置ZooKeeper chroot路径,作为其ZooKeeper连接字符串的一部分,将其数据放置在全局ZooKeeper命名空间中的某个路径下。 如果是这样,消费者应该在其连接字符串中使用相同的chroot路径。 例如,要给出/chroot/path的chroot路径,你需要将该值设置为:hostname1:port1,hostname2:port2,hostname3:port3/chroot/path
consumer.idnull如果未设置将自动生成。
socket.timeout.ms30 * 1000网络请求socker的超时时间。实际的超时是 max.fetch.wait+socket.timeout.ms的时间。
socket.receive.buffer.bytes64 * 1024网络请求socker的接收缓存大小
fetch.message.max.bytes1024 * 1024每个拉取请求的每个topic分区尝试获取的消息的字节大小。这些字节将被读入每个分区的内存,因此这有助于控制消费者使用的内存。 拉取请求的大小至少与服务器允许的最大消息的大小一样大,否则生产者可能发送大于消费者可以拉取的消息。
num.consumer.fetchers1用于拉取数据的拉取线程数。
auto.commit.enabletrue如果为true,请定期向ZooKeeper提交消费者已经获取的消息的偏移量。 当进程失败时,将使用这种承诺偏移量作为新消费者开始的位置。
auto.commit.interval.ms60 * 1000消费者offset提交到zookeeper的频率(以毫秒为单位)
queued.max.message.chunks2消费缓存消息块的最大大小。每个块可以达到fetch.message.max.bytes。
rebalance.max.retries4当新的消费者加入消费者组时,消费者集合尝试“重新平衡”负载,并为每个消费者分配分区。如果消费者集合在分配时发生时发生变化,则重新平衡将失败并重试。此设置控制尝试之前的最大尝试次数。
fetch.min.bytes1拉取请求返回最小的数据量。如果没有足够的数据,请求将等待数据积累,然后应答请求。
fetch.wait.max.ms100如果没有足够的数据(fetch.min.bytes),服务器将在返回请求数据之前阻塞的最长时间。
rebalance.backoff.ms2000重新平衡时重试之间的回退时间。如果未设置,则使用zookeeper.sync.time.ms中的值。
refresh.leader.backoff.ms200回退时间等待,然后再尝试选举一个刚刚失去leader的分区。
auto.offset.resetlargest如果ZooKeeper中没有初始偏移量,或偏移值超出范围,该怎么办? *最小:自动将偏移重置为最小偏移 * 最大:自动将偏移重置为最大偏移 * 其他任何事情:抛出异常消费者
consumer.timeout.ms-1如果在指定的时间间隔后没有消息可用,则向用户发出超时异常
exclude.internal.topicstrue来自内部topic的消息(如偏移量)是否应该暴露给消费者。
client.idgroup id value客户端ID是每个请求中发送的用户指定的字符串,用于帮助跟踪调用。 它应该逻辑地标识发出请求的应用程序。
zookeeper.session.timeout.ms6000ZooKeeper会话超时。如果消费者在这段时间内没有对ZooKeeper心跳,那么它被认为是死亡的,并且会发生重新平衡。
zookeeper.connection.timeout.ms6000与zookeeper建立连接时客户端等待的最长时间。
zookeeper.sync.time.ms2000ZK follower可以罗ZK leader多久
offsets.storagezookeeper选择存储偏移量的位置(zookeeper或kafka)。
offsets.channel.backoff.ms1000重新连接offset通道或重试失败的偏移提取/提交请求时的回退周期。
offsets.channel.socket.timeout.ms10000读取offset拉取/提交响应的Socker的超时时间。此超时也用于查询offset manager的ConsumerMetadata请求。
offsets.commit.max.retries5失败时重试偏移提交的最大次数。此重试计数仅适用于停机期间的offset提交,它不适用于自动提交线程的提交。它也不适用于在提交offset之前查询偏移协调器的尝试。即如果消费者元数据请求由于任何原因而失败,则将重试它,并且重试不计入该限制。
dual.commit.enabledtrue如果使用“kafka”作为offsets.storage,则可以向ZooKeeper(除Kafka之外)进行双重提交offset。在从基于zookeeper的offset存储迁移到kafka存储的时候可以这么做。对于任何给定的消费者组,在该组中的所有实例已迁移到提交到broker(而不是直接到ZooKeeper)的新的版本之后,可以关闭这个。
partition.assignment.strategyrange在“range”或“roundrobin”策略之间选择将分区分配给消费者流。 循环分区分配器分配所有可用的分区和所有可用的消费者线程。然后,继续从分区到消费者线程进行循环任务。如果所有消费者实例的订阅是相同的,则分区将被均匀分布。(即,分区所有权计数将在所有消费者线程之间的差异仅在一个delta之内。)循环分配仅在以下情况下被允许:(a)每个主题在消费者实例中具有相同数量的流(b)订阅的topic的对于组内的每个消费者实例都是相同的。 范围(Range)分区基于每个topic。对于每个主题,我们按数字顺序排列可用的分区,并以字典顺序排列消费者线程。然后,我们将分区数除以消费者流(线程)的总数来确定分配给每个消费者的分区数。如果不均匀分割,那么前几个消费者将会有多的分区。

有关消费者配置的更多详细信息,请参见scaf类kafka.consumer.ConsumerConfig。

Kafka Producer配置

生产者配置

java生产者配置:

NAMEDESCRIPTIONTYPEDEFAULTVALID VALUESIMPORTANCE
bootstrap.servershost/port列表,用于初始化建立和Kafka集群的连接。列表格式为host1:port1,host2:port2,....,无需添加所有的集群地址,kafka会根据提供的地址发现其他的地址(你可以多提供几个,以防提供的服务器关闭)listhigh
key.serializer实现 org.apache.kafka.common.serialization.Serializer 接口的 key 的 Serializer 类。classhigh
value.serializer实现 org.apache.kafka.common.serialization.Serializer 接口的value 的 Serializer 类。classhigh
acks生产者需要leader确认请求完成之前接收的应答数。此配置控制了发送消息的耐用性,支持以下配置: acks=0 如果设置为0,那么生产者将不等待任何消息确认。消息将立刻添加到socket缓冲区并考虑发送。在这种情况下不能保障消息被服务器接收到。并且重试机制不会生效(因为客户端不知道故障了没有)。每个消息返回的offset始终设置为-1。 acks=1,这意味着leader写入消息到本地日志就立即响应,而不等待所有follower应答。在这种情况下,如果响应消息之后但follower还未复制之前leader立即故障,那么消息将会丢失。 acks=all 这意味着leader将等待所有副本同步后应答消息。此配置保障消息不会丢失(只要至少有一个同步的副本或者)。这是最强壮的可用性保障。等价于acks=-1。string1[all, -1, 0, 1]high
buffer.memory生产者用来缓存等待发送到服务器的消息的内存总字节数。如果消息发送比可传递到服务器的快,生产者将阻塞max.block.ms之后,抛出异常。 此设置应该大致的对应生产者将要使用的总内存,但不是硬约束,因为生产者所使用的所有内存都用于缓冲。一些额外的内存将用于压缩(如果启动压缩),以及用于保持发送中的请求。long33554432[0,...]high
compression.type数据压缩的类型。默认为空(就是不压缩)。有效的值有 none,gzip,snappy, 或 lz4。压缩全部的数据批,因此批的效果也将影响压缩的比率(更多的批次意味着更好的压缩)。stringnonehigh
retries设置一个比零大的值,客户端如果发送失败则会重新发送。注意,这个重试功能和客户端在接到错误之后重新发送没什么不同。如果max.in.flight.requests.per.connection没有设置为1,有可能改变消息发送的顺序,因为如果2个批次发送到一个分区中,并第一个失败了并重试,但是第二个成功了,那么第二个批次将超过第一个。int0[0,...,2147483647]high
ssl.key.password密钥仓库文件中的私钥的密码。passwordnullhigh
ssl.keystore.location密钥仓库文件的位置。可用于客户端的双向认证。stringnullhigh
ssl.keystore.password密钥仓库文件的仓库密码。只有配置了ssl.keystore.location时才需要。passwordnullhigh
ssl.truststore.location信任仓库的位置stringnullhigh
ssl.truststore.password信任仓库文件的密码passwordnullhigh
batch.size当多个消息要发送到相同分区的时,生产者尝试将消息批量打包在一起,以减少请求交互。这样有助于客户端和服务端的性能提升。该配置的默认批次大小(以字节为单位): 不会打包大于此配置大小的消息。 发送到broker的请求将包含多个批次,每个分区一个,用于发送数据。 较小的批次大小有可能降低吞吐量(批次大小为0则完全禁用批处理)。一个非常大的批次大小可能更浪费内存。因为我们会预先分配这个资源。int16384[0,...]medium
client.id当发出请求时传递给服务器的id字符串。这样做的目的是允许服务器请求记录记录这个【逻辑应用名】,这样能够追踪请求的源,而不仅仅只是ip/prot。string""medium
connections.max.idle.ms多少毫秒之后关闭闲置的连接。long540000medium
linger.ms生产者组将发送的消息组合成单个批量请求。正常情况下,只有消息到达的速度比发送速度快的情况下才会出现。但是,在某些情况下,即使在适度的负载下,客户端也可能希望减少请求数量。此设置通过添加少量人为延迟来实现。- 也就是说,不是立即发出一个消息,生产者将等待一个给定的延迟,以便和其他的消息可以组合成一个批次。这类似于Nagle在TCP中的算法。此设置给出批量延迟的上限:一旦我们达到分区的batch.size值的记录,将立即发送,不管这个设置如何,但是,如果比这个小,我们将在指定的“linger”时间内等待更多的消息加入。此设置默认为0(即无延迟)。假设,设置 linger.ms=5,将达到减少发送的请求数量的效果,但对于在没有负载情况,将增加5ms的延迟。long0[0,...]medium
max.block.ms该配置控制 KafkaProducer.send() 和 KafkaProducer.partitionsFor() 将阻塞多长时间。此外这些方法被阻止,也可能是因为缓冲区已满或元数据不可用。在用户提供的序列化程序或分区器中的锁定不会计入此超时。long60000[0,...]medium
max.request.size请求的最大大小(以字节为单位)。此设置将限制生产者的单个请求中发送的消息批次数,以避免发送过大的请求。这也是最大消息批量大小的上限。请注意,服务器拥有自己的批量大小,可能与此不同。int1048576[0,...]medium
partitioner.class实现Partitioner接口的的Partitioner类。classorg.apache.kafka.clients.producer.internals.DefaultPartitionermedium
receive.buffer.bytes读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小。如果值为-1,则将使用OS默认值。int32768[-1,...]medium
request.timeout.ms该配置控制客户端等待请求响应的最长时间。如果在超时之前未收到响应,客户端将在必要时重新发送请求,如果重试耗尽,则该请求将失败。 这应该大于replica.lag.time.max.ms,以减少由于不必要的生产者重试引起的消息重复的可能性。int30000[0,...]medium
sasl.jaas.configJAAS配置文件使用的格式的SASL连接的JAAS登录上下文参数。这里描述JAAS配置文件格式。该值的格式为:'(=)*;'passwordnullmedium
sasl.kerberos.service.nameKafka运行的Kerberos主体名称。可以在Kafka的JAAS配置或Kafka的配置中定义。stringnullmedium
sasl.mechanismSASL机制用于客户端连接。这是安全提供者可用与任何机制。GSSAPI是默认机制。stringGSSAPImedium
security.protocol用于与broker通讯的协议。 有效值为:PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL。stringPLAINTEXTmedium
send.buffer.bytes发送数据时,用于TCP发送缓存(SO_SNDBUF)的大小。如果值为 -1,将默认使用系统的。int131072[-1,...]medium
ssl.enabled.protocols启用SSL连接的协议列表。listTLSv1.2,TLSv1.1,TLSv1medium
ssl.keystore.type密钥存储文件的文件格式。对于客户端是可选的。stringJKSmedium
ssl.protocol最近的JVM中允许的值是TLS,TLSv1.1和TLSv1.2。 较旧的JVM可能支持SSL,SSLv2和SSLv3,但由于已知的安全漏洞,不建议使用SSL。stringTLSmedium
ssl.provider用于SSL连接的安全提供程序的名称。默认值是JVM的默认安全提供程序。stringnullmedium
ssl.truststore.type信任仓库文件的文件格式。stringJKSmedium
enable.idempotence当设置为‘true’,生产者将确保每个消息正好一次复制写入到stream。如果‘false’,由于broker故障,生产者重试。即,可以在流中写入重试的消息。此设置默认是‘false’。请注意,启用幂等式需要将max.in.flight.requests.per.connection设置为1,重试次数不能为零。另外acks必须设置为“全部”。如果这些值保持默认值,我们将覆盖默认值。 如果这些值设置为与幂等生成器不兼容的值,则将抛出一个ConfigException异常。如果这些值设置为与幂等生成器不兼容的值,则将抛出一个ConfigException异常。booleanfalselow
interceptor.classes实现ProducerInterceptor接口,你可以在生产者发布到Kafka群集之前拦截(也可变更)生产者收到的消息。默认情况下没有拦截器。listnulllow
max.in.flight.requests.per.connection阻塞之前,客户端单个连接上发送的未应答请求的最大数量。注意,如果此设置设置大于1且发送失败,则会由于重试(如果启用了重试)会导致消息重新排序的风险。int5[1,...]low
metadata.max.age.ms在一段时间段之后(以毫秒为单位),强制更新元数据,即使我们没有看到任何分区leader的变化,也会主动去发现新的broker或分区。long300000[0,...]low
metric.reporters用作metrics reporters(指标记录员)的类的列表。实现MetricReporter接口,将受到新增加的度量标准创建类插入的通知。 JmxReporter始终包含在注册JMX统计信息中。list""low
metrics.num.samples维护用于计算度量的样例数量。int2[1,...]low
metrics.recording.level指标的最高记录级别。stringINFO[INFO, DEBUG]low
metrics.sample.window.ms度量样例计算上long30000[0,...]low
reconnect.backoff.max.ms重新连接到重复无法连接的代理程序时等待的最大时间(毫秒)。 如果提供,每个主机的回退将会连续增加,直到达到最大值。 计算后退增加后,增加20%的随机抖动以避免连接风暴。long1000[0,...]low
reconnect.backoff.ms尝试重新连接到给定主机之前等待的基本时间量。这避免了在循环中高频率的重复连接到主机。这种回退适应于客户端对broker的所有连接尝试。long50[0,...]low
retry.backoff.ms尝试重试指定topic分区的失败请求之前等待的时间。这样可以避免在某些故障情况下高频次的重复发送请求。long100[0,...]low
sasl.kerberos.kinit.cmdKerberos kinit 命令路径。string/usr/bin/kinitlow
sasl.kerberos.min.time.before.reloginLogin线程刷新尝试之间的休眠时间。long60000low
sasl.kerberos.ticket.renew.jitter添加更新时间的随机抖动百分比。double0.05low
sasl.kerberos.ticket.renew.window.factor登录线程将睡眠,直到从上次刷新ticket到期时间的指定窗口因子为止,此时将尝试续订ticket。double0.8low
ssl.cipher.suites密码套件列表。这是使用TLS或SSL网络协议来协商用于网络连接的安全设置的认证,加密,MAC和密钥交换算法的命名组合。默认情况下,支持所有可用的密码套件。listnulllow
ssl.endpoint.identification.algorithm使用服务器证书验证服务器主机名的端点识别算法。stringnulllow
ssl.keymanager.algorithm用于SSL连接的密钥管理因子算法。默认值是为Java虚拟机配置的密钥管理器工厂算法。stringSunX509low
ssl.secure.random.implementation用于SSL加密操作的SecureRandom PRNG实现。stringnulllow
ssl.trustmanager.algorithm用于SSL连接的信任管理因子算法。默认值是JAVA虚拟机配置的信任管理工厂算法。stringPKIXlow
transaction.timeout.ms生产者在主动中止正在进行的交易之前,交易协调器等待事务状态更新的最大时间(以ms为单位)。如果此值大于broker中的max.transaction.timeout.ms设置,则请求将失败,并报“InvalidTransactionTimeout”错误。int60000low
transactional.id用于事务传递的TransactionalId。这样可以跨多个生产者会话的可靠性语义,因为它允许客户端保证在开始任何新事务之前使用相同的TransactionalId的事务已经完成。如果没有提供TransactionalId,则生产者被限制为幂等传递。请注意,如果配置了TransactionalId,则必须启用enable.idempotence。 默认值为空,这意味着无法使用事务。stringnullnon-empty stringlow

kafka >= 2.0.0

名称描述类型默认有效值重要程度
sasl.client.callback.handler.class实现AuthenticateCallbackHandler接口的SASL客户端回调处理程序类的全称。classnull中间
sasl.login.callback.handler.class实现AuthenticateCallbackHandler接口的SASL登录回调处理程序类的全称。对于broker来说,登录回调处理程序配置必须以监听器前缀和小写的SASL机制名称为前缀。例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandlerclassnull中间
sasl.login.class实现Login接口的类的全称。对于broker来说,login config必须以监听器前缀和SASL机制名称为前缀,并使用小写。例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLoginclassnull中间

kafka >= 2.1.0

名称描述类型默认有效值重要程度
client.dns.lookup控制客户端如何使用DNS查询。如果设置为 use_all_dns_ips,则依次连接到每个返回的IP地址,直到成功建立连接。断开连接后,使用下一个IP。一旦所有的IP都被使用过一次,客户端就会再次从主机名中解析IP(s)(然而,JVM和操作系统都会缓存DNS名称查询)。如果设置为 resolve_canonical_bootstrap_servers_only,则将每个引导地址解析成一个canonical名称列表。在bootstrap阶段之后,这和use_all_dns_ips的行为是一样的。如果设置为 default(已弃用),则尝试连接到查找返回的第一个IP地址,即使查找返回多个IP地址。stringuse_all_dns_ips[default, use_all_dns_ips, resolve_canonical_bootstrap_servers_only]中间
delivery.timeout.ms调用send()返回后报告成功或失败的时间上限。这限制了消息在发送前被延迟的总时间,等待broker确认的时间(如果期望的话),以及允许重试发送失败的时间。如果遇到不可恢复的错误,重试次数已经用尽,或者消息被添加到一个达到较早发送到期期限的批次中,生产者可能会报告未能在这个配置之前发送记录。这个配置的值应该大于或等于request.timeout.mslinger.ms之和。int120000 (2 minutes)[0,...]中间

kafka >= 2.7

名称描述类型默认有效值重要程度
ssl.truststore.certificates可信证书的格式由'ssl.truststore.type'指定。默认的SSL引擎工厂只支持带X.509证书的PEM格式。passwordnull
socket.connection.setup.timeout.max.ms客户端等待建立socket连接的最大时间。连接设置超时时间将随着每一次连续的连接失败而成倍增加,直到这个最大值。为了避免连接风暴,超时时间将被应用一个0.2的随机因子,导致计算值在20%以下和20%以上的随机范围。long127000 (127 seconds)中间
socket.connection.setup.timeout.ms客户端等待建立socket连接的时间。如果在超时之前没有建立连接,客户端将关闭socket通道。long10000 (10 seconds)中间