最近做项目时使用到了Kafka,翻自己博客居然没有相关文章,这时写点文章复习复习,顺便深入理解一下基本操作。

基本模型

Kafka是大数据常用的消息中间件,是由Apache开发的一种发布订阅消息系统,它是一个分布式的、分区的和重复的日志服务。它的模型大概就是这个样子

在这里插入图片描述
我们可以看到两个熟悉的角色
Producer生产者
Consumer消费者
这两个是消息中间件里面常见的两个名词
另外还有broker,它代表一台kafka服务器,多个broker组成集群。
从图中可以看到Producer负责向broker发送消息,而Consumer负责从broker拉取消息,从而完成整个消息的生产消费。另外还有一个Zookeeper,它是一个分布式的协调服务,主要保存了broker的一些元数据,还有负责broker服务协调,比如leader副本的选举等等。

分区和主题

主题Topic,一般不同业务就为kafka创建了不同Topic供生产者和消费者使用。
而一般情况下,一个主题为了能够均衡broker负载,一个Topic通常有不同的分区,每个分区有着自己的偏移量,但是这个分区和副本备份没有关系,最近很多人都迷惑这点,将分区和副本联系在一起,其实不是的。
当Producer生产消息时,会根据某些算法将消息划分到不同的分区,比如我来了一个A消息,它被分到了0分区,而B消息可能被分到了2分区,不同分区的消息是不冲突的,就像这样:
在这里插入图片描述
它是以日志追加的方式进行的,上面图中分区0正在写入偏移量为8的消息,分区1正在写入偏移量为6的消息…

你可能疑惑了,作为分布式的消息中间件,为什么连容灾的措施都没有。这不废话吗,当然有。看图说话

在这里插入图片描述
每个分区都会根据当初创建Topic的副本参数来复制分区。假如现在分区1,它有两个follower分别在Broker1和Broker2上。主要的是leader那个,在Broker1上,它负责读写,而副本只是负责从leader拉取消息,只负责同步消息,为了以后leader宕机能够及时切换减少消息的丢失。
但是呢,人写的程序不是万能的,follower多多少少都会有写滞后,”一定程度的滞后“将follower划分成了两种,一种是ISR,另一种是OSR

AS ISR OSR

AS:Assigned Replicas,即某一个分区的全部副本,包括了leader。
ISR:In-Sync Replicas,即与leader保持一定程度同步的follower。
OSR:Out-of-Sync Replicas,即对leader来说滞后了过多的follower。
所以AS=ISR+OSR
一个消息是首先发送到leader的,然后follower再去复制过来。而leader在负责读写时,还要注意维护follower的状态,如果一旦发现follower过多的之后,就会将它放到OSR中,而如果OSR中的进度慢慢跟上来了,就会将它放到ISR中。为什么要维护这样的follower集合,因为在leader出事了后,要重新选举leader的话,就会根据ISR中的follower来选举一个leader,这样能够尽量减少消息的丢失。

生产者

先来个初级版的例子吧
public class kafkaProducer {
//几台kafka服务器
    public static final String brokerList = "114.116.219.197:5008,114.116.220.98:5008,114.116.199.154:5008";
    //topic主题名字
    public static final String topic = "topic-demo";

    public static Properties initConfig1(){
        Properties props = new Properties();
        //添加服务器列表配置
        props.put("bootstrap.servers",brokerList);
        //键的序列化方式
        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
	//值的序列话方式
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        //客户端名字
        props.put("client.id","producer.client.id.demo");
        return props;
    }  

    public void kafkatest1(){
        Properties properties = initConfig1();
        //创建生产者
        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
        //创建消息
        ProducerRecord<String,String> record = new ProducerRecord<>(topic,"hello,Kafka!");
        //fire-and-forget,同步sync,异步async
        producer.send(record);
    }
}
生产者是线程安全的,我们不用担心多线程情况下生产者发送消息会有影响。
有时候,如果你如果按照上面来敲代码,多多少少可能会敲错,所以我们最后尽量将其标准一点。像这样
public class kafkaProducer {
//几台kafka服务器
    public static final String brokerList = "114.116.219.197:5008,114.116.220.98:5008,114.116.199.154:5008";
    //topic主题名字
    public static final String topic = "topic-demo";

    public static Properties initConfig2(){
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        props.put(ProducerConfig.CLIENT_ID_CONFIG,"producer.client.id.demo");
        return props;
    }

    public void kafkatest1(){
        Properties properties = initConfig1();
        //创建生产者
        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
        //创建消息
        ProducerRecord<String,String> record = new ProducerRecord<>(topic,"hello,Kafka!");
        //fire-and-forget,同步sync,异步async
        producer.send(record);
    }
}
像上面的样子,我们就将这些”“里面的内容都用Kafka自己本来就有的常量了,所以不用担心搞错。
ProducerRecord

注意这个ProducerRecord,我们进去看看源码
在这里插入图片描述
注释写的很明显,ProducerRecord包括了主题,分区号,消息头部,键,值,消息时间戳。
键可以用来计算分区,值就是存储消息的。看到分区好partition其实我们也可以自己指定分区号,不过按默认的应该没事。

消息的发送

上面的图片就是消息的结构,以前的版本可能还要和Zookeeper联系起来,现在直接和Kafka集群通信即可。
对于ProducerRecord构造方法有很多种
在这里插入图片描述

看,明显可以自己指定分区,时间戳,键,值,还有消息头,都是可以自己弄得。而我们刚才看到的代码就只有topic和value(消息)。
发送消息的模式主要有三种,上面看过了,即fire-and-forget(发完马上忘掉),同步,异步

fire-and-forget:

这种方式可靠性最低,它从来不管消息有没有到达Kafka,只顾自己,是一种非常自私的方式,所以很容易发生消息丢失,因而可靠性最差,但是性能又是最好的。

同步:

这种方式就是阻塞等待Kafka服务器返回信息给自己。到send里面去看。
在这里插入图片描述
它返回了一个Future< RecordMetadata >对象,里面包括了各种元数据信息。我们实现同步的方式就是
在这里插入图片描述
这个get()方法可以阻塞,是Future< RecordMetadata >里的一个方法,所以如果你除了阻塞之外,你还可以利用这个对象获取到其他元数据信息。

异步发送

而异步方式就是常见的在send里面加一个回调函数
在这里插入图片描述
这样,一但返回了正确的信息就会用到回调函数,而且回调函数u能够保证分区有序。

拦截器

生产者拦截器既可以用来在消息发送前 做一些准备工作 比如按照 一个规则过 虑不符合要求
的消息、修改 息的内容等, 也可以用来在发送 调逻辑前做一 些定制化的需 求,比如统计
类工作。

public class producerinterce implements ProducerInterceptor<String,String> {
//    在序列化和分区之前会调用onSend方法来对消息进行相关定制化操作
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        //为每个消息添加一个前缀
        String modifiedValue = "pre" + record.value();
        return new ProducerRecord<>(record.topic(),record.partition(),record.timestamp(),record.key()
        ,modifiedValue,record.headers());
    }
// kafkaProducer在被应答之前或者消息发送失败时调用这个方法,比用户设置的CallBack先执行
//    这个方法运行在producer的IO线程中,所以这个方法实现的代码逻辑越简单越好。否则会影响消息发送速度。
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if(exception==null) System.out.println("success");
        else
            
    }
//关闭拦截器时执行一些资源清理工作
    @Override
    public void close() {

    }
//
    @Override
    public void configure(Map<String, ?> configs) {

    }
}
在producer中添加以下代码,这里还可以添加多个拦截器,当然一一按顺序执行。
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,producerInterceptor1.class.getName()+","+producerinterceptor2.class.getName());

序列化

我们在客户端Producer都会经过序列化器将我们发出去的消息序列化。最上面的入门例子,就用了StringSerializer去将对象转成字节数组。相反,在消费者那边就需要将消息反序列化了。

分区器

经过了序列化器后,就要经过分区器了。
刚才我们介绍过了,在ProducerRecord的构造方法种,可以自定义partition字段,也就是i自己分区,这样就不会经过默认的分区器了。但是我也不知道指定好还是不指定好,那么看看默认的分区器DefaultPartitioner的实现。
partition()方法中定义了主要的分区分配逻辑。如果我们ProducerRecord里面指定了key的话,那么就会根据key进行hash(MurMurHash2,它具有高性能计算和低碰撞率)算法,最终根据得到的哈希值计算分区号,拥有相同的key会发送到同一个分区,但是如果key没有设置,就会轮询进入每个分区。
当然也可以自定义分区器
在这里插入图片描述
这就是自定义分区器。当然默认的应该已经可以了。

Producer整体原理

在这里插入图片描述
看清楚了,kafka的producer有两个线程,主线程负责执行用户代码还有经过拦截器,序列化器,最后经过分区器,从而将ProducerRecord添加到缓存区,一个消息累加器里面,这个消息累加器并不是以ProducerRedcord为单位的,而是以ProducerBatch为单位,可以说一般情况下,一个ProducerBatch包括了几个ProducerRecord,并且也是按分区来放入ProducerRecord。
然后由Sender线程去发送。
从上图可以看到Sender线程和KafkaCluster之间是有通信的。那么肯定就会有消息的应答回复。

这个应答就叫做acks,它有好几种值,分别对应不同的情况
  • 等于1:这种情况是一种吞吐量和可靠性均衡的方案,只要消息被leader正确接收,不用等follower复制消息,可以直接返回给Producer正确响应,但是如果出错了,就会返回错误消息,比如leader坏了,或者正在重新选举leader都有可能。看起来这个挺不错的,但是呢还是会有消息丢失,因为它只管leader接收,假如leader正确接受了,返回正确信息给Producer,但是还没等follower复制完,就突然没了,那么follower就会确实消息,但是Producer不知道。
  • 等于0:这种情况是可靠性相当不好的,但是吞吐量很好,就是Producer不管KafkaCluster有没有正确接收到消息,反正Producer就是一直传,是非常自私的一种。
  • 等于-1或者ALL:这种情况就是可靠性最好的,就是当leader接收到消息后,会等所有的ISR中间的follower都复制完再给应答,可见可靠性非常好。很少有数据丢失,除非只有leader,而没有其他follower,那就和acks=1是一样的了,等于只有leader接收到了,如果leader挂了一样的没有消息。
    代码中这样配置
    在这里插入图片描述
    在这里插入图片描述

消费者

消费者很有意思,消费者有两种,一种是主动消费,一种是被动消费,被动消费就好比我们被搞传销的搞上了,那还能怎么办,强制消费呗,在生产者消费者领域是中间服务器强制发给消费者,是一种push的概念。而主动消费是自己资源消费,是一种pull模式,kafka就是这一种模式,当消费者要消费消息时,会主动从Broker拉取消息。
要了解kafka的消费者,首先要知道消费者和消费者组的概念

消费者和消费者组

很显然,消费者组包含了很多消费者。kafka在这种形式上抽象的也是蛮有意思的,我们知道消费方式可以是P2P,也可以是发布/订阅,即PUB/SUB,而kafka通过消费者和消费者组的概念就可以都实现。
对于消费者组,一个分区只能被一个消费者组中的一个消费者消费,反过来说,同一个消费者组的消费者不能重叠消费同一个分区。当只有一个消费者组时,一个分区就对应一个消费者,这样就是一种P2P的方式,而多个消费者组都只有一个消费者的话,那么所有消费者都可以收到消息,就是发布/订阅了。

在这里插入图片描述

看上面这个图,对于Group A,每个分区可以被对应的消费者消费,而Group B不得不一个消费者消费两个分区,挺可怜的。但是如果这样横向的不断增加同一消费者组的消费者,看似可以均衡消费,减少某一消费者的负载,但是并不是越多越好,每次新加一个消费者,分区就会重新分配,但是一旦同一消费者组的消费者数量多于分区数量,就会导致多余的消费者没有分区消费,就只能空在那里很尴尬,所以并不是消费者越多越好。

实验代码

代码的常用步骤

//配置消费者客户端参数及创建相应的消费者实例
//订阅主题
//拉取消息并且消费
//提交消费位移
//关闭消费者实例
public class KafkaConsumer {
//配置消费者客户端参数及创建相应的消费者实例
    //订阅主题
    //拉取消息并且消费
    //提交消费位移
    //关闭消费者实例
	
	//Kafka服务器的集群消息
    public static final String brokerList = "114.116.219.197:5008,114.116.220.98:5008,114.116.199.154:5008";
    //主题消息
    public static final String topic = "topic-demo";
    //组名
    public static final String groupID = "group.demo"
    //这个待会儿就知道了    
    public static final AtomicBoolean isRunning = new AtomicBoolean(true);


    public static Properties initConfig1(){
        Properties props = new Properties();
//键值序列化        props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("bootstrap.servers",brokerList);
        props.put("group.id",groupID);
        props.put("client.id","consumer.client.id.demo");
        return props;
    }
    public static Properties initConfig2(){
        Properties props = new Properties();
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);
        props.put(ConsumerConfig.GROUP_ID_CONFIG,groupID);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG,"client.id.demo");
        return props;
    }
    public static void test1(){
        Properties properties = initConfig2();
        //创建消费者对象
        org.apache.kafka.clients.consumer.KafkaConsumer<String,String> consumer =
                new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(properties);
        //订阅主题
        consumer.subscribe(Arrays.asList(topic));
	
        while(isRunning.get()){
        //通过poll方法来获取消息,这里我写了一篇文章poll里面发生了什么,还没写完,可以一看。[点开看看吧](https://blog.csdn.net/weixin_43272605/article/details/104168379)
            ConsumerRecords<String,String> records =
                    consumer.poll(Duration.ofMillis(1000));
            for(ConsumerRecord<String,String> record: records){
                System.out.println("topic = " + record.topic()
                +", partition = "+ record.partition()
                +", offset = " + record.offset());
                System.out.println("key = "+record.key()
                    + ", value = "+record.value());
            }
        }
        class TopicPartion implements Serializable{

        }
    }

}

订阅主题,分区

前面的代码可以看到,我们通过consumer.subscribe(Arrays.asList(topic));来订阅了主题
这个方法有好几个

public void subscribe(Collection<String> topics)     
//这个listener以后会知道的。是一种再均衡监听器。
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
//下面这两种可以采用正则表达式匹配多个主题
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener)
public void subscribe(Pattern pattern)

//此外,还有另外一种,是订阅分区
public void assign(Collection<TopicPartition> partitions)
这个TopicPartition里面有两个变量,分别是topic和partition。比如这样用:
public void assign(Collection<TopicPartition> partitions)

//取消订阅
    public void unsubscribe()

反序列化

前面说了Producer在写入消息的时候会进行序列化,那么反之,消费者消费肯定要反序列化
任何反序列化器都要实现一个Deserializer接口,比如StringDeserializer就实现了这个接口
在这里插入图片描述

配置当前类
public void configure(Map<String, ?> co 口 gs boolean i sKey)
用来执行反序列化,这里将byte[]类型转成了String
public String deserialize(String topic, byte[] data)
用来关闭当前序列化器。
public void close () 

同时我们可以自定义反序列化器,来使用自己的。

消息的消费

实例的那份代码看到了,是通过poll方法实现消费,里面的东西我们可以看到的
我写了那篇文章
可以一看
public ConsumerRecords<K, V> poll(final Duration timeout)
在这里插入图片描述
像这样,就是每隔1000毫秒消费一次消息,期间可以等待应答。
返回的是ConsumerRecords里面的迭代是ConsumerRecord

	
    private final String topic;//主题
    private final int partition;//分区
    private final long offset;//位移
    private final long timestamp;//时间戳,包括了创建时间或者修改时间
    private final TimestampType timestampType;//创建或者修改时间
    private final int serializedKeySize;
    private final int serializedValueSize;
    private final Headers headers;
    private final K key;
    private final V value;
    private final Optional<Integer> leaderEpoch;

    private volatile Long checksum;

要知道,我们当初创建消费者时topic时可以指定多个的,同样,我们poll时是可以按照主题甚至是分区进行消费
按主题消费,就调用ConsumerRecords的records(topic)
在这里插入图片描述
按照partition消费就传个分区进去就行
在这里插入图片描述

位移的提交

这里首先我们要清楚,消费者总是要消费到一定的位移,不能重复消费吧。这个位移是kafka在维护着,我们客户端也需要自己提交更新位移。
而且我每次poll时,返回的一定是没有消费过的,那么这个位移更要去维护了,注意:这里的位移和消息在分区的位置数值一样,但是意义不一样,是指当前客户端消费当前分区的位置。
在以前的版本的话,消费位移是由zookeeper保存,但是现在是kafka集群专门在zookeeper有一个__consumer_offsets 来存储。而这个存储的过程就是提交位移。

假设对于某个分区消息0,1,2,3,4,5,6,7,8,9
假设我这次poll已经消费到了当前位置5,那么我提交的位移应该是6,即我下一次想要拉取的消息。

重复消费和消息丢失

我们提交位移的时候如果控制得不好,就会发生重复消费或者消息丢失。
对于消息0,1,2,3,4,5,6,7,8,9,·10,11
假设当前消费到了2,而拉取了3,4,5,6,7消息,然后提交位移8,但是在处理3,4,5,6,7消息时突然故障,那么等恢复时poll就会从8消费,这样3,4,5,6,7的消息处理就丢失了。
而如果在消费后再去提交,比如我先消费3,4,5,6,7,再去提交8,这时候如果消费时除了故障,就执行不到提交位移,这样恢复时下次poll又是拉取3,4,5,6,7就重复消费了。
一般情况下,我们都是按照默认的自动提交,默认是每隔5秒吧,不是很记得了。但是这样同样有问题,如果在下次提交前消费者发生故障,那么如果恢复了就会造成重复消费。但是如果卡在了提交的时间点,比如我已经拉取了消息,并且刚好在时间点提交,而消费的线程出了状况,导致某些消息没有处理,就出现消息丢失。其实总而言之就是最上面两种情况,提交位移先后的问题。
如果你不满意自动消费,可以自己手动提交,以前的手动提交是要连接zookeeper,但是我现在不是那个版本,所以不需要操作zookeeper

第一步:要开启手动提交功能,即关闭自动提交

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

第二步:代码手动提交

同步提交:
先通过poll获取消息
处理消息
通过consumer提交位移
consumer.commitSync();

异步提交
异步提交就是消费线程不会等提交返回结果就开始下一次拉取了。
在这里插入图片描述
有三个重载方法,第二个callback是一个传进去的回调方法,当返回结果时,可以进行相关处理。异步提交的方式引起重复消费就要注意了,不过第一次提交5,然后提交失败,但是呢紧接着要提交5+3,提交成功了,但是提交5那次还没有成功,又提交5成功就更新了offset,下次拉取又从5开始了。所以每次提交时最好在本地维护一个序号,这个序号一定是正确的消费位移,然后如果提交的时候位移小于这个序号,说明已经有更大的位移已经提交,就不需要提交了。

消费者再均衡

所谓消费者再均衡,就是防止某一个消费者负载过多,效率减低,像之前那样,一个主题的分区尽量能够均匀分到同一消费组的每一个消费者,这样可以提高效率,而现实往往不是理想的,所以kafka让消费者自己均衡,这个均衡的过程就叫做再均衡。这个过程中,消费者会停止读取消息和操作,所以这里就引发了一种重复消费的情况。就是如果消息处理完了,但是位移还没有提交,就发生了再均衡这样就均衡之后新的消费者还是从之前的偏移量开始读。所以就会引发重复消费,但是不要着急,我们有相当好的方法去解决这个问题,那就是订阅主题的时候开启再均衡回调。看实例代码

//可以再均衡回调
    public static void test3(){
        Properties properties = initConfig3();
        ***********************************************************************************
        //这个变量保存某个分区消费到的位移
        Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
        ***********************************************************************************
        
        org.apache.kafka.clients.consumer.KafkaConsumer<String,String> consumer =
                new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(properties);

***********************************************************************************
***********************************************************************************
***********************************************************************************
        //订阅主题
        consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
            //这个方法可以在停止读取消息后,再均衡之前调用,可以在这里提交位移这样就可以避免重复消费
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                consumer.commitSync(currentOffsets);
                //回调时同步提交
                currentOffsets.clear();
            }
            //这个方法是重新分区之后,新消费者开始读取消息之前调用
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {

            }
        });
***********************************************************************************
***********************************************************************************
***********************************************************************************
        while(isRunning.get()){
            ConsumerRecords<String,String> records =
                    consumer.poll(Duration.ofMillis(1000));
            for(ConsumerRecord<String,String> record: records){
                currentOffsets.put(new TopicPartition(record.topic(),record.partition())
                ,new OffsetAndMetadata(record.offset()+1));
            }
            
//            异步提交位移
            consumer.commitAsync(currentOffsets,null);
        }

    }

使用这样,就可以保证再那个回调里面,使位移的提交能够改在再均衡之前,就可以避免重复消费了。

消费者也有拦截器

下面这个例子做了一个过期的拦截器,不过实用性好像不大。
public class consumerinterce implements ConsumerInterceptor<String,String> {
    private static final long EXPIRE_INTERVAl = 10* 1000;

    //在poll方法返回之前会调用这个方法,可以有相关消息的定制化操作
    //即对ConsumerRecords进行操作。
    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        long now = System.currentTimeMillis();
        //新的消费者记录
        Map<TopicPartition, List<ConsumerRecord<String,String>>> newRecords = new HashMap<>();
        //对于每一个分区
        for(TopicPartition tp : (records.partitions())){
            //每一个主题分区
            List<ConsumerRecord<String,String>> tpRecords = records.records(tp);
            //新的主题分区
            List<ConsumerRecord<String,String>> newTpRecords = new ArrayList<>();
            for(ConsumerRecord<String,String> record : tpRecords){
                if(now - record.timestamp() < EXPIRE_INTERVAl){
                    newTpRecords.add(record);
                }
            }
            if(!newTpRecords.isEmpty()){
                newRecords.put(tp,newTpRecords);
            }
        }
        return new ConsumerRecords<>(newRecords);
    }
    //提交完位移后调用这个方法
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {

    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}
Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐