接上文:《架构设计:系统间通信(28)——Kafka及场景应用(中1)

4-3、复制功能

我们在上文中已经讨论了Kafka使用分区的概念存储消息,一个topic可以有多个分区它们分布在整个Kafka集群的多个Broker服务节点中,并且一条消息只会按照消息生产者的要求进入topic的某一个分区。那么问题来了:如果某个分区中的消息在被消费端Pull之前,承载该分区的Broker服务节点就因为各种异常原因崩溃了,那么在这个Broker重新启动前,消费者就无法收到消息了。

为了解决这个问题,Apache Kafka在V 0.8+版本中加入了复制功能:让topic下的每一个分区存储到多个Broker服务节点上,并由Zookeeper统一管理它们的状态。

这里写图片描述

请注意Kafka中Partition(分区)和replication(复制)是两个完全不同的概念,很多读者容易将这两个概念混淆——虽然它们都和“如何存储消息”这件事情有关:前者是说将若干条消息按照一定的规则分别存放在不同的区域,一条消息只存入一个区域(且Topic下多个分区可以存在于同一个Broker上);后者是说,为了保证消息在被消费前不会丢失,需要将某一个区域中的消息集合复制出多个副本(同一个分区的多个副本不能存放在同一个Broker上)。

Kafka将分区的多个副本分为两种角色:Leader和Follower,Leader Broker是主要服务节点,消息只会从消息生产者发送给Leader Broker,消息消费者也只会从Leader Broker中Pull消息。Follower Broker为副本服务节点,正常情况下不会公布给生产者或者消费者直接进行操作。Follower Broker服务节点将会主动从Leader Broker上Pull消息。

在这种工作机制下,Follower和Leader的消息复制过程由于Follower服务节点的性能、压力、网络等原因,它们和Leader服务节点会有一个消息差异性。当这个差异性扩大到一定的范围,Leader节点就会认为这个Follower节点再也跟不上自己的节奏,导致的结果就是Leader节点会将这个Follower节点移出“待同步副本集”ISR(in-sync replicas),不再关注这个Follower节点的同步问题。

只有当ISR中所有分区副本全部完成了某一条消息的同步过程,这条消息才算真正完成了“记录”操作。只有这样的消息才会发送给消息消费者。至于这个真正完成“记录”操作的通知是否能返回给消息生产者,完全取决于消息生产者采用的acks模式(后文会讲到)。

现在我们可以回过头看看上文中4-1-3-5小节给出的“查看Topic状态”命令以及命令结果:

# 脚本命令范例
kafka-topics.sh --describe --zookeeper 192.168.61.139:2181 --topic my_topic2

# 显示的结果
Topic:my_topic2 PartitionCount:4        ReplicationFactor:2     Configs:
        Topic: my_topic2        Partition: 0    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: my_topic2        Partition: 1    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: my_topic2        Partition: 2    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: my_topic2        Partition: 3    Leader: 1       Replicas: 1,2   Isr: 1,2

以上命令行用于显示指定topic名称的基本状态信息。Partition表示分区号,Replicas表示所有副本的所在位置的Broker.id信息,Isr表示当前状态正常可以进行消息复制的副本所在位置的Broker.id信息。

那么从命令结果来看,名叫“my_topic2”的topic一共有4个数据分区,每一个分区有两个副本。其中:0号分区的Leader Broker服务节点的id为2,0号分区的两个副本分别在id为2和id为1的Broker服务节点上,且id为2和id为1的Broker上的副本状态都是正常的;同理,1号分区的Leader Broker服务节点的id为1,1号分区的两个副本分别在id为2和id为1的Broker服务节点上,且id为2和id为1的Broker上的副本状态都是正常的。。。

4-4、Kafka原理:生产者

请注意之前我们给出的Kafka集群方案的示意图,在图中消息生产者并没有连接到zookeeper协调服务,而是直接和多个Kafka Server Brokers建立了连接。和其他种类的消息队列的设计不同,在整个Kafka方案中消息生产者(Producer)会有很多重要规则的决定权,例如:

  • 消费生产者(Producer)可以决定向指定的Topic的哪一个分区(Partition)发送消息。而不是由Broker来决定。

  • 消息生产者(Producer)可以决定消息达到Kafka Broker后,Producer对消息的一致性关注到什么样的级别,又或者根本不关心消息在Broker上的一致性问题。

  • 消息生产者(Producer)可以决定是以同步方式(sync)还是异步方式(aSync)向Broker Server List发送消息。

  • 在异步方式下,消费生产者(Producer)还可以决定以什么样的间隔(周期)向Broker Server List发送消息。

  • 随机选定Broker Server List中某一个服务节点,读取当前Topic下的分区和复制表信息,并保存在本地Pool中的工作也是由消息生产者(Producer)主动完成。

  • 另外,Kafka中的消息生产者没有类似ActiveMQ中那样的事务机制(可参见文章《架构设计:系统间通信(23)——提高ActiveMQ工作性能(中)》)。这样的设计和Kafka主要的业务场景有关——用来收集各种操作日志。这样的场景对消息的可靠性要求并不高:漏掉一两条日志并不影响后端大数据平台对日志数据的分析结果;而且这样的设计大量简化了Broker的设计结构:它不需要像ActiveMQ那样专门为达成传输但还未进行commit的消息专门创建存储区域“transaction store”,并在进行了commit或者rollback操作后进行标记。这种处理机制是Apache Kafka高效性能的又一种保障。

  • Kafka中的多个消息生产者(Producer)并不需要ZooKeeper服务中的任何信息为它们协调发送过程,因为没有什么可协调的。生产者唯一需要知道的Topic有多少个分区以及每个分区,分别存在于哪些Broker上的信息都是来源于对某一个Broker的直接查询。所以Kafka集群中只剩下了Broker和Consumer需要进行协调(这个问题会在后文中进行详细讨论)。

  • 这是分布式系统建设思想中一个重要的原则——不可滥用协调装置:完成同一件工作时,协调N个参与角色要比协调N-1个参与角色耗费更多的时间和性能;所以,只协调需要协调的角色,只通知需要通知的事件,只为协调过程存储必要的数据。我在后续的写作中,会专门为读者详细介绍Kafka中消息生产者的实现过程,这里面有很多设计思想可以在各位的实际工作中借鉴。

4-4-1、基本使用

下面的代码使用Kafka的Java Client API演示消息生产者的使用。这里我们使用的Kafka Java Client API的版本是V0.8.2.2,您可以直接引入Maven的官方库依赖即可:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.2</version>
</dependency>

以下是Kafka消息生产者的代码,之前我们已经通过Kafka的命令脚本创建了一个拥有4个分区的Topic——my_topic2:

package kafkaTQ;

import java.util.Date;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * kafka消息生产者演示,
 * @author yinwenjie
 */
public class KafkaProducer {

    public static void main(String[] args) throws RuntimeException {
        Properties props = new Properties();
        // 指定kafka节点列表,不需要由zookeeper进行协调
        // 并且连接的目的也不是为了发送消息,而是为了在这些节点列表中选取一个,来获取topic的分区状况
        props.put("metadata.broker.list", "192.168.61.138:9092");
        // 使用这个属性可以指定“将消息送到topic的哪一个partition中”,如果业务规则比较复杂的话可以指定分区控制器
        // 不过开发者最好要清楚topic有多少个分区,这样才好进行多线程(负载均衡)发送
        //props.put("partitioner.class", "kafkaTQ.PartitionerController");
        // 可以通过这个参数控制是异步发送还是同步发送(默认为“同步”)
        //props.put("producer.type", "async");
        // 可以通过这个属性控制复制过程的一致性规则
        //props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);

        // 创建消费者
        Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>(config);

        // 由于我们为topic创建了四个partition,所以将数据分别发往这四个分区
        for (Integer partitionIndex = 0; ; partitionIndex++) {  
            Date time = new Date();
            // 创建和发送消息,可以指定这条消息的key,producer根据这个key来决定这条消息发送到哪个parition中
            // 另外一个可以决定parition的方式是实现kafka.producer.Partitioner接口
            String messageContext_Value = "this message from producer 由producer指的partitionIndex:[" + partitionIndex % 4 + "]" + time.getTime();
            System.out.println(messageContext_Value);
            byte[] messageContext = messageContext_Value.getBytes();
            byte[] key = partitionIndex.toString().getBytes();

            // 这是消息对象,请注意第二个参数和第三个参数,下一小节将会进行详细介绍
            KeyedMessage<byte[], byte[]> message = new KeyedMessage<byte[], byte[]>("my_topic2", key , partitionIndex % 4 ,  messageContext);
            producer.send(message);

            // 休息0.5秒钟,循环发
            synchronized (KafkaProducer.class) { 
                try {
                    KafkaProducer.class.wait(500);
                } catch (InterruptedException e) {
                    e.printStackTrace(System.out);
                }
            }
        } 
    }
}

4-4-2、生产者指定分区

开发人员可以在消息生产者端指定发送的消息将要传送到Topic下的哪一个分区(partition),但前提条件是开发人员清楚这个Topic有多少个分区,否则开发人员就不知道怎么编写代码了。当然开发人员也可以完全忽略决定分区的规则,这时将由消费者端携带的一个默认规则决定。

开发人员可以有两种方式进行分区指定:第一种方法是以上代码片段中演示的那样,在创建消息对象KeyedMessage时,指定方法中partKey/key的值;另一种方式是重新实现kafka.producer.Partitioner接口,以便覆盖掉默认实现。

使用KeyedMessage类构造消息对象时,可以指定4个参数,他们分别是:topic名称、消息Key、分区Key和message消息内容。topic名称和message消息内容很容易理解,但是怎样理解消息Key和分区Key呢?以下是KeyedMessage类的源代码(Scala语言):

package kafka.producer

/**
 * A topic, key, and value.
 * If a partition key is provided it will override the key for the purpose of partitioning but will not be stored.
 */
case class KeyedMessage[K, V](val topic: String, val key: K, val partKey: Any, val message: V) {
  if(topic == null)
    throw new IllegalArgumentException("Topic cannot be null.")

  def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message)

  def this(topic: String, key: K, message: V) = this(topic, key, key, message)

  def partitionKey = {
    if(partKey != null)
      partKey
    else if(hasKey)
      key
    else
      null  
  }

  def hasKey = key != null
}

KeyedMessage类的构造函数中有一个局部变量:partitionKey,在KeyedMessage类的首行注释中,对该变量进行了一个说明:

If a partition key is provided it will override the key for the purpose of partitioning but will not be stored.

从源码中可以看出,partitionKey优先使用partKey作为分区依据,如果partKey没有被赋值,则使用key作为分区依据。所以在使用KeyedMessage类的构造函数时,partKey和key您只需要指定其中的一个就完全够了。

您还可以实现kafka.producer.Partitioner接口,并在创建消费者对象时进行指定,以便实现分区的指定(如果不进行指定,默认的实现类为“kafka.producer.DefaultPartitioner”)。代码片段如下:

package kafkaTQ;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class PartitionerController implements Partitioner {

    /**
     * 必须要有这个构造函数
     * @param vp
     */
    public PartitionerController(VerifiableProperties vp) {

    }

    /* (non-Javadoc)
     * @see kafka.producer.Partitioner#partition(java.lang.Object, int)
     */
    @Override
    public int partition(Object parKey, int partition) {
        /*
         * 在这里您可以根据自身的业务过程重新运算一个分区,并进行返回。
         */
        Integer parKeyValue = (Integer)parKey;
        return parKeyValue;
    }
}

需要实现的partition方法中,第一个参数是您在创建消息时所传递的partyKey(这是的partyKey不一定传入Integer),第二个参数是send方法根据自身内部机制决定的目标分区。

4-4-3、同步和异步发送

消息生产这还可以决定是以同步方式向Broker发送消息还是以异步方式向Broker发送消息。只需要使用生产者配置中的“producer.type”属性进行指定。当该属性值为“sync”时,表示使用同步发送的方式;当该属性值为“async”时,表示使用异步发送方式。

在异步发送方式下,开发人员调用send方法发送消息时,这个消息并不会立即被发送到topic指定的Leader partition所在的Broker,而是会存储在本地的一个缓冲区域(一定注意是客户端本地)。当缓冲区的状态满足最长等待时间或者最大数据量条数时,消息会以一个设置值批量发送给Broker。如下图所示:

这里写图片描述

缓存区的数据按照batch.num.messages设置的数值被一批一批的发送给目标Broker(默认为200条),如果消息的滞留时间超过了queue.buffering.max.ms设置的值(单位毫秒,默认值为5000)就算没有达到batch.num.messages的数值,消息也会被发送。

如果由于Broker的原因导致消息发送缓慢,这时在本地待发送消息缓存区中的消息就有可能达到
queue.buffering.max.messages设置的缓存区允许存储的最大消息数量,一旦达到这个数量消息生产者端再次调用send方法的时候,send方法所在线程就会被阻塞,直到缓存区有足够的空间能够放下新的数据为止。

4-4-4、强一致性复制和弱一致性复制

Kafka中的消息生产者还可以配置发送的消息在Broker端以哪种方式进行副本复制:强一致性复制还是弱一致性复制,又或者不关注消息的一致性。(在分布式系统中强一致性、弱一致性和最终一致性是一个非常关键的知识点,它们是CAP原则重要的实践,我将会在“存储”专题中进行对它们的定义和主流的实现方式进行讲解)

在Kafka的实现中,强一致性复制是指当Leader Partition收到消息后,将在所有Follower partition完成这条消息的复制后才认为消息处理成功,并向消息生产者返回ack信息;弱一致性复制是指当Leader partition收到消息后,只要Leader Broker自己完成了消息的存储就认为消息处理成立,并向消息生产者返回ack信息(复制过程随后由Broker节点自行完成);

您可以通过消息生产者配置中的“request.required.acks”属性来设置消息的复制性要求。在官方文档中,对于这个属性的解释是:

acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect . The offset given back for each record will always be set to -1.

当acks设置为0时,生产者端不会等待Server Broker回执任何的ACK确认信息。只是将要发送的消息交给网络层。这种情况下,消息是否真的到达了Server Broker,实际上生产者端并不知道。由于生产者端并不等待Server Broker回执任何的ACK确认信息,那么消息一旦传输失败(例如,等待超时的情况)“重试”过程就无从谈起了。由于生产者端在这种情况下发送的消息,很可能Server Broker还没来得及处理,甚至更有可能Server Broker都没有接收到,所以Server Broker也无法告知生产者这条消息在分区中的偏移位置。

acks=1 This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost.

当acks设置为1时,生产者发送消息将等待这个分区的Leader Server Broker 完成它本地的消息记录操作,但不会等待这个分区下其它Follower Server Brokers的操作。在这种情况下,虽然Leader Server Broker对消息的处理成功了,也返回了ACK信息给生产者端,但是在进行副本复制时,还是可能失败。

acks=all (#注:原文如此,实际上属性值为“-1”)This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee.

当acks设置为“all”时,消息生产者发送消息时将会等待目标分区的Leader Server Broker以及所有的Follower Server Brokers全部处理完,才会得到ACK确认信息。这样的处理逻辑下牺牲了一部分性能,但是消息存储可靠性是最高的。

4-4-5、生产者需要查询zookeeper?

在2013年2月2日,Kafka的主要参与者Neha Narkhede发表了一篇讲解Kafka Replication过程的技术文档(算是官方文档)《Kafka Replication》,在这篇文档的Synchronous replication-write章节Neha Narkhede这样描述了“写”过程前的准备工作:

To publish a message to a partition, the client first finds the leader of the partition from Zookeeper and sends the message to the leader。

这句话的大意是:为了发送消息到一个分区,客户端首先要通过zookeeper查询到这个分区的Leader Broker在哪个位置,并且向这个Leader Broker发送信息。国内一些译文由此也形成了相关的中文描述。这显然与本文中提到的“生产者不需要连接zookeeper进行任何协调操作”的描述完全矛盾

4-4-5-1、进行实验

这里冲突的重点在于“生产者在发送消息时,是直接连接到了zookeeper服务查询相关信息,还是连接到某一个已知的Broker查询现信息?”

那么我们只能以实验的形式实际验证一下消息生产者在创建、发送消息的过程中是否需要连接zookeeper。实际上笔者通过阅读0.8.2.2版本的JAVA Client For Producer API 部分的的源码,真没有发现Producer直接连接zookeeper的证据(主要的类位置包括:kafka.producer.OldProducer、kafka.producer.ProducerPool、kafka.producer.SyncProducer、org.apache.kafka.clients.producer.internals.Sender和org.apache.kafka.clients.producer.KafkaProducer)。但是这显然不具有太大的说服力,毕竟很可能出现漏读代码的情况。

验证实验基于之前我们已经搭建的Apacke Kafka集群环境,192.168.61.140服务器上运行着一个standalone模式的zookeeper服务。在实验中,我们使用192.168.61.140服务器上自带的防火墙,设置只有两个Kafka Broker服务节点(139和138)能够访问zookeeper上的2181端口。并在这种情况下观察消息生产者的工作情况(以及相同topic下的消费者是能正常收到生产者发送的消息)。如下图所示:

这里写图片描述

  • 设置192.168.61.140上的防火墙,只允许192.168.61.139和192.168.61.138访问其2181端口:
[root@zk ~]# service iptables status
Table: filter
Chain INPUT (policy ACCEPT)
num  target     prot opt source               destination         
1    ACCEPT     tcp  --  192.168.61.138       0.0.0.0/0           tcp dpt:2181 
2    ACCEPT     tcp  --  192.168.61.139       0.0.0.0/0           tcp dpt:2181 
3    ACCEPT     icmp --  0.0.0.0/0            0.0.0.0/0           
4    REJECT     all  --  0.0.0.0/0            0.0.0.0/0           reject-with icmp-host-prohibited 

Chain FORWARD (policy ACCEPT)
num  target     prot opt source               destination         
1    REJECT     all  --  0.0.0.0/0            0.0.0.0/0           reject-with icmp-host-prohibited 

Chain OUTPUT (policy ACCEPT)
num  target     prot opt source               destination

在全端口开放ICMP协议,只是为了能够使用ping命令进行检查。

  • 接下来启动140上的zookeeper服务,并且验证一下在140分别开启和关闭防火墙的情况下,producer所在的服务节点是否能够连接到zookeeper
# 在140上启动zookeeper,并且确定它以standalone模式运行
[root@zk ~]# zkServer.sh start
JMX enabled by default
Using config: /usr/zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

[root@kp2 ~]# zkServer.sh status
JMX enabled by default
Using config: /usr/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: standalone

首先测试在140节点开启防火墙的情况下,producer所在的192.168.61.1服务节点是否能顺利连接到2181端口(使用telnet命令):

telnet 192.168.61.140 2181

Trying 192.168.61.140...
telnet: connect to address 192.168.61.140: Connection refused

然后关闭140上的防火墙,再使用同样的telnet命令进行测试:

telnet 192.168.61.140 2181

Trying 192.168.61.140...
Connected to 192.168.61.140.
Escape character is '^]'.

可以看到140启动防火墙后,192.168.61.1服务节点不能连接到140服务的2181端口。这说明我们设置的实验前提的确起到了限制192.168.61.1节点访问140节点上zookeeper服务的作用。

接下来我们重新开启140上防火墙,启动140上的zookeeper服务,启动139和138上的Kafka Broker服务,让整个Kafka Broker集群工作起来。正式开始进行实验:

// 生产者的测试代码就采用4-4-1小节中我们给出的代码样例。
// 很显然在140开启防火墙,producer无法连接zookeeper的情况下
// producer也能够正常工作。以下是producer程序打印的运行信息

this message from producer 由producer指的partitionIndex:[0]1462439421320
this message from producer 由producer指的partitionIndex:[1]1462439429482
this message from producer 由producer指的partitionIndex:[2]1462439437655
this message from producer 由producer指的partitionIndex:[3]1462439441987
  • 本编文章已经介绍过,消息能够发送出去不一定代表Kafka Broker集群工作正常。在消息复制、回执ACK等环节还是可能引起错误。只有消息消费者收到了消息,才能认为整个消息接受、处理、发送过程都是成功的

所以为了确认这些发送出去的消息能够被消费者接收到,在进行producer测试工作的同时,我们在138节点上,使用kafka-console-consumer运行了一个对应的消费者以便接收数据(不能在138节点或者139节点以外运行consumer,因为无法连接zookeeper服务节点的2181端口)。以下是消费者接收到的信息:

[root@activemq ~]# kafka-console-consumer.sh --zookeeper 192.168.61.140:2181 --topic my_topic2
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
this message from producer 由producer指的partitionIndex:[0]1462439421320
this message from producer 由producer指的partitionIndex:[1]1462439429482
this message from producer 由producer指的partitionIndex:[2]1462439437655
this message from producer 由producer指的partitionIndex:[3]1462439441987
4-4-5-2、结果分析

从以上小节的实验情况,我们看到的结果是:Producer所在的服务节点192.168.61.1,在不能访问192.168.61.140节点上zookeeper服务的情况下,包括Producer在内的整个Kafak集群能够正常工作,消费者端能够正常消费数据。那么问题来了,作为Kafka的主要参与者Neha Narkhede在这样的官方文档中是不太可能出现这样的低级错误的,那么是什么原因呢?当然如果真要说到出错,那么笔者自己出错的可能性倒是要高得多。笔者认为造成这种冲突的原因可能有以下几种:

  • 这篇文章是在2013年2月份发布的,那时候主流的Kafka版本是V0.7.X。但是笔者在实际工作中并没有使用任何V0.7.X版本,所以对V0.7.X版本中是否需要生产者连接zookeeper并没有确定的答案。

  • 在Neha Narkhede的这段话中,Client并不是指代的消息生产者,而是泛指的使用zookeeper服务的各种客户端角色。如果是这样的话,那么Client最有可能指代的就是Server Broker。

  • 以上实验中,并没有限制住producer直接访问zookeeper的所有情况。防火墙功能可能出现了问题,又或者producer自行通过一个隐藏的端口(例如:9999)访问到了zookeeper。

  • 笔者还没有考虑到的其它可能性。欢迎各位读者留言讨论。

========================================
(接下文)

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐