一 Producer的API

1 消息发送流程

Kafka的Producer发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main线程和Sender线程,以及一个线程共享变量——RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。

kafkaProoducer消息发送流程:
在这里插入图片描述

主线程不负责消息传递的具体过程,以提高效率,main将消息封装成ProducerRecord,经过一系列处理分门别类地将消息放到RecordAccumulator(主线程和sender线程共享的资源池)中,sender根据RA中已经分好区的数据发送到topic。

2 实现步骤

导入依赖

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.1</version>
    </dependency>

发送者发送数据程序代码

package com.hike.producer;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class Producer {
    public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
        //1 实例化kafka集群(创建对象)
        Properties properties = new Properties();
        //通过配置文件配置
        //properties.load(Producer.class.getClassLoader().getResourceAsStream("kafka.properties"));
        //通过程序代码设置
        properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
 properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("acks","all");
        properties.setProperty("bootstrap.servers","hadoop101:9092");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        //2 用集群对象发送数据
        for (int i = 0; i < 10; i++) {
            Future<RecordMetadata> future = producer.send(
                    //2.1 封装ProducerRecord
                    new ProducerRecord<String, String>(
                            "hello",
                            Integer.toString(i),
                            "Value" + i
                    ),
                    //2.2 回调函数
                    new Callback() {
                        //当sender收到服务器的ack之后,sender线程会调用onCompletion方法
                        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                            if (e == null) {
                                System.out.println(recordMetadata);
                            }
                        }
                    });
            //RecordMetadata recordMetadata = future.get(); //添加此语句,发送方式变为同步操作
            System.out.println("第" + i + "条发送成功");
        }

        //3 关闭资源
        producer.close();
    }
}

二 Consumer的API

1 自动提交offset

Consumer消费数据时的可靠性是很容易保证的,因为数据在Kafka中是持久化的,故不用担心数据丢失问题。

由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置的继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。

所以offset的维护是Consumer消费数据必须考虑的问题。

消费者消费数据程序代码

package com.hike.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.IOException;
import java.util.Collections;
import java.util.Properties;

public class Consumer {
    public static void main(String[] args) throws IOException, InterruptedException {
        //1 新建一个consumer对象
        Properties properties = new Properties();
        properties.load(Consumer.class.getClassLoader().getResourceAsStream("consumer1.properties"));
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        //2 用这个对象接收消息
        //发布订阅模式接收消息,先订阅
        consumer.subscribe(Collections.singleton("hello"));
        while(true){
            //从订阅的话题中拉取数据
            ConsumerRecords<String, String> poll = consumer.poll(2000);
            if(poll.count() == 0){
                Thread.sleep(100);
            }
            //消费拉取到的数据
            for (ConsumerRecord<String, String> record : poll) {
                System.out.println(record);
            }
        }
        
        //3 关闭资源
        //consumer.close();
    }
}

consumer1.properties文件

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
bootstrap.servers=hadoop101:9092
enable.auto.commit=true
group.id=test2
auto.offset.reset=earliest

2 手动提交offset

虽然自动提交offset十分简介便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因此Kafka提供了手动提交offset的API。

手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是,都会将本次poll的一批数据最高的偏移量提交;不同点是,commitSync阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而commitAsync则没有失败重试机制,故有可能提交失败。

修改配置文件中的enable.auto.commit=false选项并在消费拉取到的数据之后添加consumer.commitSync();语句即可。由于同步提交offset有失败重试机制,故更加可靠。

			 //消费拉取到的数据
            for (ConsumerRecord<String, String> record : poll) {
                System.out.println(record);
            }
            consumer.commitSync();

同步提交offset更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响。在更多的情况下,会选用异步提交offset的方式。

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐