Kafka高级特性
可以通过实现 org.apache.kafka.clients.consumer.internals.PartitionAssignor 接口来实现。
目录
注意:本文参考 《浅入浅出》-Kafka
师兄大厂面试遇到面试官的 Kafka 暴击三连问,快面哭了!
消息队列之推还是拉,RocketMQ 和 Kafka是如何做的?
Kafka总体
Kafka如何有序
想要保证消息(数据)是有序的,怎么做?
Kafka会将数据写到partition,单个partition的写入是有顺序的。如果要保证全局有序,那只能写入一个partition中。如果要消费也有序,消费者也只能有一个。
Kafka 中发送 1 条消息的时候,可以指定 topic, partition, key,data(数据) 4 个参数。如果你发送消息的时候指定了 Partition 的话,所有消息都会被发送到指定的 Partition。并且,同一个 key 的消息可以保证只发送到同一个 partition,这个我们可以采用表/对象的 id 来作为 key 。
总结一下,对于如何保证 Kafka 中消息消费的顺序,有了下面两种方法:
1 1 个 Topic 只对应一个 Partition。
2 (推荐)发送消息的时候指定 key/Partition。
当然不仅仅只有上面两种方法,上面两种方法是我觉得比较好理解的,
消息队列模型知道吗?kafka是怎么做到支持这两种模型的?
对于传统的消息队列系统支持两个模型:
1 点对点:也就是消息只能被一个消费者消费,消费完后消息删除
2 发布订阅:相当于广播模式,消息可以被所有消费者消费
上面也说到过,kafka其实就是通过Consumer Group同时支持了这两个模型。
如果说所有消费者都属于一个Group,消息只能被同一个Group内的一个消费者消费,那就是点对点模式。
如果每个消费者都是一个单独的Group,那么就是发布订阅模式。
实际上,Kafka通过消费者分组的方式灵活的支持了这两个模型。
kafka通信过程原理
1 首先kafka broker启动的时候,会去向Zookeeper注册自己的ID(创建临时节点),这个ID可以配置也可以自动生成,同时会去订阅Zookeeper的brokers/ids
路径,当有新的broker加入或者退出时,可以得到当前所有broker信息
2 生产者启动的时候会指定bootstrap.servers
,通过指定的broker地址,Kafka就会和这些broker创建TCP连接(通常我们不用配置所有的broker服务器地址,否则kafka会和配置的所有broker都建立TCP连接)
3 随便连接到任何一台broker之后,然后再发送请求获取元数据信息(包含有哪些主题、主题都有哪些分区、分区有哪些副本,分区的Leader副本等信息)
4 接着就会创建和所有broker的TCP连接
5 之后就是发送消息的过程
6 消费者和生产者一样,也会指定bootstrap.servers
属性,然后选择一台broker创建TCP连接,发送请求找到协调者所在的broker
7 然后再和协调者broker创建TCP连接,获取元数据
8 根据分区Leader节点所在的broker节点,和这些broker分别创建连接
9 最后开始消费消息
为什么需要分区?有什么好处?
这个问题很简单,如果说不分区的话,我们发消息写数据都只能保存到一个节点上,这样的话就算这个服务器节点性能再好最终也支撑不住。
实际上分布式系统都面临这个问题,要么收到消息之后进行数据切分,要么提前切分,kafka正是选择了前者,通过分区可以把数据均匀地分布到不同的节点。
分区带来了负载均衡和横向扩展的能力。
发送消息时可以根据分区的数量落在不同的Kafka服务器节点上,提升了并发写消息的性能,消费消息的时候又和消费者绑定了关系,可以从不同节点的不同分区消费消息,提高了读消息的能力。
另外一个就是分区又引入了副本,冗余的副本保证了Kafka的高可用和高持久性。
Kafka 的负责均衡会有什么问题呢?
kafka的负载均衡在绝对理想的状况下可以实现,但是会有某些情况出现一定程度上的负载不均衡
1.broker 端分配不均:当创建 topic 的时候可能会出现某些 broker 分配到的分区数多,而有些 broker 分配的分区少,这就导致了 leader 多副本不均。
2.生产者写入消息不均:生产者可能只对某些 broker 中的 leader 副本进行大量的写入操作,而对其他的 leader 副本不闻不问。
3.消费者消费不均:消费者可能只对某些 broker 中的 leader 副本进行大量的拉取操作,而对其他的 leader 副本不闻不问。
4.leader 副本切换不均:当主从副本切换或者分区副本进行了重分配后,可能会导致各个 broker 中的 leader 副本分配不均匀。
如何保证消息可靠性?
消息可靠性的保证基本上我们都要从3个方面来阐述(这样才比较全面,无懈可击)
生产者发送消息丢失
kafka支持3种方式发送消息,这也是常规的3种方式,发送后不管结果、同步发送、异步发送,基本上所有的消息队列都是这样玩的。
1 发送并忘记,直接调用发送send方法,不管结果,虽然可以开启自动重试,但是肯定会有消息丢失的可能
2 同步发送,同步发送返回Future对象,我们可以知道发送结果,然后进行处理
3 异步发送,发送消息,同时指定一个回调函数,根据结果进行相应的处理
为了保险起见,一般我们都会使用异步发送带有回调的方式进行发送消息,再设置参数为发送消息失败不停地重试。
acks=all
,这个参数有可以配置0|1|all。
0 表示生产者写入消息不管服务器的响应,可能消息还在网络缓冲区,服务器根本没有收到消息,当然会丢失消息。
1 表示至少有一个副本收到消息才认为成功,一个副本那肯定就是集群的Leader副本了,但是如果刚好Leader副本所在的节点挂了,Follower没有同步这条消息,消息仍然丢失了。
配置all的话表示所有ISR都写入成功才算成功,那除非所有ISR里的副本全挂了,消息才会丢失。
为了保证 leader 副本能有 follower 副本能同步消息,我们一般会为 topic 设置 replication.factor >= 3
。这样就可以保证每个 分区(partition) 至少有 3 个副本,以确保消息队列的安全性。
retries=N
,设置一个非常大的值,可以让生产者发送消息失败后不停重试
kafka自身消息丢失
kafka因为消息写入是通过PageCache异步写入磁盘的,因此仍然存在丢失消息的可能。
因此针对kafka自身丢失的可能设置参数:
replication.factor=N
,设置一个比较大的值,保证至少有2个或者以上的副本。
min.insync.replicas=N
,代表消息如何才能被认为是写入成功,设置大于1的数,保证至少写入1个或者以上的副本才算写入消息成功。
unclean.leader.election.enable=false
,这个设置意味着没有完全同步的分区副本不能成为Leader副本,如果是true
的话,那些没有完全同步Leader的副本成为Leader之后,就会有消息丢失的风险。
消费者消息丢失
消费者丢失的可能就比较简单,关闭自动提交位移即可,改为业务处理成功手动提交。
因为重平衡发生的时候,消费者会去读取上一次提交的偏移量,自动提交默认是每5秒一次,这会导致重复消费或者丢失消息。
enable.auto.commit=false
,设置为手动提交。
还有一个参数我们可能也需要考虑进去的:
auto.offset.reset=earliest
,这个参数代表没有偏移量可以提交或者broker上不存在偏移量的时候,消费者如何处理。earliest
代表从分区的开始位置读取,可能会重复读取消息,但是不会丢失,消费方一般我们肯定要自己保证幂等,另外一种latest
表示从分区末尾读取,那就会有概率丢失消息。
综合这几个参数设置,我们就能保证消息不会丢失,保证了可靠性。
Zookeeper 在 Kafka 中的作用知道吗?
要想搞懂 zookeeper 在 Kafka 中的作用 一定要自己搭建一个 Kafka 环境然后自己进 zookeeper 去看一下有哪些文件夹和 Kafka 有关,每个节点又保存了什么信息。 一定不要光看不实践,这样学来的也终会忘记!这部分内容参考和借鉴了这篇文章
https://www.jianshu.com/p/a036405f989c
下图就是我的本地 Zookeeper ,它成功和我本地的 Kafka 关联上(以下文件夹结构借助 idea 插件 Zookeeper tool 实现)
ZooKeeper 主要为 Kafka 提供元数据的管理的功能。
从图中我们可以看出,Zookeeper 主要为 Kafka 做了下面这些事情:
1 Broker 注册 :在 Zookeeper 上会有一个专门用来进行 Broker 服务器列表记录的节点。每个 Broker 在启动时,都会到 Zookeeper 上进行注册,即到 /brokers/ids
下创建属于自己的节点。每个 Broker 就会将自己的 IP 地址和端口等信息记录到该节点中去
2 Topic 注册 : 在 Kafka 中,同一个Topic 的消息会被分成多个分区并将其分布在多个 Broker 上,这些分区信息及与 Broker 的对应关系也都是由 Zookeeper 在维护。比如我创建了一个名字为 my-topic 的主题并且它有两个分区,对应到 zookeeper 中会创建这些文件夹:/brokers/topics/my-topic/Partitions/0
、/brokers/topics/my-topic/Partitions/1
3 负载均衡 :上面也说过了 Kafka 通过给特定 Topic 指定多个 Partition, 而各个 Partition 可以分布在不同的 Broker 上, 这样便能提供比较好的并发能力。 对于同一个 Topic 的不同 Partition,Kafka 会尽力将这些 Partition 分布到不同的 Broker 服务器上。当生产者产生消息后也会尽量投递到不同 Broker 的 Partition 里面。当 Consumer 消费的时候,Zookeeper 可以根据当前的 Partition 数量以及 Consumer 数量来实现动态负载均衡。
4 offset存储:消费者对应的分区的offset存在里面
新版本Kafka为什么抛弃了Zookeeper
我认为可以从两个个方面来回答这个问题:
首先,从运维的复杂度来看,Kafka本身是一个分布式系统,他的运维就已经很复杂了,那除此之外,还需要重度依赖另外一个ZK,这对成本和复杂度来说都是一个很大的工作量。
其次,应该是考虑到性能方面的问题,比如之前的提交位移的操作都是保存在ZK里面的,但是ZK实际上不适合这种高频的读写更新操作,这样的话会严重影响ZK集群的性能,这一方面后来新版本中Kafka也把提交和保存位移用消息的方式来处理了。
另外Kafka严重依赖ZK来实现元数据的管理和集群的协调工作,如果集群规模庞大,主题和分区数量很多,会导致ZK集群的元数据过多,集群压力过大,直接影响到很多Watch的延时或者丢失。
Kafka 支持读写分离吗?为什么?
Kafka 是不支持读写分离的,那么读写分离的好处是什么?主要就是让一个节点去承担另一个节点的负载压力,也就是能做到一定程度的负载均衡,而且 Kafka 不通过读写分离也可以一定程度上去实现负载均衡。
但是对于 Kafka 的架构来说,读写分离有两个很大的缺点
1.数据不一致的问题:读写分离必然涉及到数据的同步,只要是不同节点之间的数据同步,必然会有数据不一致的问题存在。
2.延时问题:由于 Kafka 独特的数据处理方式,导致如果将数据从一个节点同步到另一个节点必然会经过主节点磁盘和从节点磁盘,对一些延时性要求较高的应用来说,并不太适用
kafka 控制器是什么?有什么作用
在 Kafka 集群中会有一个或多个 broker,其中有一个 broker 会被选举为控制器,它负责管理整个集群中所有分区和副本的状态,kafka 集群中只能有一个控制器。
当某个分区的 leader 副本出现故障时,由控制器负责为该分区选举新的 leader 副本。
当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有 broker 更新其元数据信息。
当为某个 topic 增加分区数量时,由控制器负责分区的重新分配。
kafka 控制器是怎么进行选举的?
kafka 中的控制器选举工作依赖于 Zookeeper,成功竞选成为控制器的 broker 会在Zookeeper中创建/controller临时节点。
每个 broker 启动的时候会去尝试读取/controller 节点的 brokerid的值
如果读取到的 brokerid 的值不为-1,表示已经有其他broker 节点成功竞选为控制器,所以当前 broker 就会放弃竞选;
如果Zookeeper中不存在/controller 节点,或者这个节点的数据异常,那么就会尝试去创建/controller 节点,创建成功的那个 broker 就会成为控制器。
每个 broker 都会在内存中保存当前控制器的 brokerid 值,这个值可以标识为 activeControllerId。
Zookeeper 中还有一个与控制器有关的/controller_epoch 节点,这个节点是持久节点,节点中存放的是一个整型的 controller_epoch 值。controller_epoch 值用于记录控制器发生变更的次数。
controller_epoch 的初始值为1,即集群中的第一个控制器的纪元为1,当控制器发生变更时,每选出一个新的控制器就将该字段值加1。
每个和控制器交互的请求都会携带 controller_epoch 这个字段,
如果请求的 controller_epoch 值小于内存中的 controller_epoch值,则认为这个请求是向已经过期的控制器发送的请求,那么这个请求会被认定为无效的请求。
如果请求的 controller_epoch 值大于内存中的 controller_epoch值,那么说明已经有新的控制器当选了
Kafka生产者
生产者发送消息模式
总共有三种模式
1.发后即忘(fire-and-forget)
它只管往 Kafka 里面发送消息,但是不关心消息是否正确到达,这种方式的效率最高,但是可靠性也最差,比如当发生某些不可充实异常的时候会造成消息的丢失
2.同步(sync)
producer.send()返回一个Future对象,调用get()方法变回进行同步等待,就知道消息是否发送成功,发送一条消息需要等上个消息发送成功后才可以继续发送
3.异步(async)
Kafka支持 producer.send() 传入一个回调函数,消息不管成功或者失败都会调用这个回调函数,这样就算是异步发送,我们也知道消息的发送情况,然后再回调函数中选择记录日志还是重试都取决于调用方
发送消息时如何选择分区
1.轮询:依次将消息发送该topic下的所有分区,如果在创建消息的时候 key 为 null,Kafka 默认采用这种策略。
2.key 指定分区:在创建消息是 key 不为空,并且使用默认分区器,Kafka 会将 key 进行 hash,然后根据hash值映射到指定的分区上。这样的好处是 key 相同的消息会在一个分区下,Kafka 并不能保证全局有序,但是在每个分区下的消息是有序的,按照顺序存储,按照顺序消费。在保证同一个 key 的消息是有序的,这样基本能满足消息的顺序性的需求。但是如果 partation 数量发生变化,那就很难保证 key 与分区之间的映射关系了。
很常见的场景就是我们希望下单、支付消息有顺序,这样以订单ID作为key发送消息就达到了分区有序性的目的。
3.自定义策略:实现 Partitioner 接口就能自定义分区策略。
4.指定 Partiton 发送
Kafka服务器
Kafka如何持久化
数据写到消息队列,可能会存在数据丢失问题,数据在消息队列需要持久化(磁盘?数据库?Redis?分布式文件系统?)
Kafka会将partition以消息日志的方式(落磁盘)存储起来,通过 顺序访问IO和缓存(等到一定的量或时间)才真正把数据写到磁盘上,来提高速度。
Kafka Replicas如何管理的
AR:分区中的所有 Replica 统称为 AR
ISR:所有与 Leader 副本保持一定程度同步的Replica(包括 Leader 副本在内)组成 ISR
OSR:与 Leader 副本同步滞后过多的 Replica 组成了 OSR
Leader 负责维护和跟踪 ISR 集合中所有 Follower 副本的滞后状态,当 Follower 副本落后过多时,就会将其放入 OSR 集合,当 Follower 副本追上了 Leader 的进度时,就会将其放入 ISR 集合。
默认情况下,只有 ISR 中的副本才有资格晋升为 Leader。
日志标志位和副本同步原理
分区相当于一个日志文件,我们先简单介绍几个概念
如上图是一个分区日志文件
标识共有7条消息,offset (消息偏移量)分别是0~6
0 代表这个日志文件的开始
HW(High Watermark) 为4,0~3 代表这个日志文件可以消费的区间,消费者只能消费到这四条消息
LEO 代表即将要写入消息的偏移量 offset
分区 ISR 集合中的每个副本都会维护自己的 LEO,而 ISR 集合中最小的LEO 即为分区的 HW
如上图: 三个分区副本都是 ISR集合当中的,最小的 LEO 为 3,就代表分区的 HW 为3,所以当前分区只能消费到 0~2 之间的三条数据,如下图
Kafka副本的之前提到过,分为Leader副本和Follower副本,也就是主副本和从副本,和其他的比如Mysql不一样的是,Kafka中只有Leader副本会对外提供服务,Follower副本只是单纯地和Leader保持数据同步,作为数据冗余容灾的作用。
在Kafka中我们把所有副本的集合统称为AR(Assigned Replicas),和Leader副本保持同步的副本集合称为ISR(InSyncReplicas)。
ISR是一个动态的集合,维持这个集合会通过replica.lag.time.max.ms
参数来控制,这个代表落后Leader副本的最长时间,默认值10秒,所以只要Follower副本没有落后Leader副本超过10秒以上,就可以认为是和Leader同步的(简单可以认为就是同步时间差)。
副本间同步的过程依赖的就是HW和LEO的更新,以他们的值变化来演示副本同步消息的过程,绿色表示Leader副本,黄色表示Follower副本。
首先,生产者不停地向Leader写入数据,这时候Leader的LEO可能已经达到了10,但是HW依然是0,两个Follower向Leader请求同步数据,他们的值都是0。
然后,消息还在继续写入,Leader的LEO值又发生了变化,两个Follower也各自拉取到了自己的消息,于是更新自己的LEO值,但是这时候Leader的HW依然没有改变。
此时,Follower再次向Leader拉取数据,这时候Leader会更新自己的HW值,取Follower中的最小的LEO值来更新。
之后,Leader响应自己的HW给Follower,Follower更新自己的HW值,因为又拉取到了消息,所以再次更新LEO,流程以此类推。
分区再分配是做什么的?解决了什么问题?
分区再分配主要是用来维护 kafka 集群的负载均衡
既然是分区再分配,那么 kafka 分区有什么问题呢?
问题1:当集群中的一个节点下线了
如果该节点的分区是单副本的,那么分区将会变得不可用
如果是多副本的,就会进行 leader 选举,在其他机器上选举出新的 leader
kafka 并不会将这些失效的分区迁移到其他可用的 broker 上,这样就会影响集群的负载均衡,甚至也会影响服务的可靠性和可用性
问题2:当集群新增 broker 时,只有新的主题分区会分配在该 broker 上,而老的主题分区不会分配在该 broker 上,就造成了老节点和新节点之间的负载不均衡。
为了解决该问题就出现了分区再分配,它可以在集群扩容,broker 失效的场景下进行分区迁移。
分区再分配的原理就是通化控制器给分区新增新的副本,然后通过网络把旧的副本数据复制到新的副本上,在复制完成后,将旧副本清除。 当然,为了不影响集群正常的性能,在此复制期间还会有一些列保证性能的操作,比如复制限流。
副本 leader 是怎么选举的?
当分区 leader 节点崩溃时,其中一个 follower 节点会成为新的 leader 节点,这样会导致集群的负载不均衡,从而影响服务的健壮性和稳定性。
如下:
Topic: test Partation:0 Leader:1 Replicas:1,2,0 Isr:1,2,0
Topic: test Partation:1 Leader:2 Replicas:2,0,1 Isr:2,0,1
Topic: test Partation:2 Leader:0 Replicas:0,1,2 Isr:0,1,2
我们可以看到
0 服务器有 1 个 leader
1 服务器有 1 个 leader
2 服务器有 1 个 leader
如果此时中间的节点重启
Topic: test Partation:0 Leader:1 Replicas:1,2,0 Isr:1,0,2
Topic: test Partation:1 Leader:0 Replicas:2,0,1 Isr:0,1,2
Topic: test Partation:2 Leader:0 Replicas:0,1,2 Isr:0,1,2
我们又可以看到:
0 服务器有 2 个 leader
1 服务器有 1 个 leader
2 服务器有 0 个 leader
我们会发现,原本 2 服务器有1个 leader,经过重启后 leader 都消失了,如此就负载不均衡了。
为了解决这种问题,就引入了优先副本的概念
优先副本就是说在 AR 集合中的第一个副本。比如分区 2 的 AR 为 0,1,2,那么分区 2 的优先副本就为0。理想情况下优先副本就是 leader 副本。优先副本选举就是促使优先副本成为 leader 副本,从而维护集群的负载均衡。
分区数越多越好吗?吞吐量就会越高吗?
一般类似于这种问题的答案,都是持否定态度的。
但是可以说,在一定条件下,分区数的数量是和吞吐量成正比的,分区数和性能也是成正比的。
那么为什么说超过了一定限度,就会对性能造成影响呢?原因如下:
1.客户端/服务器端需要使用的内存就越多
服务端在很多组件中都维护了分区级别的缓存,分区数越大,缓存成本也就越大。
消费端的消费线程数是和分区数挂钩的,分区数越大消费线程数也就越多,线程的开销成本也就越大
生产者发送消息有缓存的概念,会为每个分区缓存消息,当积累到一定程度或者时间时会将消息发送到分区,分区越多,这部分的缓存也就越大
2.文件句柄的开销
每个 partition 都会对应磁盘文件系统的一个目录。在 Kafka 的数据日志文件目录中,每个日志数据段都会分配两个文件,一个索引文件和一个数据文件。每个 broker 会为每个日志段文件打开一个 index 文件句柄和一个数据文件句柄。因此,随着 partition 的增多,所需要保持打开状态的文件句柄数也就越多,最终可能超过底层操作系统配置的文件句柄数量限制。
3.越多的分区可能增加端对端的延迟
Kafka 会将分区 HW 之前的消息暴露给消费者。分区越多则副本之间的同步数量就越多,在默认情况下,每个 broker 从其他 broker 节点进行数据副本复制时,该 broker 节点只会为此工作分配一个线程,该线程需要完成该 broker 所有 partition 数据的复制。
4.降低高可用性
分区再分配,会将数据复制到另一份副本当中,分区数量越多,那么恢复时间也就越长,而如果发生宕机的 broker 恰好是 controller 节点时:在这种情况下,新 leader 节点的选举过程在 controller 节点恢复到新的 broker 之前不会启动。controller 节点的错误恢复将会自动地进行,但是新的 controller 节点需要从 zookeeper 中读取每一个 partition 的元数据信息用于初始化数据。例如,假设一个Kafka 集群存在 10000个partition,从 zookeeper 中恢复元数据时每个 partition 大约花费 2 ms,则 controller 的恢复将会增加约 20 秒的不可用时间窗口。
Kafka删除数据
Kafka 是不会删除数据的,它会把所有的数据都保留下来,每个 消费者(Consumer)对每个 Topic 都有一个 Offset
用来表示读取到了第几条数据 。
一般情况下 Offset 由客户端 SDK 负责保存 ,会保存到 Zookeeper 里面 。关于存在硬盘中的消息,Kafka 也有它的解决方法,可以基于时间和 Partition 文件的大小,正常 Kafka 是默认七天的保存,也可以通过命令来修改,以 users topic 为例。
修改kafka 7天 默认保存周期
kafka-topics.sh --zookeeper 6 --alter --topic users --config retention.ms=100000
所以,为了避免磁盘被撑满的情况,Kakfa 提供了两种策略来删除数据:
「基于时间」 (默认七天)
「基于 Partition 文件大小」
Kafka消费者
Kafka与重复消费
kafka出现消息重复消费的原因:
1 服务端侧已经消费的数据没有成功提交 offset(根本原因)。
2 Kafka 侧 由于服务端处理业务时间长或者网络链接等等原因让 Kafka 认为服务假死,触发了分区 rebalance。
解决方案:
1 消费消息服务做幂等校验,比如 Redis 的set、MySQL 的主键等天然的幂等功能。这种方法最有效。
2 将 enable.auto.commit
参数设置为 false,关闭自动提交,开发者在代码中手动提交 offset。那么这里会有个问题:什么时候提交offset合适?
处理完消息再提交:依旧有消息重复消费的风险,和自动提交一样
拉取到消息即提交:会有消息丢失的风险。允许消息延时的场景,一般会采用这种方式。然后,通过定时任务在业务不繁忙(比如凌晨)的时候做数据兜底。
如何增强消费者的消费能力?
1.可以考虑增加 topic 的分区数,并且同时提升消费组的消费者数量,消费者数=分区数。
2.如果是消费者消费不及时,可以采用多线程的方式进行消费,并且优化业务方法流程,同样的分区数,为什么人家并发那么高,你的就不行??
消费者组和消费者重平衡
Kafka中的消费者组订阅topic主题的消息,一般来说消费者的数量最好要和所有主题分区的数量保持一致最好(举例子用一个主题,实际上当然是可以订阅多个主题)。
当消费者数量小于分区数量的时候,那么必然会有一个消费者消费多个分区的消息。
而消费者数量超过分区的数量的时候,那么必然会有消费者没有分区可以消费。
所以,消费者组的好处一方面在上面说到过,可以支持多种消息模型,另外的话根据消费者和分区的消费关系,支撑横向扩容伸缩。
当我们知道消费者如何消费分区的时候,就显然会有一个问题出现了,消费者消费的分区是怎么分配的,有先加入的消费者时候怎么办?
旧版本的重平衡过程主要通过ZK监听器的方式来触发,每个消费者客户端自己去执行分区分配算法。
新版本则是通过协调者来完成,每一次新的消费者加入都会发送请求给协调者去获取分区的分配,这个分区分配的算法逻辑由协调者来完成。
而重平衡Rebalance就是指的有新消费者加入的情况,比如刚开始我们只有消费者A在消费消息,过了一段时间消费者B和C加入了,这时候分区就需要重新分配,这就是重平衡,也可以叫做再平衡,但是重平衡的过程和我们的GC时候STW很像,会导致整个消费群组停止工作,重平衡期间都无法消息消息。
另外,发生重平衡并不是只有这一种情况,因为消费者和分区总数是存在绑定关系的,上面也说了,消费者数量最好和所有主题的分区总数一样。
那只要消费者数量、主题数量(比如用的正则订阅的主题)、分区数量任何一个发生改变,都会触发重平衡。
下面说说重平衡的过程。
重平衡的机制依赖消费者和协调者之间的心跳来维持,消费者会有一个独立的线程去定时发送心跳给协调者,这个可以通过参数heartbeat.interval.ms
来控制发送心跳的间隔时间。
1 每个消费者第一次加入组的时候都会向协调者发送JoinGroup
请求,第一个发送这个请求的消费者会成为“群主”,协调者会返回组成员列表给群主
2 群主执行分区分配策略,然后把分配结果通过SyncGroup
请求发送给协调者,协调者收到分区分配结果
3 其他组内成员也向协调者发送SyncGroup
,协调者把每个消费者的分区分配分别响应给他们
消费者分区分配策略
主要有4种分配策略:
Range
默认的策略。
该分配策略是按照消费者总数和分区总数进行整除运算来获得一个跨度,然后分区按照跨度来进行平均分配,尽可能保证分区均匀的分配给所有的消费者。
对于每个 topic,该策略会讲消费者组内所有订阅这个主题的消费者按照名称的字典顺序排序,然后为每个消费者划分固定过的区域,如果不够平均分配,那么字典排序靠前的就会多分配一个分区。
比如有3个分区,消费者A排序更靠前,所以能够分配到P0\P1两个分区,消费者B就只能分配到一个P2。
如果是4个分区的话,那么他们会刚好都是分配到2个。
但是这个分配策略会有点小问题,他是根据主题进行分配,所以如果消费者组订阅了多个主题,那就有可能导致分区分配不均衡。
比如下图中两个主题的P0\P1都被分配给了A,这样A有4个分区,而B只有2个,如果这样的主题数量越多,那么不均衡就越严重。
RoundRobin
也就是我们常说的轮询了,这个就比较简单了,不画图你也能很容易理解。
该分配策略是按将消费者组内所有消费者及消费者订阅的所有主题的分区按照字典排序,然后通过轮询的方式分配给每个消费者。
这个会根据所有的主题进行轮询分配,不会出现Range那种主题越多可能导致分区分配不均衡的问题。
P0->A,P1->B,P1->A。。。以此类推
Sticky
这种分配策略有两个目的
1.分区的分配要尽可能的均匀
2.分区的分配尽可能的与上次分配的保持相同。
当两者发生冲突时,第一个目标优先于第二个目标。
比如之前P0\P1分配给消费者A,那么下一次尽量还是分配给A。
这样的好处就是连接可以复用,要消费消息总是要和broker去连接的,如果能够保持上一次分配的分区的话,那么就不用频繁的销毁创建连接了。
假设消费组内有3个消费者:C0、C1、C2
它们都订阅了4个主题:t0、t1、t2、t3
并且每个主题有2个分区,也就是说整个消费组订阅了,t0p0、t0p1、t1p0、t1p1、t2p0、t2p1、t3p0、t3p1 这8个分区
最终的分配结果如下:
消费者C0:t0p0、t1p1、t3p0
消费者C1:t0p1、t2p0、t3p1
消费者C2:t1p0、t2p1
这样初看上去似乎与采用RoundRobinAssignor策略所分配的结果相同
此时假设消费者C1脱离了消费组,那么消费组就会执行再平衡操作,进而消费分区会重新分配。如果采用RoundRobinAssignor策略,那么此时的分配结果如下:
消费者C0:t0p0、t1p0、t2p0、t3p0
消费者C2:t0p1、t1p1、t2p1、t3p1
如分配结果所示,RoundRobinAssignor策略会按照消费者C0和C2进行重新轮询分配。而如果此时使用的是StickyAssignor策略,那么分配结果为:
消费者C0:t0p0、t1p1、t3p0、t2p0
消费者C2:t1p0、t2p1、t0p1、t3p1
可以看到分配结果中保留了上一次分配中对于消费者C0和C2的所有分配结果,并将原来消费者C1的“负担”分配给了剩余的两个消费者C0和C2,最终C0和C2的分配还保持了均衡。
如果发生分区重分配,那么对于同一个分区而言有可能之前的消费者和新指派的消费者不是同一个,对于之前消费者进行到一半的处理还要在新指派的消费者中再次复现一遍,这显然很浪费系统资源。StickyAssignor策略如同其名称中的“sticky”一样,让分配策略具备一定的“粘性”,尽可能地让前后两次分配相同,进而减少系统资源的损耗以及其它异常情况的发生。
到目前为止所分析的都是消费者的订阅信息都是相同的情况,我们来看一下订阅信息不同的情况下的处理。
举例: 同样消费组内有3个消费者:C0、C1、C2
集群中有3个主题 t0、t1、t2
这3个主题分别有 1、2、3个分区
也就是说集群中有 t0p0、t1p0、t1p1、t2p0、t2p1、t2p2 这6个分区
消费者C0订阅了主题t0,消费者C1订阅了主题t0和t1,消费者C2订阅了主题t0、t1和t2
如果此时采用RoundRobinAssignor策略:
消费者C0:t0p0
消费者C1:t1p0
消费者C2:t1p1、t2p0、t2p1、t2p2
如果此时采用的是StickyAssignor策略:
消费者C0:t0p0
消费者C1:t1p0、t1p1
消费者C2:t2p0、t2p1、t2p2
此时消费者C0脱离了消费组,那么RoundRobinAssignor策略的分配结果为:
消费者C1:t0p0、t1p1
消费者C2:t1p0、t2p0、t2p1、t2p2
StickyAssignor策略,那么分配结果为:
消费者C1:t1p0、t1p1、t0p0
消费者C2:t2p0、t2p1、t2p2
可以看到StickyAssignor策略保留了消费者C1和C2中原有的5个分区的分配:
t1p0、t1p1、t2p0、t2p1、t2p2。
从结果上看StickyAssignor策略比另外两者分配策略而言显得更加的优异,这个策略的代码实现也是异常复杂。
4.自定义分区分配策略
可以通过实现 org.apache.kafka.clients.consumer.internals.PartitionAssignor 接口来实现
消费是推还是拉
推拉模式
首先明确一下推拉模式到底是在讨论消息队列的哪一个步骤,一般而言我们在谈论推拉模式的时候指的是 Comsumer 和 Broker 之间的交互。
默认的认为 Producer 与 Broker 之间就是推的方式,即 Producer 将消息推送给 Broker,而不是 Broker 主动去拉取消息。
想象一下,如果需要 Broker 去拉取消息,那么 Producer 就必须在本地通过日志的形式保存消息来等待 Broker 的拉取,如果有很多生产者的话,那么消息的可靠性不仅仅靠 Broker 自身,还需要靠成百上千的 Producer。
Broker 还能靠多副本等机制来保证消息的存储可靠,而成百上千的 Producer 可靠性就有点难办了,所以默认的 Producer 都是推消息给 Broker。
所以说有些情况分布式好,而有些时候还是集中管理好。
推模式
推模式指的是消息从 Broker 推向 Consumer,即 Consumer 被动的接收消息,由 Broker 来主导消息的发送。
我们来想一下推模式有什么好处?
消息实时性高, Broker 接受完消息之后可以立马推送给 Consumer。
对于消费者使用来说更简单,简单啊就等着,反正有消息来了就会推过来。
推模式有什么缺点?
推送速率难以适应消费速率,推模式的目标就是以最快的速度推送消息,当生产者往 Broker 发送消息的速率大于消费者消费消息的速率时,随着时间的增长消费者那边可能就“爆仓”了,因为根本消费不过来啊。当推送速率过快就像 DDos 攻击一样消费者就傻了。
并且不同的消费者的消费速率还不一样,身为 Broker 很难平衡每个消费者的推送速率,如果要实现自适应的推送速率那就需要在推送的时候消费者告诉 Broker ,我不行了你推慢点吧,然后 Broker 需要维护每个消费者的状态进行推送速率的变更。
这其实就增加了 Broker 自身的复杂度。
所以说推模式难以根据消费者的状态控制推送速率,适用于消息量不大、消费能力强要求实时性高的情况下。
拉模式
拉模式指的是 Consumer 主动向 Broker 请求拉取消息,即 Broker 被动的发送消息给 Consumer。
我们来想一下拉模式有什么好处?
拉模式主动权就在消费者身上了,消费者可以根据自身的情况来发起拉取消息的请求。假设当前消费者觉得自己消费不过来了,它可以根据一定的策略停止拉取,或者间隔拉取都行。
拉模式下 Broker 就相对轻松了,它只管存生产者发来的消息,至于消费的时候自然由消费者主动发起,来一个请求就给它消息呗,从哪开始拿消息,拿多少消费者都告诉它,它就是一个没有感情的工具人,消费者要是没来取也不关它的事。
拉模式可以更合适的进行消息的批量发送,基于推模式可以来一个消息就推送,也可以缓存一些消息之后再推送,但是推送的时候其实不知道消费者到底能不能一次性处理这么多消息。而拉模式就更加合理,它可以参考消费者请求的信息来决定缓存多少消息之后批量发送。
拉模式有什么缺点?
消息延迟,毕竟是消费者去拉取消息,但是消费者怎么知道消息到了呢?所以它只能不断地拉取,但是又不能很频繁地请求,太频繁了就变成消费者在攻击 Broker 了。因此需要降低请求的频率,比如隔个 2 秒请求一次,你看着消息就很有可能延迟 2 秒了。
消息忙请求,忙请求就是比如消息隔了几个小时才有,那么在几个小时之内消费者的请求都是无效的,在做无用功。
那到底是推还是拉
可以看到推模式和拉模式各有优缺点,到底该如何选择呢?
RocketMQ 和 Kafka 都选择了拉模式,当然业界也有基于推模式的消息队列如 ActiveMQ。
我个人觉得拉模式更加的合适,因为现在的消息队列都有持久化消息的需求,也就是说本身它就有个存储功能,它的使命就是接受消息,保存好消息使得消费者可以消费消息即可。
而消费者各种各样,身为 Broker 不应该有依赖于消费者的倾向,我已经为你保存好消息了,你要就来拿好了。
虽说一般而言 Broker 不会成为瓶颈,因为消费端有业务消耗比较慢,但是 Broker 毕竟是一个中心点,能轻量就尽量轻量。
那么竟然 RocketMQ 和 Kafka 都选择了拉模式,它们就不怕拉模式的缺点么?怕,所以它们操作了一波,减轻了拉模式的缺点。
长轮询
RocketMQ 和 Kafka 都是利用“长轮询”来实现拉模式,我们就来看看它们是如何操作的。
为了简单化,下面我把消息不满足本次拉取的条数啊、总大小啊等等都统一描述成还没有消息,反正都是不满足条件。
Kafka 中的长轮询
像 Kafka 在拉请求中有参数,可以使得消费者请求在 “长轮询” 中阻塞等待。
简单的说就是消费者去 Broker 拉消息,定义了一个超时时间,也就是说消费者去请求消息,如果有的话马上返回消息,如果没有的话消费者等着直到超时,然后再次发起拉消息请求。
并且 Broker 也得配合,如果消费者请求过来,有消息肯定马上返回,没有消息那就建立一个延迟操作,等条件满足了再返回。
我们来简单的看一下源码,为了突出重点,我会删减一些代码。
先来看消费者端的代码。
上面那个 poll 接口想必大家都很熟悉,其实从注解直接就知道了确实是等待数据的到来或者超时,我们再简单的往下看。
我们再来看下最终 client.poll 调用的是什么。
最后调用的就是 Kafka 包装过的 selector,而最终会调用 Java nio 的 select(timeout)。
现在消费者端的代码已经清晰了,我们再来看看 Broker 如何做的。
Broker 处理所有请求的入口其实我在之前的文章介绍过,就在 KafkaApis.scala 文件的 handle 方法下,这次的主角就是 handleFetchRequest 。
这个方法进来,我截取最重要的部分。
下面的图片就是 fetchMessages 方法内部实现,源码给的注释已经很清晰了,大家放大图片看下即可。
这个炼狱名字取得很有趣,简单的说就是利用我之前文章提到的时间轮,来执行定时任务,例如这里是delayedFetchPurgatory
,专门用来处理延迟拉取操作。
我们先简单想一下,这个延迟操作都需要实现哪些方法,首先构建的延迟操作需要有检查机制,来查看消息是否已经到了,然后呢还得有个消息到了之后该执行的方法,还需要有执行完毕之后该干啥的方法,当然还得有个超时之后得干啥的方法。
这几个方法其实对应的就是代码里的 DelayedFetch ,这个类继承了 DelayedOperation 内部有:
isCompleted 检查条件是否满足的方法
tryComplete 条件满足之后执行的方法
onComplete 执行完毕之后调用的方法
onExpiration 过期之后需要执行的方法
判断是否过期就是由时间轮来推动判断的,但是总不能等过期的时候再去看消息到了没吧?
这里 Kafka 和 RocketMQ 的机制一样,也会在消息写入的时候提醒这些延迟请求消息来了,具体代码我不贴了, 在 ReplicaManager#appendRecords 方法内部再深入个两方法可以看到。
不过虽说代码不贴,图还是要画一下的。
更多推荐
所有评论(0)