三, Kafka API

3.1 Producer API

3.1.1 消息发送流程

  • Kafka 的 Producer 发送消息采用的异步发送的方式.
  • 在消息发送的过程中, 涉及到了两个线程(main线程和Sender线程), 以及一个双端队列(线程共享变量--RecordAccumulator),
  • main线程将消息发送给RecordAccumlator, Sender线程不断从RecordAccumulator 中拉取消息到Kafka Broker;
  • main线程-------传送给------->RecordAccumulator<------sender拉取消息------Sender-------------发送给-------> Kafka Broker;ff

在这里插入图片描述

  • 待补充 >>>>>>>>>>>>>>>>>

[相关的参数和知识点]

  1. batch.size: 只有数据积累到batch.size 之后, sender才会发送数据后;(最小发送容量)
  2. linger.ms: 如果数据迟迟未达到batch.size, sender等待linger.time之后就会发送数据;(数据的最长停留时间)

tips: 什么是异步通信, 同步通信?

  1. 同步通信时, 发送者发送消息后, 要把自身的进程阻塞, 只有等到收到接受者的确认之后, 才继续往下执行别的任务;
  2. 异步通信时, 发送者发送消息后,可以不用等待接受者的确认, 转去执行别的任务.

参考文章:同步方式的消息发送和异步方式的消息发送

3.1.2 异步发送 API

3.1.2.1. 不带回调函数的生产者API示例
  • Step1. <kbd> 导入kafka客户端相关的依赖:

    • 新建Maven项目(new Module–> 填好相应的gav)
    • 在pom文件中引入kafka客户端相关的依赖, 注意!版本一定要跟kafka集群的保持一致, 否则一定会发生错误
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</groupId>
        <!--<version>0.11.0.0</vesion>, 版本太老, 弃用-->
        <version>3.0.0</version>
    </dependency>
</dependencies>
  • Step2.<kbd>在使用Kafka客户端操纵生产者生产数据时需要用到的类:
  1. KafkaProducer: 生产者对象, 操纵send() 向topic发送数据.
  1. ProducerConfig: 获取所需的一系列配置参数, 其中有两项是必不可少的参数,连接服务器的, bootstrap-server和指定k-v序列化的类

在这里插入图片描述

  1. ProducerRecord: 封装与topic有关的各项信息, 如下图, 这个类有辣么多中不同参的构造方法.

在这里插入图片描述

  • Step3. <kbd>具体代码编写

    • 编写代码操纵生产者向kafka集群的某个topic发送数据:

可以先看下面的简化版代码, 快速上手:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class MyProducer {
    public static void main(String[] args) {


        //创建一个kafka客户端对象, 还需要导入配置, 所以还需要一个配置对象作为参数
        //===============================================================
        //配置对象
        Properties properties = new Properties();
        //具体的参数
            //最重要的参数: broke-list/bootStrap-server 主机名:端口
            properties.put("bootstrap.servers", "bigdata01:9092");

            //同样必不可少的参数
            //(键和值所使用的的序列化类)
            properties.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

            //其他的常见的一些参数
            // 分区leader向producer发送确认的级别, 0, -1/all, 1
            properties.put("ack", "all");
            //重试次数
            properties.put("retries", 1);
            //批次大小 batch.size, 16k
            properties.put("batch.size", 16384);
            //数据发送的最长等待时间
            properties.put("linger.ms", 1);
            //RecordAccumulator缓冲区的大小
            properties.put("buffer.memory", 33554432);

        //===============================================================
        //生产者对象, 生产者的类名是 KafkaProducer. 注意, 生产者对象实例化时需要配置对象作为参数
        Producer<String, String> producer = new KafkaProducer<>(properties);

        //================================================================
        //producer生产数据
        //数据都被封装在了ProducerRecord类中, 所以相应的, 生产者对象发送的是producerRecod 的类,
        ///   produrcer.send(new ProducerRecord(key: "topic";  value: "值"))
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>("sb", "xxxxxxxxxx");

        producer.send(producerRecord);
        //关闭producer资源
        producer.close();
    }
}

对上面代码的极致简化:

package producer2;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomerProducer {
    public static void main(String[] args) {

        //2. 创建properties配置对象;
        Properties prop = new Properties();
  
        //3. 连接集群
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "bigdata01:9092, bigdata02:9092");
        //prop.put("bootstrap.servers", "localhost:9092,localhost:9093");
   
        //4. 指定key和value 的序列化类型
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//获取string序列化器类的全类名
        //prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // 等同于上一条, 推荐上面的写法
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//

        //1. kafka对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(prop);


        //5. 调用send方法向topic发送数据
        for (int i = 0; i < 8; i++) {
            kafkaProducer.send(new ProducerRecord<String, String>("first", "areusb?"));
        }

        //6.关闭资源
        kafkaProducer.close();
    }

}

kafka集群中开启消费者进程, 获取到发送的数据:

kafka-console-consumer.sh --broker-list bigdata01:9092 --topic first
在这里插入图片描述

有人可能会遇到的很低级的错误(比如我):

  1. 要在windows主机上的hosts文件中, 配置访问kafak集群主机的主机名和ip地址之间的映射, 否则就会出现Failed to update metadata after 60000 ms.连接超时的问题:

在这里插入图片描述

  1. Missing required configuration "bootstrap.servers" which has no default value.

在这里插入图片描述

3.1.2.2 带回调函数的生产者API示例

在这里插入图片描述

  • 回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是RecordMetadataException
  • 如果 Exception 为 null,说明消息发送成功,如果Exception 不为 null,说明消息发送失败。
  • 注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。
public class MyProducerWithCallBack {
    public static void main(String[] args) {
        带回调函数的消费者api

        //2. 新建配置对象, 设置必要的配置参数
        Properties props = new Properties();

        //必须参数1: kafka服务器的主机名, 端口
        props.put("bootstrap.servers", "bigdata01:9092");
        //必须参数2:  k-v序列化用到的两个类
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        ///1. 新建一个消费者对象, 把配置对象作为参数传递进去
        Producer<String, String> producer = new KafkaProducer<String, String>(props);

        //3. 生产
        // 生产的消息的(主题, 分区, 值)都被封装到了ProducerRecord中

        ProducerRecord<String, String> producerRecord = null;

        for (int i = 0; i < 5; i++){
            producerRecord = new ProducerRecord<>("sb","record"+i);
            System.out.println(i);


        //消息发送, 并加上回调函数, 带回消息发送的结果.
        producer.send(producerRecord, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e == null) {
                    System.out.println("success" + "-->offset: " + recordMetadata.offset()); //recordMetadata为元数据对象, 本行就就是获取元数据中的offset
                } else {
                    e.getMessage();
                }
            }
        });
        }

        // QA;忘记关闭producer 资源的后果!?
        producer.close();
    }
}

简化版代码:

package producer2;


import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomerProducerWithCallBack {
    public static void main(String[] args) {
        //1. 配置对象
        Properties prop = new Properties();

        //2. 设置参数
        //2.1 设置bootStrap
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "bigdata01:9092, bigdata02:9092");
        //2.2 设置key和value 的序列化
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //3.把配置传入到kafkaproducer对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(prop);

        //4.
        kafkaProducer.send(new ProducerRecord<String, String>("first", "this is a string"),
                new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        System.out.println("OffSet:" + recordMetadata.offset()
                                + "Topic: " + recordMetadata.topic()
                                + "Partition: " + recordMetadata.partition()
                                + "string: " + recordMetadata);
                    }
                });

        //5. 千万不要忘记关闭资源!!!

        kafkaProducer.close();
    }
}

对 RecordMetadata 所含方法的说明:

在这里插入图片描述

3.1.3 同步发送 API

一句话, 只需要在send方法后加上一个get(), 异步的方法就能够变为同步发送,

3.1.4 分区代码演示

  • 原理
package producer2;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducerWithDifferPartitions {
    public static void main(String[] args) {
        //1. 配置对象
        Properties prop = new Properties();

        //2. 传入必要的配置
        //2.1 服务器配置
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "bigdata01:9092, bigdata02:9092");

        //2.2 key和value 的序列化
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 这里还可以对四个参数(等待时间, 批次大小, 缓冲区大小, 压缩方法)进行设置, prop.put(xx)

        //3.创建kafkaProducer对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(prop);

        //4.发送, 注意发送的是一个封装的对象
        // 这个对象叫做 ProducerRecord, 这个类中有五个不同
        //1. 前三种方法, 指明了partition, 直接存入指定的partition中
        kafkaProducer.send(new ProducerRecord<>("first", 2, "xxx", "xxx"), new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                System.out.println(recordMetadata.partition());
            }
        });

        //后两种方法,
        //public ProducerRecord(topic, key, value)
        //2. 没有指明partition, 但有key, 此时分区 = key的hashcode % topic的partition数量 (hash/分区数)
        kafkaProducer.send(new ProducerRecord<String, String>("first", "c", "hash%分区数"), new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                System.out.println("分区号为: " + recordMetadata.partition());
            }
        });
//
//        //public ProducerRecord(topic, value)
        //3. 既没有partition, 又没有key, 粘性分区器(sticky partition), 随机选择一个分区
        //并尽可能的使用该分区, 等待该分区batch已满, 或者已经完成, 再去随机一个分区使用
        kafkaProducer.send(new ProducerRecord<String, String>("first", "hash%分区数"), new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                System.out.println("分区号为: " + recordMetadata.partition());
            }
        });


        //关闭资源
        kafkaProducer.close();
    }
}

3.1.5 自定义分区器

写法: 自定义一个分区器类, 继承Partitioner类, 重写它的几个方法即可;
如何引入自定义类? 在分区的main方法中引入相关的自定义分区类即可;
prop.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "自定义分区类的全类名");


// 自定义的分区类的实现
package producer2;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class OEMPartition implements Partitioner {
    /**
     *org.apache.kafka.clients.producer.Partitioner
     * producer2.OEMPartition
     *
     * @param topic 主题
     * @param key 消息的key
     * @param keyBytes  消息的key序列化后的字节数组
     * @param value 消息的value
     * @param valueBytes    消息的value序列化后的字节数组
     * @param cluster 集群元数据可以查看分区信息
     * @return partition 返回的是指定的分区号
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //1. 获取消息
        String msg = value.toString();

        //2. 判断(过滤)
        int partition;
        if(msg.contains("\\d+")){
            partition = 1;
        }else{
            partition = 2;
        }

        return partition;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}
///
//在main方法中引入这个类
 //5. 关联自定义的分区器类
        prop.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "producer2.OEMPartition");

3.1.6 提高生产者数据吞吐量的四个参数

3.2 Consumer API

1. 独立消费者案例(订阅主题)

在这里插入图片描述

package consumer2;



import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class CustomConsumer {
    public static void main(String[] args) {
        //单消费者demo

        //1. 创建配置对象
        Properties prop = new Properties();

        //1.1. 设置服务器
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.182.100:9092");
        //1.2 设置key和value的反序列化类
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        /重要的来了,
        //1.3 设置消费者组. 必须
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "testG");

        //2. 创建消费者对象
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(prop);

        //3. 指定(也就是订阅topic)要消费的目标topics !! 注意是topics, 是一个集合
        ArrayList<String> list = new ArrayList<>();
        list.add("first");
        kafkaConsumer.subscribe(list);


        while(true){
            //4. 拉取(pull)打印到的数据
            //参数是每批次之间的间隔时间
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            //5. 打印读取到的数据
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }

        //6.关闭资源
        //kafkaConsumer.close();
    }
}

在这里插入图片描述

2. 独立消费者案例(订阅分区)

在这里插入图片描述

// 消费某个主题的某个分区数据
        ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
        topicPartitions.add(new TopicPartition("first", 0));
        kafkaConsumer.assign(topicPartitions);

在这里插入图片描述

3. 消费者组案例

在这里插入图片描述

1) 复制一份基础消费者的代码,在 IDEA 中同时启动,即可启动同一个消费者组中的两个消费者。

  1. 启动代码中的生产者发送消息, 在 IDEA 控制台即可看到两个消费者在消费不同分区的数据(如果只发生到一个分区,可以在发送时增加延迟代码 Thread.sleep(2);。

在这里插入图片描述

3)重新发送到一个全新的主题中,由于默认创建的主题分区数为 1,可以看到只能
有一个消费者消费到数据。

在这里插入图片描述

3.2.1 自动提交 offset

3.2.2 手动提交 offset

3.2.2 自定义存储 offset

3.2 自定义 Interceptor

3.2.1 拦截器原理

3.2.2 拦截器案例

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐