Kafka API

Producer API

消息发送流程

Kafka 的 Producer 发送消息采用的是异步发送的方式(拉取到了数据就发送,不会等待上一次发送之后的ack的结果,ack只是保证数据丢不丢,不是保证数据是否按照正确的顺序发送和接收)。在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程,以及一个线程共享变量——RecordAccumulator

main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka broker。

KafkaProducer 发送消息流程

在这里插入图片描述

相关参数:

batch.size:只有数据积累到 batch.size 之后,sender 才会发送数据。

linger.ms:如果数据迟迟未达到 batch.size,sender 等待 linger.time 之后就会发送数据。

发送前会先经过拦截器、序列化器,最后走分区器进行分区发送。

异步发送 API

导入依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>

编写代码 需要用到的类:

KafkaProducer:需要创建一个生产者对象,用来发送数据

ProducerConfig:获取所需的一系列配置参数

ProducerRecord:每条数据都要封装成一个 ProducerRecord 对象

1.不带回调函数的 API

public class MyProducer {

    public static void main(String[] args) {

        // 1、创建Kafka生产者的配置信息
        Properties properties = new Properties();

        // 指定连接的kafka集群,生产环境中需要配置多个,并且用英文逗号隔开且不能有空格
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop113:9092");
        // ACK重试的次数
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        // 重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
        // 批次大小,到了16k发送
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // 等待时间,到了1ms发送
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);

        // RecordAccumulator 缓冲区大小,32M
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

        // key序列化类,如果是Long或者Integer需要替换成对应的序列化类
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // value序列化类
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        // 2、创建生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        // 3、发送数据
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>(
                    "first",
                    "hello-" + i));
        }

        // 4、关闭资源
        // 会进行资源的回收,并且将未发送完的数据发送完成
        producer.close();
    }
}

启动消费者

bin/kafka-console-consumer.sh --zookeeper hadoop113:2181 --topic first

启动程序,结果如下:

hello-0
hello-2
hello-4
hello-6
hello-8
hello-1
hello-3
hello-5
hello-7
hello-9

2.带回调函数的 API

回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是RecordMetadata 和 Exception,如果 Exception 为 null,说明消息发送成功,如果Exception 不为 null,说明消息发送失败。

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

public class CallBackProducer {

    public static void main(String[] args) {

        // 1、创建Kafka生产者的配置信息
        Properties properties = new Properties();

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop113:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        // 2、创建生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        // 3、发送数据
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>(
                    "first",
                    "hello-" + i),
                    new Callback() {
                        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                            if (e == null) {
                                // offset从零开始
                                System.out.println(recordMetadata.partition() + "--" + recordMetadata.offset());
                            } else {
                                e.printStackTrace();
                            }
                        }
                    });
        }

        // 4、关闭资源
        // 会进行资源的回收,并且将未发送完的数据发送完成
        producer.close();
    }
}

运行程序,代码结果如下:

1--20
1--21
1--22
1--23
1--24
0--20
0--21
0--22
0--23
0--24

测试分区

// 直接指定分区为0号分区
producer.send(new ProducerRecord<String, String>(
    "first",
    0,
    "hello",
    "hello-" + i),
    new Callback());

代码结果如下:
0--25
0--26
0--27
0--28
0--29
0--30
0--31
0--32
0--33
0--34

消费者打印如下:
hello-0
hello-1
hello-2
hello-3
hello-4
hello-5
hello-6
hello-7
hello-8
hello-9
    
    
// 不指定分区为0号分区,固定按照hello的hash进行分区
producer.send(new ProducerRecord<String, String>(
    "first",
    0,
    "hello",
    "hello-" + i),
    new Callback());

程序结果如下:
1--25
1--26
1--27
1--28
1--29
1--30
1--31
1--32
1--33
1--34

。。。

自定义分区器

public class MyPartitions implements Partitioner {

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        // 具体的业务逻辑,可以参考它默认怎么实现的
        return 1;
    }

    public void close() {

        // 关闭的逻辑
    }

    public void configure(Map<String, ?> map) {

    }
}

public class MyPartitionProducer {

    public static void main(String[] args) {

        // 1、创建Kafka生产者的配置信息
        Properties properties = new Properties();

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop113:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        // 设置分区器
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
                "com.starnet.kafka.partitions.MyPartitions");

        // 2、创建生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        // 3、发送数据
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("first","hello-" + i), new CallBack());
        }

        // 4、关闭资源
        // 会进行资源的回收,并且将未发送完的数据发送完成
        producer.close();
    }
}

同步发送 API

同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回 ack。

由于 send 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,我们也可以实现同步发送的效果,只需在调用 Future 对象的 get 方发即可。

生产环境中同步发送使用的比较少。

Future<RecordMetadata> first = producer.send(new ProducerRecord<String, String>("first", "hello-" + i));
            
first.get();

Consumer API

Consumer 消费数据时的可靠性是很容易保证的,因为数据在 Kafka 中是持久化的,故不用担心数据丢失问题。

由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故障前的位置的继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。

所以 offset 的维护是 Consumer 消费数据是必须考虑的问题。

需要用到的类:

KafkaConsumer:需要创建一个消费者对象,用来消费数据

ConsumerConfig:获取所需的一系列配置参数

ConsuemrRecord:每条数据都要封装成一个 ConsumerRecord 对象

为了使我们能够专注于自己的业务逻辑,Kafka 提供了自动提交 offset 的功能。自动提交 offset 的相关参数:

enable.auto.commit:是否开启自动提交 offset 功能

auto.commit.interval.ms:自动提交 offset 的时间间隔

public class MyConsumer {

    public static void main(String[] args) {

        // 创建消费者配置信息
        Properties properties = new Properties();
        // 连接的集群
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop113:9092");
        // 开启自动提交
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // 自动提交的延迟
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // Key, Value的反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        // 消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "wei");

        // 创建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        // 订阅主题们可以订阅多个主题,second不存在也不会报错,会有警告
        consumer.subscribe(Arrays.asList("first", "second"));

        // 获取数据,一次拉去多个
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(100);

            // 解析并打印consumerRecords
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.key() + "--" + consumerRecord.value());
            }

        }
    }
}


// 使用上面写的生产者的代码作为生产者:
// 其中topic为first,key为caocao,value为hello-i
producer.send(new ProducerRecord<String, String>("first", "caocao","hello-" + i));

结果如下:
caocao--hello-0
caocao--hello-1
caocao--hello-2
caocao--hello-3
caocao--hello-4
caocao--hello-5
caocao--hello-6
caocao--hello-7
caocao--hello-8
caocao--hello-9

重置offset

即获取该topic从0开始的数据。

在以上代码的基础上,增加和修改如下代码即可获取topic从0开始的数据。

// 消费者组没有消费过(换一个组),消费者组消费的对应的offset被删掉了(7天后被删除了)
// 重置offset用earliest,默认是lastest,从最后位置开始
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// 消费者组名字改掉
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "wei1");

手动提交 offset

虽然自动提交 offset 十分简介便利,但由于其是基于时间提交的,开发人员难以把握offset 提交的时机。因此 Kafka 还提供了手动提交 offset 的 API。

手动提交 offset 的方法有两种:分别是 commitSync(同步提交)和 commitAsync(异步提交)。两者的相同点是,都会将本次 poll 的一批数据最高的偏移量提交;不同点是,commitSync 阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而 commitAsync 则没有失败重试机制,故有可能提交失败。

如果消费了数据不提交的话,数据会被消费,但是offset始终不会自动改变,导致消费者下次启动时依旧会从保存的offset开始消费;并且消费者只有在启动时才会读取磁盘中保存的offset,然后自己在内存中维护自己最新的offset,不提交的话永远不会更新到磁盘当中。

public class MyConsumer {

    public static void main(String[] args) {

        // 创建消费者配置信息
        Properties properties = new Properties();
        // 连接的集群
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop113:9092");
        // 关闭自动提交
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // Key, Value的反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        // 消费者组没有消费过(换一个组),消费者组消费的对应的offset被删掉了(7天后被删除了)
        // 重置offset用earliest,默认是lastest,从最后位置开始
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // 消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "wei");

        // 创建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        // 订阅主题们可以订阅多个主题,second不存在也不会报错,会有警告
        consumer.subscribe(Arrays.asList("first", "second"));

        // 获取数据,一次拉去多个
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(100);

            // 解析并打印consumerRecords
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.key() + "--" + consumerRecord.value());
            }

            // 同步提交,当前线程会阻塞直到 offset 提交成功
            // consumer.commitSync();

            // 异步提交
            consumer.commitAsync(new OffsetCommitCallback() {
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                    if (exception != null) {
                        System.err.println("Commit failed for" + offsets);
                    }
                }
            });

        }
    }
}

由于同步提交 offset 有失败重试机制,故更加可靠。

虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式

offset将会提交到kk本地。

无论是同步提交还是异步提交 offset,都有可能会造成数据的漏消费或者重复消费。

先提交 offset 后消费,有可能造成数据的漏消费;而先消费后提交 offset,有可能会造成数据的重复消费。

自定义存储 offset

Kafka 0.9 版本之前, offset 存储在 zookeeper, 0.9 版本及之后,默认将 offset 存储在 Kafka的一个内置的 topic 中。除此之外,Kafka 还可以选择自定义存储 offset。

offset 的维护是相当繁琐的,因为需要考虑到消费者的 Rebalace。

当有新的消费者加入消费者组、已有的消费者推出消费者组或者所订阅的主题的分区发生变化,就会触发到分区的重新分配,重新分配的过程叫做 Rebalance。

消费者发生 Rebalance 之后,每个消费者消费的分区就会发生变化。因此消费者要首先获取到自己被重新分配到的分区,并且定位到每个分区最近提交的 offset 位置继续消费。

要实现自定义存储 offset,需要借助 ConsumerRebalanceListener,以下为示例代码,其中提交和获取 offset 的方法,需要根据所选的 offset 存储系统自行实现。

自定义存储可以考虑与mysql一起做事务操作。

public class MyOffset {

    private static Map<TopicPartition, Long> currentOffset = new HashMap();

    public static void main(String[] args) {

        // 创建消费者配置信息
        Properties properties = new Properties();
        // 连接的集群
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop113:9092");
        // 关闭自动提交
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // Key, Value的反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        // 消费者组没有消费过(换一个组),消费者组消费的对应的offset被删掉了(7天后被删除了)
        // 重置offset用earliest,默认是lastest,从最后位置开始
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // 消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "wei");

        // 创建消费者
        final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        // 订阅主题们可以订阅多个主题,second不存在也不会报错,会有警告
        consumer.subscribe(Arrays.asList("first", "second"), new ConsumerRebalanceListener() {

            //该方法会在 Rebalance 之前调用
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                commitOffset(currentOffset);
            }

            //该方法会在 Rebalance 之后调用
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                currentOffset.clear();
                for (TopicPartition partition : partitions) {
                    consumer.seek(partition, getOffset(partition)); // 定位到最近提交的 offset 位置继续消费

                }
            }
        });

        // 获取数据,一次拉去多个
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(100);

            // 解析并打印consumerRecords
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.key() + "--" + consumerRecord.value());
            }

            // 异步提交
            commitOffset(currentOffset);

        }
    }

    //获取某分区的最新 offset
    private static long getOffset (TopicPartition partition) {
        return 0;
    }

    //提交该消费者所有分区的 offset
    private static void commitOffset(Map<TopicPartition, Long> offset) {

    }
}

自定义 Interceptor

拦截器原理

Producer 拦截器(interceptor)是在 Kafka 0.10 版本被引入的,主要用于实现 clients 端的定制化控制逻辑。

对于 producer 而言,interceptor 使得用户在消息发送前以及 producer 回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer 允许用户指定多个 interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor 的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:

(1)configure(configs)

获取配置信息和初始化数据时调用。

(2)onSend(ProducerRecord):

该方法封装进 KafkaProducer.send 方法中,即它运行在用户主线程中。Producer 确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的 topic 和分区,否则会影响目标分区的计算。

(3)onAcknowledgement(RecordMetadata, Exception):

该方法会在消息从 RecordAccumulator 成功发送到 Kafka Broker 之后,或者在发送过程中失败时调用。并且通常都是在 producer 回调逻辑触发之前。onAcknowledgement 运行在producer 的 IO 线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢 producer 的消息发送效率。

(4)close:

关闭 interceptor,主要用于执行一些资源清理工作

如前所述,interceptor 可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个 interceptor,则 producer 将按照指定顺序调用它们,并仅仅是捕获每个 interceptor 可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。

拦截器案例

需求: 实现一个简单的双 interceptor 组成的拦截链。第一个 interceptor 会在消息发送前将时间戳信息加到消息 value 的最前部;第二个 interceptor 会在消息发送后更新成功发送消息数或失败发送消息数。

在这里插入图片描述

public class TimeInterceptor implements ProducerInterceptor<String, String> {

    public void configure(Map<String, ?> map) {

    }

    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {

        // 取出数据
        String value = record.value();

        // 创对象并返回
        return new ProducerRecord<String, String>(
                record.topic(),
                record.partition(),
                record.key(),
                System.currentTimeMillis() + "," + record.value());
    }

    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {

    }

    public void close() {

    }
}

public class CounterInterceptor implements ProducerInterceptor<String, String> {

    private int success = 0;
    private int error = 0;

    public void configure(Map<String, ?> map) {

    }

    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {

        return record;
    }

    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {

        if (e == null) {
            success++;
        } else {
            error++;
        }
    }

    public void close() {
        System.out.println("success: " + success);
        System.out.println("error: " + error);
    }
}

public class InterceptorProducer {

    public static void main(String[] args) {

        // 1、创建Kafka生产者的配置信息
        Properties properties = new Properties();

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop113:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        // 添加拦截器
        List<String> interceptors = new ArrayList<String>();
        interceptors.add("com.starnet.kafka.interceptor.TimeInterceptor");
        interceptors.add("com.starnet.kafka.interceptor.CounterInterceptor");

        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

        // 2、创建生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        // 3、发送数据
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("first", "hello-" + i));
        }

        // 4、关闭资源
        // 会进行资源的回收,并且将未发送完的数据发送完成
        producer.close();
    }
}

程序结果如下:
success: 10
error: 0

消费者消费如下:
1630057877696,hello-1
1630057877697,hello-3
1630057877697,hello-5
1630057877697,hello-7
1630057877697,hello-9
1630057877580,hello-0
1630057877697,hello-2
1630057877697,hello-4
1630057877697,hello-6
1630057877697,hello-8

如果producer.close()没有被调用的话,拦截器是不会自己触发close方法的。

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐