简介

  消息队列在平时开发时,只要涉及到高并发解耦异步处理等,最好的方式就是引入消息队列,Kafka 是一个分布式的基于发布/订阅模式的消息队列,其具有高吞吐,可恢复性等特点,很值得学习,以下是尚硅谷的Kafka的学习笔记。
  原视频地址为:https://www.bilibili.com/video/BV1a4411B7V9?from=search&seid=17590871349137942281

消息队列

传统消息队列的应用场景

在这里插入图片描述

使用消息队列的好处

1)解耦
  传统的直接调用接口,会造成平台紧耦合,导致修改与扩展不易,加入消息队列后,允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
2)可恢复性
  系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
3)缓冲
  有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
4)灵活性 & 峰值处理能力
  在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
5)异步通信
  很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

消息队列的两种模式
点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)

  消息生产者生产消息发送到Queue中,然后消息消费者从Queue中取出并且消费消息。消息被消费以后,queue 中不再有存储,所以消息消费者不可能消费到已经被消费的消息。Queue 支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。
在这里插入图片描述

发布/订阅模式(一对多,消费者消费数据之后不会清除消息)

  消息生产者(发布)将消息发布到 topic 中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到 topic 的消息会被所有订阅者消费。
在这里插入图片描述

Kafka 基础架构

在这里插入图片描述
  1)Producer :消息生产者,就是向 kafka broker 发消息的客户端;
  2)Consumer :消息消费者,向 kafka broker 取消息的客户端;
  3)Topic :可以理解为一个队列,生产者和消费者面向的都是一个 topic;
  4)Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分为多个partition,每个 partition 是一个有序的队列;
  5)Replica:副本,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 kafka 仍然能够继续工作,kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower。
  6)leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 leader。
  7)follower:每个分区多个副本中的“从”,实时从 leader 中同步数据,保持和 leader 数据的同步。leader 发生故障时,某个 follower 会成为新的 follower。
  8)Broker :一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker可以容纳多个 topic。
  9)Consumer Group (CG):消费者组,由多个 consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

部署

部署没什么,重点看下配置参数与含义

#broker 的全局唯一编号,不能重复
broker.id=0
#删除 topic 功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘 IO 的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka 运行日志存放的路径,为新建目录
log.dirs=/opt/module/kafka/logs
#topic 在当前 broker 上的分区个数,不能大于broker数
num.partitions=1
#用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接 Zookeeper 集群地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181

Kafka 命令行操作

命令行相对开发用的不多,一般是直接调API,但知道常用命令会更方便
1)查看当前服务器中的所有 topic

bin/kafka-topics.sh --zookeeper hadoop102:2181 --list

2)创建 topic

bin/kafka-topics.sh --zookeeper hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic first

选项说明:

  • –topic 定义 topic 名
  • –replication-factor 定义副本数
  • –partitions 定义分区数

3)删除 topic

# 需要 server.properties 中设置 delete.topic.enable=true 否则只是标记删除。
bin/kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic first

4)发送消息

bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first
>test
>hello world

5)消费消息

bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic first
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first

–from-beginning:会把主题中以往所有的数据都读取出来
6)查看某个 Topic 的详情

bin/kafka-topics.sh --zookeeper hadoop102:2181 --describe --topic first

7)修改分区数

bin/kafka-topics.sh --zookeeper hadoop102:2181 --alter --topic first --partitions 6

Kafka 工作流程及文件存储机制

在这里插入图片描述
  Kafka 中消息是以 topic 进行分类的,生产者生产消息,消费者消费消息,都是面向 topic的。topic 是逻辑上的概念,而 partition 是物理上的概念,每个 partition 对应于一个 log 文件,该 log 文件中存储的就是 producer 生产的数据。Producer 生产的数据会被不断追加到该log 文件末端,且每条数据都有自己的 offset。消费者组中的每个消费者,都会实时记录自己消费到了哪个 offset,以便出错恢复时,从上次的位置继续消费。
在这里插入图片描述
  由于生产者生产的消息会不断追加到 log 文件末尾,为防止 log 文件过大导致数据定位效率低下,Kafka 采取了分片和索引机制,将每个 partition 分为多个 segment。每个 segment对应两个文件——“.index”文件和“.log”文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic 名称+分区序号。例如,first 这个 topic 有三个分区,则其对应的文件夹为 first
0,first-1,first-2。

00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log

  index 和 log 文件以当前 segment 的第一条消息的 offset 命名。下图为 index 文件和 log文件的结构示意图。
在这里插入图片描述
  “.index”文件存储大量的索引信息,“.log”文件存储大量的数据,索引文件中的元数据指向对应数据文件中 message 的物理偏移地址。

Kafka 生产者

分区策略

1)分区的原因
方便在集群中扩展,每个 Partition 可以通过调整以适应它所在的机器,而一个 topic又可以有多个 Partition 组成,因此整个集群就可以适应任意大小的数据了;

2)分区的原则
需要将 producer 发送的数据封装成一个 ProducerRecord 对象
在这里插入图片描述
(1)指明 partition 的情况下,直接将指明的值直接作为 partiton 值;
(2)没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition数进行取余得到 partition 值;
(3)既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition 值,也就是常说的 round-robin 算法。

数据可靠性保证

  为保证 producer 发送的数据,能可靠的发送到指定的 topic,topic 的每个 partition 收到producer 发送的数据后,都需要向 producer 发送 ack(acknowledgement 确认收到),如果producer 收到 ack,就会进行下一轮的发送,否则重新发送数据。
在这里插入图片描述
1)副本数据同步策略

方案优点缺点
半数以上完成同步,就发送 ack延迟低选举新的 leader 时,容忍 n 台节点的故障,需要 2n+1 个副本
全部完成同步,才发送ack选举新的 leader 时,容忍 n 台节点的故障,需要 n+1 个副本延迟高

Kafka 选择了第二种方案,原因如下:
  1.同样为了容忍 n 台节点的故障,第一种方案需要 2n+1 个副本,而第二种方案只需要n+1个副本,而 Kafka 的每个分区都有大量的数据,第一种方案会造成大量数据的冗余。
  2.虽然第二种方案的网络延迟会比较高,但网络延迟对 Kafka 的影响较小。
2)ISR
  采用第二种方案之后,设想以下情景:leader 收到数据,所有 follower 都开始同步数据,但有一个 follower,因为某种故障,迟迟不能与 leader 进行同步,那 leader 就要一直等下去,直到它完成同步,才能发送 ack。这个问题怎么解决呢?
  Leader 维护了一个动态的 in-sync replica set (ISR),意为和 leader 保持同步的 follower 集合。当 ISR 中的 follower 完成数据的同步之后,leader 就会给 follower 发送 ack。如果 follower长时间 未 向 leader 同 步 数 据 , 则该follower将被踢出ISR , 该时间阈值由replica.lag.time.max.ms 参数设定。 Leader 发生故障之后,就会从 ISR 中选举新的 leader。
3)ack 应答机制
  对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等 ISR 中的 follower 全部接收成功。
  所以 Kafka 为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下的配置。
acks 参数配置:
acks

  • 0:producer 不等待 broker 的 ack,这一操作提供了一个最低的延迟,broker 一接收到还没有写入磁盘就已经返回,当 broker 故障时有可能丢失数据
  • 1:producer 等待 broker 的 ack,partition 的 leader 落盘成功后返回 ack,如果在 follower同步成功之前 leader 故障,那么将会丢失数据
    在这里插入图片描述
  • -1(all):producer 等待 broker 的 ack,partition 的 leader 和 follower 全部落盘成功后才返回 ack。但是如果在 follower 同步完成后,broker 发送 ack 之前,leader 发生故障,那么会造成数据重复。
    在这里插入图片描述
    4)故障处理细节
    在这里插入图片描述
  • LEO:指的是每个副本最大的 offset;
  • HW:指的是消费者能见到的最大的 offset,ISR 队列中最小的 LEO。

(1)follower 故障
  follower 发生故障后会被临时踢出 ISR,待该 follower 恢复后,follower 会读取本地磁盘记录的上次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开始向 leader 进行同步。等该 follower 的 LEO 大于等于该 Partition 的 HW,即 follower 追上 leader 之后,就可以重新加入 ISR 了。
(2)leader 故障
  leader 发生故障之后,会从 ISR 中选出一个新的 leader,之后,为保证多个副本之间的数据一致性,其余的 follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 leader 同步数据。
  注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。

Exactly Once 语义

  将服务器的 ACK 级别设置为-1,可以保证 Producer 到 Server 之间不会丢失数据,即 At Least Once 语义。相对的,将服务器 ACK 级别设置为 0,可以保证生产者每条消息只会被发送一次,即 At Most Once 语义
  At Least Once 可以保证数据不丢失,但是不能保证数据不重复;相对的,At Most Once可以保证数据不重复,但是不能保证数据不丢失。但是,对于一些非常重要的信息,比如说交易数据,下游数据消费者要求数据既不重复也不丢失,即 Exactly Once 语义。 在 0.11 版本以前的 Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局去重。对于多个下游应用的情况,每个都需要单独做全局去重,这就对性能造成了很大影响。
  0.11 版本的 Kafka,引入了一项重大特性:幂等性。所谓的幂等性就是指 Producer 不论向 Server 发送多少次重复数据,Server 端都只会持久化一条。幂等性结合 At Least Once 语义,就构成了 Kafka 的 Exactly Once 语义。即:At Least Once + 幂等性= Exactly Once
  要启用幂等性,只需要将 Producer 的参数中 enable.idompotence 设置为 true 即可。Kafka 的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的 Producer 在初始化的时候会被分配一个 PID,发往同一 Partition 的消息会附带 Sequence Number。而Broker 端会对<PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker 只会持久化一条。
  但是 PID 重启就会变化,同时不同的 Partition 也具有不同主键,所以幂等性无法保证跨分区跨会话的 Exactly Once。

Kafka 消费者

消费方式

  consumer 采用 pull(拉)模式从 broker 中读取数据。
  push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。 它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息。
   pull 模式不足之处是,如果 kafka 没有数据,消费者可能会陷入循环中,一直返回空数据。 针对这一点,Kafka 的消费者在消费数据时会传入一个时长参数 timeout,如果当前没有数据可供消费,consumer 会等待一段时间之后再返回,这段时长即为 timeout。

分区分配策略

  一个 consumer group 中有多个 consumer,一个 topic 有多个 partition,所以必然会涉及到 partition 的分配问题,即确定那个 partition 由哪个 consumer 来消费。
  Kafka 有两种分配策略,一个是 RoundRobin,即轮询
在这里插入图片描述
  一个是 Range,也是默认的方式,即:分区数量除以消费者数量,如分区数量7除以消费者数量3 等于 2 (N),再分区数量7对消费数量3取余得到1 ( M ),kafka的range算法是前 M个消费能得到N+1个分区,剩余的消费者分配到N个分区
在这里插入图片描述

offset 的维护

  由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故障前的位置的继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。
在这里插入图片描述
  Kafka 0.9 版本之前,consumer 默认将 offset 保存在 Zookeeper 中,从 0.9 版本开始,consumer 默认将 offset 保存在 Kafka 一个内置的 topic 中,该 topic 为__consumer_offsets。

Kafka 高效读写数据

1)顺序写磁盘
  Kafka 的 producer 生产数据,要写入到 log 文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到 600M/s,而随机写只有 100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。
2)零复制技术
  零拷贝是指,用户将文件从磁盘拷贝至Kernel space后,不再拷贝进User space层,而直接在kernel space内操作
在这里插入图片描述

Zookeeper 在 Kafka 中的作用

  Kafka 集群中有一个 broker 会被选举为 Controller,负责管理集群 broker 的上下线,所有 topic 的分区副本分配leader 选举等工作。
  Controller 的管理工作都是依赖于 Zookeeper 的。
  以下为 partition 的 leader 选举过程:
在这里插入图片描述

Kafka 事务

  Kafka 从 0.11 版本开始引入了事务支持。事务可以保证 Kafka 在 Exactly Once 语义的基础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败。

Producer 事务

  为了实现跨分区跨会话的事务,需要引入一个全局唯一的 Transaction ID,并将 Producer获得的PID 和Transaction ID 绑定。这样当Producer 重启后就可以通过正在进行的TransactionID 获得原来的 PID。
  为了管理 Transaction,Kafka 引入了一个新的组件 Transaction Coordinator。Producer 就是通过和 Transaction Coordinator 交互获得 Transaction ID 对应的任务状态。TransactionCoordinator 还负责将事务所有写入 Kafka 的一个内部 Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。

Consumer 事务

  上述事务机制主要是从 Producer 方面考虑,对于 Consumer 而言,事务的保证就会相对较弱,尤其时无法保证 Commit 的信息被精确消费。这是由于 Consumer 可以通过 offset 访问任意信息,而且不同的 Segment File 生命周期不同,同一事务的消息可能会出现重启后被删除的情况。

Producer API

Kafka消息发送流程

  Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程,以及一个线程共享变量—RecordAccumulator。main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka broker。
在这里插入图片描述
  batch.size: 只有数据积累到 batch.size 之后,sender 才会发送数据。
  linger.ms: 如果数据迟迟未达到 batch.size,sender 等待 linger.time 之后就会发送数据。

异步发送 API

1)导入依赖

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>

2)编写代码
需要用到的类:
KafkaProducer: 需要创建一个生产者对象,用来发送数据
ProducerConfig: 获取所需的一系列配置参数
ProducerRecord: 每条数据都要封装成一个 ProducerRecord 对象
1.不带回调函数的 API

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class CustomProducer {
	public static void main(String[] args) throws ExecutionException, 
	InterruptedException {
	Properties props = new Properties();
	//kafka 集群,broker-list
	props.put("bootstrap.servers", "hadoop102:9092");
	props.put("acks", "all");
	//重试次数
	props.put("retries", 1);
	//批次大小
	props.put("batch.size", 16384);
	//等待时间
	props.put("linger.ms", 1);
	//RecordAccumulator 缓冲区大小
	props.put("buffer.memory", 33554432);
	props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
	props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
	Producer<String, String> producer = new KafkaProducer<>(props);
	for (int i = 0; i < 100; i++) {
		producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i)));
	}
	producer.close();
 }
}

2.带回调函数的 API
  回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是RecordMetadata 和 Exception,如果 Exception 为 null,说明消息发送成功,如果Exception 不为 null,说明消息发送失败。
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class CustomProducer {
	public static void main(String[] args) throws ExecutionException,InterruptedException {
		Properties props = new Properties();
 		props.put("bootstrap.servers", "hadoop102:9092");//kafka 集群,broker-list
 		props.put("acks", "all");
 		props.put("retries", 1);//重试次数
 		props.put("batch.size", 16384);//批次大小
 		props.put("linger.ms", 1);//等待时间
 		props.put("buffer.memory", 33554432);//RecordAccumulator 缓冲区大小
		props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
		props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
		Producer<String, String> producer = new KafkaProducer<>(props);
		for (int i = 0; i < 100; i++) {
 			producer.send(new ProducerRecord<String, String>("first",
			Integer.toString(i), Integer.toString(i)), new Callback() {
 				//回调函数,该方法会在 Producer 收到 ack 时调用,为异步调用
				@Override
 				public void onCompletion(RecordMetadata metadata,
				Exception exception) {
 					if (exception == null) {
						System.out.println("success->" + metadata.offset());
 				} else {
 						exception.printStackTrace();
 				}
 			}
 		});
 	}
	producer.close();
 }
}
同步发送 API

  同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回 ack。
  由于 send 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,我们也可以实现同步发送的效果,只需在调用 Future 对象的 get 方发即可。

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class CustomProducer {
	public static void main(String[] args) throws ExecutionException, InterruptedException {
		Properties props = new Properties();
		props.put("bootstrap.servers", "hadoop102:9092");//kafka 集群,broker-list
		props.put("acks", "all");
		props.put("retries", 1);//重试次数
 		props.put("batch.size", 16384);//批次大小
 		props.put("linger.ms", 1);//等待时间
		props.put("buffer.memory", 33554432);//RecordAccumulator 缓冲区大小
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		Producer<String, String> producer = new KafkaProducer<>(props);
		for (int i = 0; i < 100; i++) {
		producer.send(new ProducerRecord<String, String>("first", Integer.toString(i),Integer.toString(i))).get();
		}
		producer.close();
    }
}

Consumer API

  Consumer 消费数据时的可靠性是很容易保证的,因为数据在 Kafka 中是持久化的,故不用担心数据丢失问题。
  由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故障前的位置的继续消费,所以 consumer需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。
  所以 offset 的维护是 Consumer 消费数据是必须考虑的问题。

自动提交 offset

1)导入依赖

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>

2)编写代码
需要用到的类:
KafkaConsumer: 需要创建一个消费者对象,用来消费数据
ConsumerConfig: 获取所需的一系列配置参数
ConsuemrRecord: 每条数据都要封装成一个 ConsumerRecord 对象
为了使我们能够专注于自己的业务逻辑,Kafka 提供了自动提交 offset 的功能。
自动提交 offset 的相关参数:
enable.auto.commit: 是否开启自动提交 offset 功能
auto.commit.interval.ms: 自动提交 offset 的时间间隔
以下为自动提交 offset 的代码:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class CustomConsumer {
	public static void main(String[] args) {
		Properties props = new Properties();
		props.put("bootstrap.servers", "hadoop102:9092");
		props.put("group.id", "test");
 		props.put("enable.auto.commit", "true");
 		props.put("auto.commit.interval.ms", "1000");
		props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer",	"org.apache.kafka.common.serialization.StringDeserializer");
		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);		
		consumer.subscribe(Arrays.asList("first"));
		while (true) {
			ConsumerRecords<String, String> records = consumer.poll(100);
			for (ConsumerRecord<String, String> record : records)
			System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
			}
	}
}
手动提交 offset

  虽然自动提交 offset 十分简介便利,但由于其是基于时间提交的,开发人员难以把握offset 提交的时机。因此 Kafka 还提供了手动提交 offset 的 API。
  手动提交 offset 的方法有两种:分别是 commitSync(同步提交)commitAsync(异步提交) 。两者的相同点是,都会将本次 poll 的一批数据最高的偏移量提交;不同点是,commitSync 阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而 commitAsync 则没有失败重试机制,故有可能提交失败。
1)同步提交 offset
  由于同步提交 offset 有失败重试机制,故更加可靠,以下为同步提交 offset 的示例。

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class CustomComsumer {
	public static void main(String[] args) {
 		Properties props = new Properties();
		//Kafka 集群
 		props.put("bootstrap.servers", "hadoop102:9092");
		//消费者组,只要 group.id 相同,就属于同一个消费者组
 		props.put("group.id", "test");
 		props.put("enable.auto.commit", "false");//关闭自动提交 offset
		props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
		consumer.subscribe(Arrays.asList("first"));//消费者订阅主题
 		while (true) {
			//消费者拉取数据
			ConsumerRecords<String, String> records = consumer.poll(100);
			for (ConsumerRecord<String, String> record : records) {
 				System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(),record.value());
			 }
			//同步提交,当前线程会阻塞直到 offset 提交成功
 			consumer.commitSync();
 		}
 	}
}

2)异步提交 offset
  虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。
  以下为异步提交 offset 的示例:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
public class CustomConsumer {
	public static void main(String[] args) {
		Properties props = new Properties();
		//Kafka 集群
 		props.put("bootstrap.servers", "hadoop102:9092");
 		//消费者组,只要 group.id 相同,就属于同一个消费者组
 		props.put("group.id", "test");
		//关闭自动提交 offset
 		props.put("enable.auto.commit", "false");
		props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
		consumer.subscribe(Arrays.asList("first"));//消费者订阅主题
 		while (true) {
 		ConsumerRecords<String, String> records = consumer.poll(100);//消费者拉取数据
		for (ConsumerRecord<String, String> record : records) {
 			System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(),record.value());
 		}

		//异步提交
		consumer.commitAsync(new OffsetCommitCallback() {
 		@Override
 		public void onComplete(Map<TopicPartition,OffsetAndMetadata> offsets, Exception exception){
			if (exception != null) {
 				System.err.println("Commit failed for" + offsets);
 				}
 			}
 		});
 	}
	}
}

3) 数据漏消费和重复消费分析
  无论是同步提交还是异步提交 offset,都有可能会造成数据的漏消费或者重复消费。先提交 offset 后消费,有可能造成数据的漏消费;而先消费后提交 offset,有可能会造成数据的重复消费。

自定义存储 offset

  Kafka 0.9 版本之前,offset 存储在 zookeeper,0.9 版本及之后,默认将 offset 存储在Kafka的一个内置的 topic 中。除此之外,Kafka 还可以选择自定义存储 offset。
  offset 的维护是相当繁琐的,因为需要考虑到消费者的 Rebalace。
  当有新的消费者加入消费者组、已有的消费者推出消费者组或者所订阅的主题的分区发生变化,就会触发到分区的重新分配,重新分配的过程叫做 Rebalance。
  消费者发生 Rebalance 之后,每个消费者消费的分区就会发生变化。因此消费者要首先获取到自己被重新分配到的分区,并且定位到每个分区最近提交的 offset 位置继续消费。
  要实现自定义存储 offset,需要借助 ConsumerRebalanceListener,以下为示例代码,其中提交和获取 offset 的方法,需要根据所选的 offset 存储系统自行实现。

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
public class CustomConsumer {
 private static Map<TopicPartition, Long> currentOffset = new
HashMap<>();
public static void main(String[] args) {
//创建配置信息
 Properties props = new Properties();
//Kafka 集群
 props.put("bootstrap.servers", "hadoop102:9092");
//消费者组,只要 group.id 相同,就属于同一个消费者组
 props.put("group.id", "test");
//关闭自动提交 offset
 props.put("enable.auto.commit", "false");
 //Key 和 Value 的反序列化类
 props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
 props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
 //创建一个消费者
 KafkaConsumer<String, String> consumer = new
KafkaConsumer<>(props);
 //消费者订阅主题
 consumer.subscribe(Arrays.asList("first"), new
ConsumerRebalanceListener() {

 //该方法会在 Rebalance 之前调用
 @Override
 public void
onPartitionsRevoked(Collection<TopicPartition> partitions) {
 commitOffset(currentOffset);
 }
 //该方法会在 Rebalance 之后调用
 @Override
 public void
onPartitionsAssigned(Collection<TopicPartition> partitions) {
 currentOffset.clear();
 for (TopicPartition partition : partitions) {
 consumer.seek(partition, getOffset(partition));//
定位到最近提交的 offset 位置继续消费
 }
 }
 });
 while (true) {
 ConsumerRecords<String, String> records =
consumer.poll(100);//消费者拉取数据
 for (ConsumerRecord<String, String> record : records) {
 System.out.printf("offset = %d, key = %s, value
= %s%n", record.offset(), record.key(), record.value());
 currentOffset.put(new TopicPartition(record.topic(),
record.partition()), record.offset());
 }
 commitOffset(currentOffset);//异步提交
 }
 }
 //获取某分区的最新 offset
 private static long getOffset(TopicPartition partition) {
 return 0;
 }
 //提交该消费者所有分区的 offset
 private static void commitOffset(Map<TopicPartition, Long>
currentOffset) {
 }
}

拦截器原理

  Producer 拦截器(interceptor)是在 Kafka 0.10 版本被引入的,主要用于实现 clients 端的定制化控制逻辑。
  对于 producer 而言,interceptor 使得用户在消息发送前以及 producer 回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer 允许用户指定多个 interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor 的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:
(1)configure(configs)
  获取配置信息和初始化数据时调用。
(2)onSend(ProducerRecord):
  该方法封装进 KafkaProducer.send 方法中,即它运行在用户主线程中。Producer 确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的 topic 和分区,否则会影响目标分区的计算。
(3)onAcknowledgement(RecordMetadata, Exception):
  该方法会在消息从 RecordAccumulator 成功发送到 Kafka Broker 之后,或者在发送过程中失败时调用。 并且通常都是在 producer 回调逻辑触发之前。onAcknowledgement 运行在producer 的 IO 线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢 producer的消息发送效率。
(4)close:
  关闭 interceptor,主要用于执行一些资源清理工作
  如前所述,interceptor 可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个 interceptor,则 producer 将按照指定顺序调用它们,并仅仅是捕获每个 interceptor 可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。

面试问题

1. 什么是 Apache Kafka?
答:Kafka 是一个分布式的流处理平台,支持高吞吐量、低延迟的消息发布和订阅系统。它可以持久化数据,并且具有高可用性和容错性,适用于实时数据流处理、日志聚合和事件驱动架构等场景。

2. Kafka 的主要组件有哪些?
答:Kafka 的主要组件包括:
• 生产者(Producer):发布消息到 Kafka 主题的客户端。
• 消费者(Consumer):订阅 Kafka 主题并处理消息的客户端。
• 主题(Topic):消息的分类和分发机制,主题中包含多个分区。
• 分区(Partition):主题被分为多个分区,分区是 Kafka 水平扩展和并行处理的基础。
• 副本(Replica):为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 kafka 仍然能够继续工作,kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower。
• Leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 leader。
• Follower:每个分区多个副本中的“从”,实时从 leader 中同步数据,保持和 leader 数据的同步。leader 发生故障时,某个 follower 会成为新的 follower。
• Broker:Kafka 集群中的每个节点,负责处理分区的数据存储和传输。
• Zookeeper:管理 Kafka 集群元数据,协调 broker 和其他服务(从 Kafka 2.8 开始逐步移除)。

3. Kafka 是如何保证消息的顺序性的?
答:Kafka 只在分区内保证消息的顺序性,生产者发送到同一分区的消息会按顺序处理。多个分区之间的消息顺序无法保证,因此如果消息的顺序对业务至关重要,需要确保相关的消息发送到同一个分区。

4. Kafka 是如何保证高可用的?
答:Kafka 通过分区副本机制来保证高可用性。每个分区有多个副本,一个为主副本(Leader),其余为从副本(Follower)。主副本负责读写操作,从副本同步主副本的数据。若主副本故障,从副本会自动提升为主副本,继续服务。

5. 生产者如何选择分区?
答:生产者在发送消息时可以指定分区,也可以通过分区策略来选择分区。常见策略包括:
• 轮询(Round-Robin):消息依次发送到不同的分区。
• 基于键的分区策略:通过消息的键计算哈希值,将消息发送到对应的分区。
• 自定义分区器:开发者可以实现自己的分区逻辑。

6. Kafka 中的消费者组是什么?
答:消费者组(Consumer Group)是一组消费者共同订阅一个或多个主题,并且每个消费者实例仅处理一部分分区的数据。Kafka 通过消费者组实现负载均衡,确保每个分区的数据只被一个消费者实例处理。

7. Kafka 中的消息是如何持久化的?
答:Kafka 消息被写入磁盘并以日志文件的形式存储,每个分区对应一个日志文件。Kafka 使用顺序 I/O 来写入和读取日志,极大提高了吞吐量。消息可以设置保存时间和保存大小,超过限制后,Kafka 会自动删除旧消息。

8. 什么是 Kafka 的偏移量(Offset)?
答:偏移量是 Kafka 中用来标识消息在分区中的位置的标志。每个消息都有唯一的偏移量,消费者通过偏移量来跟踪已消费的消息。消费者组会自动管理偏移量,也可以手动提交偏移量。

9. Kafka 的副本机制是如何工作的?
答:Kafka 每个分区有一个主副本(Leader)和多个从副本(Follower)。Leader 负责处理所有的读写请求,而 Follower 通过拉取 Leader 的数据保持同步。如果 Leader 故障,Kafka 会自动选举一个 Follower 作为新的 Leader。

10. 如何处理 Kafka 消费者的再平衡(Rebalance)?
答:当消费者组内的消费者实例增加或减少时,Kafka 会触发再平衡,重新分配分区给消费者实例。这一过程会导致短暂的停顿,因为消费者需要重新分配和获取分区的数据。为了减少再平衡的影响,可以优化消费者的处理逻辑,缩短消费的处理时间。

11. Kafka 是如何保证消息不丢失的?
答:Kafka 通过以下机制保证消息不丢失:
• ACK 机制:生产者可以配置不同的 ACK 级别来确保消息成功写入 Leader 或所有副本。
• 副本机制:每个分区的多个副本确保即使某个 broker 故障,数据依然可用。
• 持久化存储:消息写入 Kafka 后会立即存储到磁盘,防止因内存丢失消息。

12. Kafka 的 ISR(In-Sync Replica)是什么?
答:ISR(同步副本集)是保持与 Leader 同步的副本集合。只有 ISR 中的副本才可以被提升为新的 Leader。当 Follower 落后过多或失去连接时,会被移出 ISR 集合,Kafka 会优先保证消息写入 ISR 副本。

13. Kafka 如何处理消息的重复消费?
答:Kafka 本质上是“至少一次”传递机制,消费者可能会收到重复消息。因此,消费者需要实现幂等处理,确保重复消息不会导致业务错误。对于一些场景,可以通过事务机制确保消息的精确传递(Exactly Once)。

14. Kafka 中的事务是什么?
答:Kafka 2.0 引入了事务机制,允许生产者和消费者在多主题、多个分区中以原子方式提交或回滚消息。生产者可以通过事务确保“精确一次”语义,防止消息重复或丢失。

15. Kafka 中的 ZooKeeper 是做什么的?
答:Kafka 使用 ZooKeeper 来管理集群元数据,包括 broker 的注册、Leader 选举、主题分区元数据等。ZooKeeper 确保 Kafka 集群的高可用性和一致性。从 Kafka 2.8 开始,Kafka 支持移除对 ZooKeeper 的依赖,改用内部的元数据管理机制。

16. 如何监控 Kafka 集群的运行状况?
答:Kafka 提供了多种监控指标,如:
• Broker 相关指标:内存、CPU、磁盘使用率、请求处理时间等。
• Producer 相关指标:消息发送成功率、重试次数、发送延迟等。
• Consumer 相关指标:消息消费延迟、提交的偏移量等。

可以使用 JMX(Java Management Extensions) 导出这些指标,并通过工具如 Prometheus、Grafana、Kafka Manager 等进行可视化和报警。

17. Kafka 支持哪些序列化方式?
答:Kafka 支持多种消息的序列化方式,常用的包括:
• StringSerializer / StringDeserializer:用于处理字符串消息。
• ByteArraySerializer / ByteArrayDeserializer:处理字节数组。
• Avro、Protobuf:适合处理结构化数据,配合 Schema Registry 可以实现强类型的数据管理。

18. 什么是 Kafka Streams?
答:Kafka Streams 是 Kafka 的一个轻量级流处理库,用于构建实时流处理应用。它提供丰富的 DSL(领域特定语言)支持,允许开发者处理、转换、过滤、聚合来自 Kafka 的数据流。

19. Kafka 和传统的消息队列(如 RabbitMQ、ActiveMQ)有什么区别?
答:
• 设计目标:Kafka 的设计目标是处理高吞吐量、低延迟的大规模数据流,而 RabbitMQ 和 ActiveMQ 更适合处理低延迟、事务性要求高的小规模消息传递。
• 存储方式:Kafka 通过日志文件持久化消息,而 RabbitMQ 依赖内存和磁盘,消息会在消费后删除。
• 扩展性:Kafka 通过分区和副本轻松扩展,而传统消息队列在大规模场景下可能需要复杂的集群管理。

20. 如何在 Kafka 中处理“背压”(Backpressure)问题?
答:背压指的是生产者发送消息过快,消费者来不及处理的现象。处理背压的常见策略包括:

• 限流:生产者限制发送速率,确保消费者有足够时间处理。
• 批量处理:消费者可以批量消费和处理消息,减少每条消息的处理开销。
• 扩展消费者:通过增加更多消费者实例(或分区),提升并行处理能力。

21. Kafka 中的 ISR(InSync Replica)、OSR(OutSync Replica)、AR(All Replica) 代表什么?
答:
• ISR (In-Sync Replica):同步副本集合,即与 Leader 副本保持同步的副本集。只有 ISR 中的副本才能被选为新的 Leader。
• OSR (Out-Sync Replica):未能跟上 Leader 的副本集合。它们由于滞后过多或其他原因没有同步到最新状态。
• AR (All Replica):所有副本集合,包含 Leader、副本以及 ISR 和 OSR 中的副本。

22. Kafka 中的 HW、LEO 等分别代表什么?
答:
• HW (High Watermark):高水位标记,表示所有 ISR 副本都已经成功复制的最后一条消息的位置。消费者只能读取到 HW 之前的消息,以保证读取的数据不会丢失。
• LEO (Log End Offset):日志结束偏移量,表示当前分区日志中下一条将要写入消息的偏移量。每个副本都会维护自己的 LEO。

23. Kafka 中的分区器、序列化器、拦截器是否了解?它们之间的处理顺序是什么?
答:
• 分区器(Partitioner):用于决定消息应该发送到哪个分区。Kafka 提供了默认的分区器(如基于键的哈希分区策略),也可以实现自定义分区器。
• 序列化器(Serializer):将 Java 对象转换为字节数组,Kafka 使用字节数组来传输消息。生产者使用序列化器将消息内容转换为字节数据。
• 拦截器(Interceptor):在消息被发送到 Kafka 之前或之后,对消息进行预处理或监控的组件。

处理顺序为:拦截器 → 序列化器 → 分区器。

• 拦截器在消息发送前后可以对消息进行操作;
• 序列化器将对象转换为字节数组;
• 分区器决定消息发送到哪个分区。

24. Kafka 生产者客户端的整体结构是什么样子的?使用了几个线程来处理?分别是什么?
答:
Kafka 生产者客户端的核心结构包括:

• Producer API:应用程序接口,用于向 Kafka 发送消息。
• Record Accumulator:消息缓冲区,负责临时存储待发送的消息,按分区划分。
• Sender:发送线程,负责将消息批量发送到 Kafka broker。
• Producer I/O Thread:用于处理 I/O 任务,如消息的序列化、发送请求等。

主要的线程有:

  1. 主线程:调用 send() 方法,将消息发送到 Record Accumulator。
  2. I/O 线程(Sender 线程):负责从 Record Accumulator 中拉取消息并发送到 broker。

25. “消费组中的消费者个数如果超过 topic 的分区,那么就会有消费者消费不到数据”这句话是否正确?
答:正确。在 Kafka 中,消费者组内的每个消费者实例负责消费特定分区的数据,每个分区只能被一个消费者消费。如果消费者数量超过了分区数量,那么多余的消费者将无法分配到分区,因此无法消费数据。例如,如果有 3 个分区而有 5 个消费者,那么 2 个消费者不会被分配到任何分区,自然就消费不到数据。

26. 消费者提交消费位移时提交的是当前消费到的最新消息的 offset 还是 offset+1?
答:提交的是 offset+1。Kafka 中消费者提交的位移是下一个要消费的消息的 offset,也就是当前消息消费完成后,提交的 offset 是下一条消息的 offset。例如,当前消费的是 offset 为 5 的消息,那么提交的 offset 会是 6。这样做的目的是防止重复消费同一条消息。

27. 有哪些情形会造成重复消费?
答:重复消费可能在以下几种情况下发生:
1. 消费者提交 offset 失败:消费者处理完消息后,offset 没有正确提交,导致消费者重启或再平衡时,重复消费已经处理过的消息。
2. 消费者处理完成但崩溃:在处理完消息但尚未提交 offset 的情况下,如果消费者崩溃或失去连接,Kafka 会在重启后再次消费同一条消息。
3. 启用自动提交 offset:如果 Kafka 消费者使用自动提交,但提交间隔过短,可能会在消息处理完之前就提交 offset,导致在重启时再次消费同一条消息。
4. 消息回滚:在使用事务或启用“至少一次”消费语义时,如果消息确认失败,可能会重新发送相同的消息,从而导致重复消费。

28. 那些情景会造成消息漏消费?
答:消息漏消费可能出现在以下几种情况:
1. 消费者手动提交 offset 提交错误:如果消费者手动提交的 offset 不正确,跳过了某些消息,则会导致漏消费。
2. 消费者在处理消息后崩溃并重启:如果 offset 提交成功但消息处理不完整,消费者在重启时可能认为已经消费了这些消息,而实际上并没有处理,导致漏消费。
3. 自动提交 offset 时间间隔过长:如果使用自动提交且提交间隔过长,消费者处理的部分消息的 offset 没有及时提交,在消费者崩溃或再平衡时可能会导致这些消息没有记录,被认为未消费。
4. 分区再平衡期间丢失消息:在分区再平衡的过程中,如果消费者在再平衡完成前崩溃,可能会丢失某些消息。

29. 当你使用 kafka-topics.sh 创建(删除)了一个 topic 之后,Kafka 背后会执行什么逻辑?
答:Kafka 的 topic 创建或删除操作会触发以下逻辑:
一、Zookeeper 节点更新:
• Kafka 使用 Zookeeper 存储集群的元数据。当创建(删除)一个 topic 时,Kafka 会在 Zookeeper 中的 /brokers/topics/ 节点下创建(删除)对应的 topic 元数据节点。例如:

/brokers/topics/first

二、触发 Controller 的监听程序:
• Kafka 集群中有一个 Controller 负责监听 Zookeeper 上的变化。当有新的 topic 创建或删除时,Controller 会被触发,并负责协调集群中的其他 broker。

三、Kafka Controller 负责 topic 创建:
• Kafka 的 Controller 负责 topic 创建的具体执行工作。它会通知各个 broker 进行分区副本的分配,并更新元数据缓存(metadata cache)。Controller 还会确保 topic 在集群中的分区和副本被正确分配,以便开始生产和消费数据。

30. Topic 的分区数可不可以增加?如果可以怎么增加?如果不可以,那又是为什么?

答:可以增加。Kafka 支持动态增加分区数,因为增加分区不会影响已存在的数据。
增加分区的方法:
1. 使用 kafka-topics.sh 命令增加分区:
kafka-topics.sh --alter --topic <topic-name> --partitions <new-partition-count>
3. 使用 Kafka 的 Admin API 编程方式增加分区。
增加分区会导致数据在新的分区中均匀分布,但增加后可能破坏消息的顺序性,尤其是基于 key 的消息分区策略。

31. Topic 的分区数可不可以减少?如果可以怎么减少?如果不可以,那又是为什么?
答:不可以减少。Kafka 不支持减少分区的操作,原因如下:
1. 数据不兼容:每个分区都有独立的 offset,减少分区意味着需要将多个分区合并,这可能会造成数据丢失或混乱。
2. 有序性问题:Kafka 的消息分布是基于分区的,减少分区后消息的有序性可能会受到影响。
3. 设计限制:Kafka 在设计时不支持减少分区,这也是为了保持其简单性和高效性。

32. Kafka 有内部的 topic 吗?如果有是什么?有什么作用?
答:有。Kafka 有一些内部的 topic 用来处理集群的元数据和状态信息。
• __consumer_offsets:存储消费者组的 offset,用于跟踪每个消费者的消费进度。
• __transaction_state:存储 Kafka 事务的状态,保证事务消息的原子性和一致性。
• __cluster_metadata:保存集群的元数据,包括分区分配和 broker 信息。

这些内部 topic 用于维持 Kafka 集群的状态和操作。

33. Kafka 分区分配的概念?
答:Kafka 分区分配是指将 topic 的各个分区分配给消费者组中的消费者。分区分配策略决定了哪些消费者负责消费哪些分区。

常见的分区分配策略:
1. Range 分配策略:按顺序分配分区给消费者。如果分区数不能整除消费者数,则部分消费者会获得多个分区。
2. RoundRobin 分配策略:按循环方式将分区依次分配给消费者,尽量保证分配均匀。

Kafka 还支持自定义分区分配策略,可以通过实现 PartitionAssignor 接口定制化分配方式。

34. 简述 Kafka 的日志目录结构?
答:Kafka 的日志目录结构组织如下:
1. 每个 Kafka broker 节点在磁盘上为每个 topic 创建一个目录。
2. 每个 topic 的分区也有独立的目录,该目录下保存该分区的所有日志文件。
3. 分区日志按段(segment)存储,每个段包含 .log 文件和 .index 文件。日志文件名是该段起始偏移量,例如:00000000000000000000.log 表示从偏移量 0 开始的日志段。

35. 如果我指定了一个 offset,Kafka Controller 怎么查找到对应的消息?
答:Kafka 的 Controller 并不直接处理消息的查找。查找消息的过程由客户端和 broker 共同完成:
1. 客户端通过元数据查询获取分区的 leader broker。
2. 客户端向 leader broker 发送拉取请求,包含目标 offset。
3. leader broker 根据分区日志文件中的索引文件(.index 和 .timeindex)快速定位到对应的偏移量,并读取对应的日志段返回消息。

36. 聊一聊 Kafka Controller 的作用?
答:Kafka Controller 是 Kafka 集群中的关键角色,主要负责集群的管理和协调工作。其职责包括:
1. 管理分区的 leader 选举:当分区的 leader broker 失效时,Controller 负责在 ISR(同步副本)中选出新的 leader。
2. Topic 创建和删除:Controller 负责协调 broker 节点,在集群中创建或删除 topic。
3. 分区副本的管理:Controller 监控和管理各个分区的副本状态,确保数据的可靠性和可用性。
4. 集群元数据的管理:Controller 负责更新并向 broker 广播集群元数据,如分区的 leader 信息。

37. Kafka 中有哪些地方需要选举?这些地方的选举策略又有哪些?
答:Kafka 中主要有以下几种选举场景:
1. 分区 leader 选举:当分区的 leader broker 失效时,Kafka 会从 ISR 中选举新的 leader,优先选择同步副本中的第一个副本。
2. Controller 选举:Kafka 集群中只有一个 Controller,当现有 Controller 失效时,Kafka 通过 Zookeeper 选举新的 Controller。

选举策略:
• 优先副本选举:Kafka 优先从 ISR 中选择副本作为 leader,保证数据的完整性。
• Zookeeper 选举:Kafka 通过 Zookeeper 选举集群的 Controller 和维护元数据的一致性。

38. 失效副本是指什么?有哪些应对措施?
答:失效副本指的是 Kafka 中由于某些原因(如网络问题或 broker 故障)未能及时同步 leader 分区数据的副本。失效副本不在 ISR(In-Sync Replicas)中,无法参与 leader 选举。

应对措施:
1. 定期检查和修复:Kafka 通过监控副本同步的状态,自动剔除和恢复失效副本。
2. 重新分配副本:管理员可以手动触发分区副本的重新分配,恢复数据的冗余性。
3. 增加副本数量:增加分区的副本数量以提高容错能力,避免单个副本失效带来的数据丢失风险。

39. Kafka 的哪些设计让它有如此高的性能?
答:Kafka 的高性能得益于以下设计特点:
1. 顺序写磁盘:Kafka 将数据顺序写入磁盘日志,而不是随机写入,极大提高了磁盘的 I/O 效率。
2. 零拷贝:Kafka 使用 Linux 的 sendfile 系统调用,避免了内核与用户空间之间的数据拷贝,减少了 CPU 负担。
3. 批量处理:Kafka 将多个消息打包在一起进行处理,从而提高传输效率并减少网络开销。
4. 分区并行处理:Kafka 的分区模型允许消息的并行处理,多个消费者可以同时消费不同分区的数据,从而提高吞吐量。
5. 高效的压缩机制:Kafka 支持消息压缩(如 Gzip、LZ4),进一步减少数据的传输和存储成本。
6. 消费者拉模式:消费者主动拉取消息,避免了推送消息的负载不均问题。

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐