Kafka介绍

Kafka 是由 LinkedIn 公司基于 Scala 语言开发的一个分布式消息系统,现已开源。Kafka 的标语已经从“一个高吞吐量,分布式的消息系统”改为“一个分布式流平台”。与传统的消息系统相比,Kafka 的特点包括:

  • 分布式:Kafka 是一个分布式系统,易于向外拓展。
  • 高吞吐,低延迟:能同时为生产者和消费者提供高吞吐。
  • 可扩展:Kafka 集群支持热扩展。
  • 持久性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失。
  • 容错性:允许集群中节点失败(若副本数量为 n,则允许失败的节点个数为 n-1)。

Kafka 之所以受到越来越多的青睐,与它所“扮演”的三大角色密不可分:

  • 消息系统:Kafka 和传统的消息系统(也称作消息中间件)具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等功能。同时,Kafka 还提供了大多数消息系统难以实现的消息顺序性保障及回溯消费的功能。
  • 存储系统:Kafka 将消息持久化到磁盘,相比于其他基于内存存储的系统,有效地降低了数据丢失的风险。得益于 Kafka 的消息持久化功能和多副本机制,可以将 Kafka 作为长期的数据存储系统,只需将数据保留策略设置为“永久”或启用主题的日志压缩功能即可。
  • 流式处理平台:Kafka 不仅为每个流行的流式处理框架提供了可靠的数据来源,还提供了一个完整的流式处理类库,如窗口、连接、变换和聚合等各类操作。

Kafka关键概念介绍

Topic-Partition模型

一个 topic 可以视为一类消息,每个 topic 被分成多个 partition,每个 partition 在存储层面是一个追加日志文件。任何发布到此 partition 的消息都会被追加到日志文件的尾部,每条消息在文件中的位置称为 offset(偏移量),offset 是一个 long 型的数字,唯一标记一条消息。每条消息被 append 到 partition 中,是顺序写磁盘,因此效率非常高。

Partition消息存储

每条消息被发送到 broker 中,会根据 partition 规则选择被存储到哪一个 partition。如果 partition 规则设置合理,所有消息可以均匀分布到不同的 partition 里,实现水平扩展。在创建 topic 时,可以在 KAFKA_HOME/config/server.properties 中指定 partition 的数量,当然也可以在 topic 创建之后修改 partition 的数量,但 partition 的数量只能增加不能减少。

Producer

Producer

客户端调用 producer.send() 进行消息发送,有如下几个步骤:

  1. 和 Kafka 交互获取 broker 的元数据信息(往哪个 broker 发),获取不到则抛出异常。
  2. 判断消息包大小是否符合,默认为 1MB,如果超过该大小则抛异常,修改配置需要和 broker 配合修改。
  3. 将消息放入客户端内部的双端队列发送,队列默认大小为 32M,当大小不够时会阻塞,阻塞时间可配,当阻塞时间过后会抛出异常给客户端。
  4. 同时有一个 Sender 线程会实时批量拉取双端队列内消息往 Kafka 集群发送。如果发送成功,则进入回调函数,返回 RecordMetadata 对象。如果发送失败,则进行重试,重试次数可配,当重试次数用完,则进入客户端定义的回调函数由客户端处理(失败情况下,是否需要退出进程?)。

重要配置

  • ACKS_CONFIG:指定必须有多少个分区副本收到消息,生产者才会收到一个来自服务器的成功响应。值为 -1 表示所有副本都需收到消息,确保数据的最大安全性。
  • RETRIES_CONFIG:失败重试的次数,设置为 Integer.MAX_VALUE 表示无限重试。可以调整以应对临时性错误。
  • MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION:限制客户端在单个连接上能够发送的未响应请求的个数。设置为 1 可以避免消息乱序。
  • MAX_BLOCK_MS_CONFIG:缓冲区已满或元数据不可用时的阻塞时间,超出时间则 send 调用会抛出异常。

Consumer

消费者,即接收消息的一方。消费者连接到 Kafka 上并接收消息。

消息订阅

Kafka 的订阅机制,最小粒度只到 partition。不支持 broker 端按照消息进行过滤。现有实现方案是针对不同的应用拉取所有消息,再进行过滤。消费者的订阅方式有两种:

  • subscribe:订阅指定的 topic 列表。
  • assign:指定 consumer 实例消费 topic 下的具体分区。assign 的 consumer 不会拥有 Kafka 的 group management 机制,也就是当 group 内消费者数量变化时不会有 rebalance 行为发生。assignsubscribe 方法不能同时使用。
// consumer 订阅的 topic 及 partition
topicPartition = new TopicPartition(this.topic, this.partitionId);
this.partitions = Arrays.asList(topicPartition);
consumer.assign(partitions);

消息拉取

Kafka 对外暴露了一个非常简洁的 poll 方法,其内部实现了协作、分区重平衡、心跳、数据拉取等功能。poll() 方法返回记录的列表,每条记录包含 key/value 以及主题、分区、位移信息。

消费者对象不是线程安全的,一个线程一个消费者。如果需要多个消费者,使用多线程进行一一对应。

位移提交

消费者自身来管理消费的位移,并向消费者提供更新位移的接口。可以通过 consumer 的配置设置自动提交,也可以手动提交。当前采用的是手动提交的方式,包括同步和异步两种,当前采用的是同步提交的方式。

消息批量拉取

Kafka 中消费是基于拉模式的。可通过传入超时时间 timeout 来控制 poll() 方法的阻塞时间。可以通过不同的配置指定消费者每次拉取的最大/最小的数据量(byte)或消息个数。

相关配置

  • session.timeout.ms:默认值 10000,用于决定消费超时失败的时间。如果在 session 超时之前,broker 没有收到心跳请求,broker 会移除该消费者并导致 rebalance。
  • enable.auto.commit:是否允许自动提交 offset,设置为 false,采用手动提交的方式。

Broker

简而言之,一个 Kafka 集群一般包括一个或多个 Kafka 服务实例,通常一个 Kafka 实例部署在一台服务器上。这台服务器被称为 Broker,即一个节点。

Topic

每条发布到 Kafka 集群的消息都有一个类别,这个类别被称为 Topic。Topic 是一个抽象概念(物理上不同 Topic 的消息分开存储,逻辑上一个 Topic 的消息虽然保存于一个或多个 Broker 上,但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处)。

Partition

Partition 是物理上的概念,每个 Topic 包含一个或多个 Partition。为了实现扩展性,一个非常大的 Topic 可以分布到多个 Broker(即服务器)上。Kafka 只保证按一个 Partition 中的顺序将消息发给 Consumer,不保证一个 Topic 的整体(多个 Partition 间)的顺序。Kafka 采用顺序写的方式保证同一 Partition 内部数据有序性,但不能保证 Topic 级别的消息有序。

集群中的架构关系
在这里插入图片描述

Consumer Group

每个 Consumer 属于一个特定的 Consumer Group(可为每个 Consumer 指定 group name,若不指定 group name 则属于默认的 group)。这是 Kafka 用来实现一个 Topic 消息的广播(发给所有的 Consumer)和单播(发给任意一个 Consumer)的手段。一个 Topic 可以有多个 Consumer Group。Topic 的消息会复制(不是真的复制,是概念上的)到所有的 Consumer Group,但每个 Consumer Group 只会把消息发给该 Consumer Group 中的一个 Consumer。如果需要实现广播,只要每个 Consumer 有一个独立的 Consumer Group 就可以了。要实现单播,只要所有的 Consumer 在同一个 Consumer Group 即可。使用 Consumer Group 还可以将 Consumer 进行自由的分组而不需要多次发送消息到不同的 Topic。

高可靠的原理和配置

  • 副本机制

Kafka 的高可靠性保障来源于其健壮的副本(replication)机制。从 Kafka 0.8.x 版本开始,为分区引入了多副本机制,通过增加副本数量来提升数据容灾能力。同时,Kafka 通过多副本机制实现故障自动转移,在 Kafka 集群中某个 Broker 节点失效的情况下仍然保证服务可用。在分析副本机制之前,我们需要先了解几个基本概念。

  • AR, ISR, & OSR

分区中的所有副本统称为 AR(Assigned Replicas),所有与 Leader 副本保持一定程度同步的副本(包括 Leader 副本在内)组成 ISR(In-Sync Replicas),ISR 集合是 AR 集合中的一个子集。消息会先发送到 Leader 副本,然后 Follower 副本再从 Leader 副本中拉取消息进行同步,同步期间内 Follower 副本相对于 Leader 副本会有一定程度的滞后(“一定程度”指的是可忍受的滞后范围,这个范围是可配置的,配置包括延迟时间 replica.lag.time.max.ms 和延迟条数 replica.lag.max.messages,0.10.x 中只支持 replica.lag.time.max.ms 这个配置)。与 Leader 副本同步滞后过多的副本(不包括 Leader 副本)组成 OSR(Out-of-Sync Replicas)。

AR = ISR + OSR

Leader 副本负责维护和跟踪 ISR 集合中所有 Follower 副本的滞后状态。当 Follower 副本落后太多或失效时,Leader 副本会把它从 ISR 集合中剔除。如果 OSR 集合中有 Follower 副本“追上了 Leader 副本”,那么 Leader 副本会把它从 OSR 集合转移至 ISR 集合。默认情况下,当 Leader 副本发生故障时,只有在 ISR 集合中的副本才有资格被选举为新的 Leader,而在 OSR 集合中的副本没有任何机会(不过这个原则也可以通过修改相应的参数配置来改变)。

如果某一个 Partition 的所有 Replica 都挂了,就无法保证数据不丢失了。这种情况下有两种可行的方案:

  1. false:等待 ISR 中任意一个 Replica “活”过来,并且选它作为 Leader
  2. true(默认):选择第一个“活”过来的 Replica(并不一定是在 ISR 中)作为 Leader
    • 配置 unclean.leader.election.enable=false
  • HW & LEO

HW 是 High Watermark 的缩写,俗称高水位。它表示一个特定的偏移量(offset),消费者只能拉取到这个 offset 之前的消息。

在这里插入图片描述

如图所示,表示一个日志文件,这个日志文件中有 9 条消息,第一条消息的 offset(LogStartOffset)为 0,最后一条消息的 offset 为 8。offset 为 9 的消息用虚线框表示,代表下一条待写入的消息。日志文件的 HW 为 6,表示消费者只能拉取到 offset 在 0-5 之间的消息,而 offset 为 6 的消息对消费者而言是不可见的。

LEO 是 Log End Offset 的缩写。它表示当前日志文件中下一条待写入消息的 offset,如上图 offset 为 9 的位置,即为当前日志文件的 LEO。LEO 的大小相当于当前日志文件中最后一条消息的 offset 增加 1。分区 ISR 集合中的每一个副本都会维护自身的 LEO,而 ISR 集合中最小的 LEO 即为分区的 HW,对消费者而言只能消费 HW 之前的消息。

  • 复制原理和同步方式

Kafka 中 Topic 的每个 Partition 有一个预写式的日志文件,每个 Partition 都由一些列有序的、不可变的消息组成,这些消息被连续地追加到 Partition 中。为了提高消息的可靠性,Kafka 每个 Topic 的 Partition 有 N 个副本(replicas),其中 N(大于等于 1)是 Topic 的复制因子(replica factor)的个数。Kafka 通过多副本机制实现故障自动转移,当 Kafka 集群中一个 Broker 失效时,仍然保证服务可用。在 Kafka 中发生复制时,确保 Partition 的日志能有序地写到其他节点上。N 个 Replicas 中,其中一个 Replica 为 Leader,其他都为 Follower。Leader 处理 Partition 的所有读写请求,与此同时,Follower 会被动定期地去复制 Leader 上的数据。

Kafka 提供了数据复制算法保证,如果 Leader 发生故障或挂掉,一个新 Leader 被选举并被接受客户端的消息成功写入。Kafka 确保从同步副本列表中选举一个副本为 Leader,或者说 Follower 追赶 Leader 数据。Leader 负责维护和跟踪 ISR 中所有 Follower 滞后的状态。当 Producer 发送一条消息到 Broker 后,Leader 写入消息并复制到所有 Follower。消息提交之后才被成功复制到所有的同步副本。消息复制延迟受最慢的 Follower 限制,重要的是快速检测慢副本。如果 Follower “落后”太多或者失效,Leader 将会把它从 ISR 中删除。

每个 Replica 都有 HW,Leader 和 Follower 各自负责更新自己的 HW 的状态。对于 Leader 新写入的消息,Consumer 不能立刻消费。Leader 会等待该消息被所有 ISR 中的 Replicas 同步后更新 HW,此时消息才能被 Consumer 消费。这样就保证了如果 Leader 所在的 Broker 失效,该消息仍然可以从新选举的 Leader 中获取。对于来自内部 Broker 的读取请求,没有 HW 的限制。

下图详细说明了当 Producer 生产消息至 Broker 后,ISR 以及 HW 和 LEO 的流转过程:

在这里插入图片描述

由此可见,Kafka 的复制机制既不是完全的同步复制,也不是单纯的异步复制。实际上,同步复制要求所有能工作的 Follower 都复制完,这条消息才会被 commit,这种复制方式极大地影响了吞吐率。而异步复制方式下,Follower 异步地从 Leader 复制数据,数据只要被 Leader 写入 log 就被认为已经 commit,这种情况下如果 Follower 都还没有复制完,落后于 Leader 时,突然 Leader 宕机,则会

丢失数据。而 Kafka 的这种使用 ISR 的方式则很好地均衡了确保数据不丢失以及吞吐率。

数据可靠性和持久化保证

当 Producer 向 Leader 发送数据时,可以通过 request.required.acks 参数来设置数据可靠性的级别:

  • 1(默认):这意味着 Producer 在 ISR 中的 Leader 已成功收到数据并得到确认。如果 Leader 宕机了,则会丢失数据。
  • 0:这意味着 Producer 无需等待来自 Broker 的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性最低。
  • -1/all:Producer 需要等待 ISR 中的所有 Follower 都确认接收到数据后才算一次发送完成,可靠性最高。但是这样也不能保证数据不丢失,比如当 ISR 中只有 Leader 时,这样就变成了 acks=1 的情况。

如果要提高数据的可靠性,在设置 request.required.acks=-1 的同时,也要配合 min.insync.replicas 这个参数,这样才能发挥最大的功效。min.insync.replicas 这个参数设定 ISR 中的最小副本数是多少,默认值为 1,当且仅当 request.required.acks 参数设置为 -1 时,此参数才生效。如果 ISR 中的副本数少于 min.insync.replicas 配置的数量时,客户端会返回异常:org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required

下面分别分析一下 acks=1acks=-1 的两种情况:

  • request.required.acks=1

Producer 发送数据到 Leader,Leader 写本地日志成功,返回客户端成功;此时 ISR 中的副本还没有来得及拉取该消息,Leader 就宕机了,那么此次发送的消息就会丢失。

在这里插入图片描述

  • request.required.acks=-1

同步(Kafka 默认为同步,即 producer.type=sync)的发送模式,replication.factor >= 2min.insync.replicas >= 2 的情况下,不会丢失数据。有两种典型情况。acks=-1 的情况下(如无特殊说明,以下 acks 都表示为参数 request.required.acks):

  1. 数据发送到 Leader,ISR 的 Follower 全部完成数据同步后,Leader 此时挂掉,那么会选举出新的 Leader,数据不会丢失。

在这里插入图片描述

  1. 数据发送到 Leader 后,部分 ISR 的副本同步,Leader 此时挂掉。比如 Follower1 和 Follower2 都有可能变成新的 Leader,Producer 端会得到返回异常,Producer 端会重新发送数据,数据可能会重复。

在这里插入图片描述

配置

接下来需要修改 broker 的配置文件 $KAFKA-HOME/config/server.properties。主要关注以下几个配置参数即可:

Broker 的编号

如果集群中有多个 broker,则每一个 broker 的编号需要设置为不同:

broker.id = 0

Broker 对外提供的服务入口地址

listeners=PLAINTEXT://localhost:9092

存放消息日志文件的地址

log.dirs=../kafka-logs

Kafka 所需的 ZooKeeper 集群地址

zookeeper.connect=127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002

日志清理策略

log.cleanup.policy=compact

日志数据保留时间

log.retention.hours=168

是否允许自动创建 topic

auto.create.topics.enable=false

如果是单机模式,修改完上述配置参数后就可以启动服务。如果是集群模式,只需要对单机模式的配置文件做相应的修改即可:确保集群中每个 broker 的 broker.id 配置参数的值不一样,以及 listeners 配置参数也需要修改为与 broker 对应的 IP 地址或域名,之后就可以各自启动服务。注意,在启动 Kafka 服务之前,同样需要确保 zookeeper.connect 参数所配置的 ZooKeeper 服务已经正确启动。

启动

启动 ZooKeeper

启动 Kafka

$ nohup ./bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &

节点数量

为保证消息可靠性和高可用性,推荐部署 3 个或以上节点。

Kafka 集群运维

创建 Topic

bin/kafka-topics.sh --create --zookeeper ip:2181 --replication-factor 3 --partitions 1 --config unclean.leader.election.enable=false --config min.insync.replicas=2 --topic name

创建生产者

bin/kafka-console-producer.sh --broker-list ip:9092 --topic topic-name

创建消费者

bin/kafka-console-consumer.sh --bootstrap-server ip:9092 --from-beginning --topic topic-name

查看指定 Topic 明细

bin/kafka-topics.sh --describe --zookeeper ip:2181 --topic topic_name

删除 Topic

bin/kafka-topics.sh --delete --zookeeper ip:2181 --topic topic_name

修改 Topic

bin/kafka-topics.sh --alter --zookeeper ip:2181 --topic topic_name --partitions 3

查询单个 Broker 的 Offset

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list ip:9092 --topic command_topic --time -1

参数解析

配置 Topic 副本数量

--replication-factor 3

配置 Topic Partitions 数量

--partitions 1

配置最小副本数

当且仅当 request.required.acks 参数设置为 -1 时,此参数才生效。如果 ISR 中的副本数少于 min.insync.replicas 配置的数量时,客户端会返回异常:

org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required
--config min.insync.replicas=2

副本丢失情况的处理

如果某一个 partition 的所有 replica 都挂了,就无法保证数据不丢失。这种情况下有两种可行的方案:

  1. false:等待 ISR 中任意一个 replica “活”过来,并且选它作为 leader。
  2. true(默认):选择第一个“活”过来的 replica(并不一定是在 ISR 中)作为 leader。
--config unclean.leader.election.enable=false
Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐