1 Kafka Overview

Apache Kafka is an open-source distributed event streaming platform.

A Kafka cluster consists of one or more brokers. One broker is elected as the controller in a process known as Kafka Controller Election. This process relies heavily on Apache ZooKeeper, which acts as the source of truth and guarantees that only one broker can ever be elected.

Partition Leader Election is the process of electing a broker to be the leader of a partition. Use the kafka-leader-election.sh utility for preferred or unclean leader election (if unclean.leader.election.enable is set to true, an out-of-sync replica (OSR) can be elected as leader when there is no live in-sync replica (ISR)).

2 The Functions of ZooKeeper

2.1 Choose Controller

When a broker starts, it races to create an ephemeral node at “/controller”. The broker that creates the node successfully becomes the Controller and writes its broker id and other metadata to the node. All brokers watch “/controller” for data changes throughout its lifetime; if the current Controller goes down, the node disappears and a new round of the election begins.
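
A minimal sketch of this election pattern using the plain ZooKeeper Java client (the connection string and payload are illustrative; Kafka's real controller code is considerably more involved):

    import org.apache.zookeeper.*;
    import java.nio.charset.StandardCharsets;

    // Sketch: each broker races to create the ephemeral /controller node.
    // Whoever succeeds becomes the Controller; everyone else watches the node.
    public class ControllerElectionSketch {
        public static void main(String[] args) throws Exception {
            int brokerId = 0;  // illustrative broker id
            ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> { });

            byte[] payload = ("{\"brokerid\":" + brokerId + "}").getBytes(StandardCharsets.UTF_8);
            try {
                // EPHEMERAL: the node disappears automatically if this broker's session dies.
                zk.create("/controller", payload, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                System.out.println("Broker " + brokerId + " is now the Controller");
            } catch (KeeperException.NodeExistsException e) {
                // Lost the race: watch /controller so we can compete again once it is deleted.
                zk.exists("/controller", event -> {
                    if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
                        System.out.println("Controller gone, start a new election round");
                    }
                });
            }
        }
    }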

2.2 Register Broker

For example, a broker with id 0 creates an ephemeral node “/brokers/ids/0” at startup and writes information such as its host and port. The Controller watches the children of “/brokers/ids” so it can sense the status of each broker in real time and coordinate resources accordingly.
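
A hedged sketch of such a watch with the ZooKeeper client (the callback is simplified; in practice the watch must be re-registered after it fires, since ZooKeeper watches are one-shot):

    import org.apache.zookeeper.ZooKeeper;
    import java.util.List;

    // Sketch: the Controller watches the children of /brokers/ids so it learns
    // immediately when a broker registers (node created) or dies (ephemeral node removed).
    public class BrokerWatchSketch {
        public static void main(String[] args) throws Exception {
            ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> { });

            // getChildren with a watcher: the watcher fires on the next membership change.
            List<String> brokerIds = zk.getChildren("/brokers/ids", event ->
                    System.out.println("Broker membership changed: " + event.getPath()));
            System.out.println("Live brokers: " + brokerIds);
        }
    }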

2.3 Coordinate Topic Operations

ZooKeeper coordinates the creation, adjustment, and deletion of topics.

2.3.1 Save topic partition information

For example, when a topic named mytopic is created, its partition information is saved under “/brokers/topics/mytopic”. The Controller watches the child nodes of “/brokers/topics” throughout its lifetime, so it perceives such changes in real time and completes the subsequent steps of topic creation.

2.3.2 Save configuration information

Configuration information, such as topic-level and client-level configs, is also stored in ZooKeeper.

3 Kafka Producer

Each partition in the Kafka cluster has a leader and a set of replicas among the brokers. All writes to the partition must go through the partition leader. The replicas are kept in sync by fetching from the leader. When the leader shuts down or fails, the next leader is chosen from among the in-sync replicas (ISR). Depending on how the producer is configured, each produce request to the partition leader can be held until the replicas have successfully acknowledged the write. This gives the producer some control over message durability at some cost to overall throughput.

Messages written to the partition leader are not immediately readable by consumers regardless of the producer’s acknowledgement settings. When all in-sync replicas (ISR) have acknowledged the write, then the message is considered committed, which makes it available for reading. This ensures that messages cannot be lost by a broker failure after they have already been read. Note that this implies that messages which were acknowledged by the leader only (that is, acks=1) can be lost if the partition leader fails before the replicas have copied the message. Nevertheless, this is often a reasonable compromise in practice to ensure durability in most cases while not impacting throughput too significantly.

  • LEO (Log End Offset): the offset of the next message to be written in the current log file.
  • HW (High Watermark): a specific message offset; consumers can only pull messages before this offset. The HW is the smallest LEO among the replicas in the ISR. For example, if the ISR replicas have LEOs of 5, 7, and 6, the HW is 5, so consumers can read up to offset 4.

3.1 ACK

Kafka’s acks setting is the producer’s message delivery confirmation mechanism. The default value of acks is 1.

  • acks=1: the producer considers the send successful as soon as it receives acknowledgement that the partition leader has written the message.
  • acks=0: the producer sends the message once and does not wait for any acknowledgement, regardless of whether the send succeeded.
  • acks=-1 (equivalent to all): the producer considers the send successful only when it receives acknowledgement that all in-sync replicas of the partition have written the message.
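
A minimal producer sketch showing where the acks setting goes (the bootstrap address is a placeholder; mytopic is the example topic from section 2.3.1):

    import org.apache.kafka.clients.producer.*;
    import java.util.Properties;

    public class AcksProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringSerializer");
            // acks=all (same as -1): wait until all in-sync replicas have acknowledged the write.
            // Use "1" for leader-only acknowledgement or "0" for fire-and-forget.
            props.put(ProducerConfig.ACKS_CONFIG, "all");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("mytopic", "key", "value"),
                        (metadata, exception) -> {
                            if (exception != null) {
                                exception.printStackTrace();   // the write was not acknowledged
                            } else {
                                System.out.printf("acked at partition=%d offset=%d%n",
                                        metadata.partition(), metadata.offset());
                            }
                        });
            }   // close() flushes pending sends before returning
        }
    }

The stronger the acknowledgement level, the higher the per-request latency, which is the durability/throughput trade-off described above.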

4 Kafka Consumer

A consumer group is a set of consumers that cooperate to consume data from one or more Kafka topics. The partitions of all subscribed topics are divided among the consumers in the group. As new members arrive and old members leave, the partitions are re-assigned so that each member receives a proportional share of the partitions; this is known as rebalancing the group. The group leader is the consumer that is additionally responsible for computing the partition assignment for the whole group.

The coordinator is responsible for managing the members of the group as well as their partition assignments. The coordinator of each group is chosen from among the leaders of the internal offsets topic __consumer_offsets, which is used to store committed offsets. Basically, the group's ID is hashed to one of the partitions of this topic, and the leader of that partition is selected as the coordinator. In this way, management of consumer groups is divided roughly equally across all the brokers in the cluster, which allows the number of groups to scale by increasing the number of brokers.

When the consumer starts up, it finds the coordinator for its group and sends a request to join the group. The coordinator then begins a group rebalance so that the new member is assigned its fair share of the group’s partitions. Every rebalance results in a new generation of the group.

Each member in the group must send heartbeats to the coordinator in order to remain a member of the group. If no heartbeat is received before the configured session timeout expires, the coordinator removes the member from the group and reassigns its partitions to another member.
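
A hedged sketch of a consumer joining a group, with the session timeout and heartbeat interval set explicitly (the bootstrap address and group id are placeholders, and the values are illustrative rather than recommendations):

    import org.apache.kafka.clients.consumer.*;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    public class GroupMemberSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");                  // placeholder
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringDeserializer");
            // If the coordinator receives no heartbeat within session.timeout.ms,
            // it evicts this member and triggers a rebalance.
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
            props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // subscribe() registers interest; the JoinGroup/SyncGroup exchange with the
                // coordinator happens inside poll().
                consumer.subscribe(Collections.singletonList("mytopic"));
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                System.out.println("fetched " + records.count() + " records");
            }
        }
    }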

4.1 Offset Management

After the consumer receives its assignment from the coordinator, it must determine the initial position for each assigned partition. When the group is first created, before any messages have been consumed, the position is set according to a configurable offset reset policy (auto.offset.reset). Typically, consumption starts either at the earliest offset or the latest offset.

As a consumer in the group reads messages from the partitions assigned by the coordinator, it must commit the offsets corresponding to the messages it has read. If the consumer crashes or is shut down, its partitions will be re-assigned to another member, which will begin consumption from the last committed offset of each partition. If the consumer crashes before any offset has been committed, then the consumer which takes over its partitions will use the reset policy.

The offset commit policy is crucial to providing the message delivery guarantees needed by your application. By default, the consumer is configured to use an automatic commit policy, which triggers a commit on a periodic interval. The consumer also supports a commit API which can be used for manual offset management. Correct offset management is crucial because it affects delivery semantics.

Each call to the commit API results in an offset commit request being sent to the broker. Using the synchronous API, the consumer is blocked until that request returns successfully. This may reduce overall throughput since the consumer might otherwise be able to process records while that commit is pending.
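
A hedged sketch of manual offset management with the synchronous commit API (auto-commit is disabled explicitly; the group id and bootstrap address are placeholders):

    import org.apache.kafka.clients.consumer.*;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    public class ManualCommitSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");                  // placeholder
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");   // take over commits
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // reset policy used when
                                                                            // no offset is committed

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("mytopic"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("%s-%d offset=%d value=%s%n",
                                record.topic(), record.partition(), record.offset(), record.value());
                    }
                    // Blocks until the broker acknowledges the commit; commitAsync() avoids the
                    // wait at the cost of weaker retry guarantees.
                    consumer.commitSync();
                }
            }
        }
    }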

4.2 Kafka Group Coordinator

The answer is simple: a group coordinator oversees all of this. One of the Kafka brokers acts as the group coordinator. When a consumer wants to join a group, it sends a request to the coordinator. The first consumer to join the group becomes the group leader; all consumers that join later become ordinary members of the group.

So we have two actors: a coordinator and a group leader. The coordinator is responsible for managing the list of group members, so every time a new member joins or an existing member leaves, the coordinator updates that list.

When the membership changes, the coordinator knows it is time to rebalance the partition assignment: a new member needs partitions assigned to it, or a departed member's partitions need to be reassigned to someone else. So, every time the member list changes, the coordinator initiates a rebalance.

How does a consumer group determine its own coordinator?

  1. Calculate the partition of __consumer_offsets that corresponds to the group:
     partition-Id(__consumer_offsets) = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)
  2. Find the broker that hosts the leader of that partition. The Group Coordinator running on that broker is the coordinator of the group.
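
A small illustrative computation of this mapping (assuming the default of 50 partitions for __consumer_offsets, which is controlled by offsets.topic.num.partitions; the group id is a placeholder):

    // Which __consumer_offsets partition (and therefore which coordinator) a group maps to.
    public class CoordinatorLookupSketch {
        public static void main(String[] args) {
            String groupId = "my-group";                  // placeholder group id
            int groupMetadataTopicPartitionCount = 50;    // offsets.topic.num.partitions default

            int partition = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount);
            System.out.println("group '" + groupId + "' -> __consumer_offsets partition " + partition);
            // The broker hosting the leader replica of this partition is the group coordinator.
        }
    }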

4.3 Kafka Group Leader

The group leader is responsible for executing the rebalance activity. The leader takes the list of current members, assigns partitions to them, and sends the result back to the coordinator. The coordinator then communicates the new assignments to the members. The important thing to note here is that during the rebalance activity, none of the consumers are allowed to read any messages.

5 Split-Brain Problem

If two brokers both believe they are the controller, they will send out conflicting instructions and cause chaos in the cluster. How is this solved? Kafka uses a controller epoch, a monotonically increasing integer, in the following ways.

  • All brokers watch “/controller”. If the node is deleted, a new round of election starts; if the node's data changes, a new epoch is obtained.
  • The Controller registers a SessionExpiredListener. If its ZooKeeper session expires, for example due to network problems, it automatically gives up the Controller identity and participates in the election again.
  • When a broker receives a request from a Controller whose epoch is less than the currently known controller_epoch, it rejects the request directly (sketched below).
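
A hedged sketch of the epoch check from the last bullet (the method signature is illustrative, not Kafka's internal API):

    // Sketch: fencing a stale ("zombie") controller with a monotonically increasing epoch.
    public class ControllerEpochFencing {
        private int currentControllerEpoch = 0;   // latest controller epoch this broker has seen

        // requestEpoch is the controller epoch carried by an incoming controller request.
        boolean accept(int requestEpoch) {
            if (requestEpoch < currentControllerEpoch) {
                // The sender is an old controller that has already lost leadership: reject.
                return false;
            }
            currentControllerEpoch = requestEpoch;   // remember the newest epoch seen so far
            return true;                             // ...and apply the request
        }
    }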

6 Rebalance

Rebalance is the protocol that distributes topic partitions among the consumers of a group. For example, if a group has 20 consumer instances subscribed to a topic with 100 partitions, Kafka will normally allocate 5 partitions to each consumer. This allocation process is the rebalance.

6.1 Rebalance Triggers

  • The number of group members changes, for example a new consumer instance joins the group or an existing one leaves.
  • The set of subscribed topics changes.
  • The number of partitions of a subscribed topic changes.

During the rebalance, all consumer instances in the group stop working and wait for the process to complete.

6.2 Rebalance process

The Rebalance process is divided into two steps: Join and Sync.

6.2.1 Join

In this step, all members send a JoinGroup request to the coordinator, asking to join the consumer group. Once all members have sent the JoinGroup request, the coordinator selects one consumer to act as the group leader and sends it the group membership and subscription information. The leader is responsible for formulating the partition assignment plan.

6.2.2 Sync

In this step, the leader computes the assignment plan, that is, which consumer is responsible for consuming which partitions of which topics. Once the assignment is complete, the leader wraps the plan in a SyncGroup request and sends it to the coordinator; the non-leader members also send SyncGroup requests, but with empty content. After receiving the assignment plan, the coordinator embeds each member's assignment in its SyncGroup response and sends it to the consumers. This way every member of the group knows which partitions it should consume.
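
A hedged sketch of what the leader's assignment step could look like for a single topic, using a simple range-style distribution (Kafka's built-in assignors, such as RangeAssignor and RoundRobinAssignor, handle multiple topics and many more cases):

    import java.util.*;

    // Sketch: the group leader maps the partitions of one topic onto the current members.
    public class AssignmentSketch {
        static Map<String, List<Integer>> assign(List<String> members, int partitionCount) {
            Map<String, List<Integer>> assignment = new HashMap<>();
            List<String> sorted = new ArrayList<>(members);
            Collections.sort(sorted);                         // deterministic ordering of member ids

            int perMember = partitionCount / sorted.size();   // base share per member
            int extra = partitionCount % sorted.size();       // first 'extra' members get one more
            int next = 0;
            for (int i = 0; i < sorted.size(); i++) {
                int count = perMember + (i < extra ? 1 : 0);
                List<Integer> partitions = new ArrayList<>();
                for (int p = 0; p < count; p++) {
                    partitions.add(next++);
                }
                assignment.put(sorted.get(i), partitions);
            }
            return assignment;
        }

        public static void main(String[] args) {
            // 3 members, 7 partitions -> shares of 3, 2 and 2.
            System.out.println(assign(Arrays.asList("consumer-b", "consumer-a", "consumer-c"), 7));
        }
    }

The leader sends a plan like this back in its SyncGroup request, and the coordinator forwards each member only its own share.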
