三-下, Kafka API dbc
三, Kafka API3.1 Producer API3.1.1 消息发送流程Kafka 的 Producer 发送消息采用的异步发送的方式.在消息发送的过程中, 涉及到了两个线程(main线程和Sender线程), 以及一个双端队列(线程共享变量--RecordAccumulator),main线程将消息发送给RecordAccumlator, Sender线程不断从RecordAccumul
三, Kafka API
3.1 Producer API
3.1.1 消息发送流程
Kafka 的 Producer 发送消息采用的异步发送的方式
.- 在消息发送的过程中, 涉及到了两个线程(
main线程和Sender线程
), 以及一个双端队列(线程共享变量--RecordAccumulator
), - main线程将消息发送给RecordAccumlator, Sender线程不断从RecordAccumulator 中拉取消息到Kafka Broker;
- main线程-------传送给------->RecordAccumulator<------sender拉取消息------Sender-------------发送给-------> Kafka Broker;ff
- 待补充 >>>>>>>>>>>>>>>>>
[相关的参数和知识点]
batch.size
: 只有数据积累到batch.size 之后, sender才会发送数据后;(最小发送容量)linger.ms
: 如果数据迟迟未达到batch.size, sender等待linger.time之后就会发送数据;(数据的最长停留时间)
tips: 什么是异步通信, 同步通信?
- 同步通信时, 发送者发送消息后, 要把自身的进程阻塞, 只有等到收到接受者的确认之后, 才继续往下执行别的任务;
- 异步通信时, 发送者发送消息后,可以不用等待接受者的确认, 转去执行别的任务.
参考文章:同步方式的消息发送和异步方式的消息发送
3.1.2 异步发送 API
3.1.2.1. 不带回调函数的生产者API示例
-
Step1.
<kbd>
导入kafka客户端相关的依赖:- 新建Maven项目(new Module–> 填好相应的gav)
- 在pom文件中引入kafka客户端相关的依赖, 注意!版本一定要跟kafka集群的保持一致, 否则一定会发生错误
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</groupId>
<!--<version>0.11.0.0</vesion>, 版本太老, 弃用-->
<version>3.0.0</version>
</dependency>
</dependencies>
- Step2.
<kbd>
在使用Kafka客户端操纵生产者生产数据时需要用到的类:
KafkaProducer
: 生产者对象, 操纵send() 向topic发送数据.
ProducerConfig
: 获取所需的一系列配置参数, 其中有两项是必不可少的参数,连接服务器的, bootstrap-server
和指定k-v序列化的类
ProducerRecord
: 封装与topic有关的各项信息, 如下图, 这个类有辣么多中不同参的构造方法.
-
Step3.
<kbd>
具体代码编写- 编写代码操纵生产者向kafka集群的某个topic发送数据:
可以先看下面的简化版代码, 快速上手:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class MyProducer {
public static void main(String[] args) {
//创建一个kafka客户端对象, 还需要导入配置, 所以还需要一个配置对象作为参数
//===============================================================
//配置对象
Properties properties = new Properties();
//具体的参数
//最重要的参数: broke-list/bootStrap-server 主机名:端口
properties.put("bootstrap.servers", "bigdata01:9092");
//同样必不可少的参数
//(键和值所使用的的序列化类)
properties.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
//其他的常见的一些参数
// 分区leader向producer发送确认的级别, 0, -1/all, 1
properties.put("ack", "all");
//重试次数
properties.put("retries", 1);
//批次大小 batch.size, 16k
properties.put("batch.size", 16384);
//数据发送的最长等待时间
properties.put("linger.ms", 1);
//RecordAccumulator缓冲区的大小
properties.put("buffer.memory", 33554432);
//===============================================================
//生产者对象, 生产者的类名是 KafkaProducer. 注意, 生产者对象实例化时需要配置对象作为参数
Producer<String, String> producer = new KafkaProducer<>(properties);
//================================================================
//producer生产数据
//数据都被封装在了ProducerRecord类中, 所以相应的, 生产者对象发送的是producerRecod 的类,
/// produrcer.send(new ProducerRecord(key: "topic"; value: "值"))
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("sb", "xxxxxxxxxx");
producer.send(producerRecord);
//关闭producer资源
producer.close();
}
}
对上面代码的极致简化:
package producer2;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomerProducer {
public static void main(String[] args) {
//2. 创建properties配置对象;
Properties prop = new Properties();
//3. 连接集群
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "bigdata01:9092, bigdata02:9092");
//prop.put("bootstrap.servers", "localhost:9092,localhost:9093");
//4. 指定key和value 的序列化类型
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//获取string序列化器类的全类名
//prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // 等同于上一条, 推荐上面的写法
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//
//1. kafka对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(prop);
//5. 调用send方法向topic发送数据
for (int i = 0; i < 8; i++) {
kafkaProducer.send(new ProducerRecord<String, String>("first", "areusb?"));
}
//6.关闭资源
kafkaProducer.close();
}
}
kafka集群中开启消费者进程, 获取到发送的数据:
kafka-console-consumer.sh --broker-list bigdata01:9092 --topic first
有人可能会遇到的很低级的错误(比如我):
- 要在windows主机上的
hosts文件
中, 配置访问kafak集群主机的主机名和ip地址之间的映射, 否则就会出现Failed to update metadata after 60000 ms.
连接超时的问题:
Missing required configuration "bootstrap.servers" which has no default value.
3.1.2.2 带回调函数的生产者API示例
- 回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是RecordMetadata 和Exception,
- 如果 Exception 为 null,说明消息发送成功,如果Exception 不为 null,说明消息发送失败。
- 注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。
public class MyProducerWithCallBack {
public static void main(String[] args) {
带回调函数的消费者api
//2. 新建配置对象, 设置必要的配置参数
Properties props = new Properties();
//必须参数1: kafka服务器的主机名, 端口
props.put("bootstrap.servers", "bigdata01:9092");
//必须参数2: k-v序列化用到的两个类
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
///1. 新建一个消费者对象, 把配置对象作为参数传递进去
Producer<String, String> producer = new KafkaProducer<String, String>(props);
//3. 生产
// 生产的消息的(主题, 分区, 值)都被封装到了ProducerRecord中
ProducerRecord<String, String> producerRecord = null;
for (int i = 0; i < 5; i++){
producerRecord = new ProducerRecord<>("sb","record"+i);
System.out.println(i);
//消息发送, 并加上回调函数, 带回消息发送的结果.
producer.send(producerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
System.out.println("success" + "-->offset: " + recordMetadata.offset()); //recordMetadata为元数据对象, 本行就就是获取元数据中的offset
} else {
e.getMessage();
}
}
});
}
// QA;忘记关闭producer 资源的后果!?
producer.close();
}
}
简化版代码:
package producer2;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomerProducerWithCallBack {
public static void main(String[] args) {
//1. 配置对象
Properties prop = new Properties();
//2. 设置参数
//2.1 设置bootStrap
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "bigdata01:9092, bigdata02:9092");
//2.2 设置key和value 的序列化
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//3.把配置传入到kafkaproducer对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(prop);
//4.
kafkaProducer.send(new ProducerRecord<String, String>("first", "this is a string"),
new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
System.out.println("OffSet:" + recordMetadata.offset()
+ "Topic: " + recordMetadata.topic()
+ "Partition: " + recordMetadata.partition()
+ "string: " + recordMetadata);
}
});
//5. 千万不要忘记关闭资源!!!
kafkaProducer.close();
}
}
对 RecordMetadata 所含方法的说明:
3.1.3 同步发送 API
一句话, 只需要在send方法后加上一个get(), 异步的方法就能够变为同步发送,
3.1.4 分区代码演示
- 原理
package producer2;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomProducerWithDifferPartitions {
public static void main(String[] args) {
//1. 配置对象
Properties prop = new Properties();
//2. 传入必要的配置
//2.1 服务器配置
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "bigdata01:9092, bigdata02:9092");
//2.2 key和value 的序列化
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 这里还可以对四个参数(等待时间, 批次大小, 缓冲区大小, 压缩方法)进行设置, prop.put(xx)
//3.创建kafkaProducer对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(prop);
//4.发送, 注意发送的是一个封装的对象
// 这个对象叫做 ProducerRecord, 这个类中有五个不同
//1. 前三种方法, 指明了partition, 直接存入指定的partition中
kafkaProducer.send(new ProducerRecord<>("first", 2, "xxx", "xxx"), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
System.out.println(recordMetadata.partition());
}
});
//后两种方法,
//public ProducerRecord(topic, key, value)
//2. 没有指明partition, 但有key, 此时分区 = key的hashcode % topic的partition数量 (hash/分区数)
kafkaProducer.send(new ProducerRecord<String, String>("first", "c", "hash%分区数"), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
System.out.println("分区号为: " + recordMetadata.partition());
}
});
//
// //public ProducerRecord(topic, value)
//3. 既没有partition, 又没有key, 粘性分区器(sticky partition), 随机选择一个分区
//并尽可能的使用该分区, 等待该分区batch已满, 或者已经完成, 再去随机一个分区使用
kafkaProducer.send(new ProducerRecord<String, String>("first", "hash%分区数"), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
System.out.println("分区号为: " + recordMetadata.partition());
}
});
//关闭资源
kafkaProducer.close();
}
}
3.1.5 自定义分区器
写法: 自定义一个分区器类, 继承Partitioner类, 重写它的几个方法即可;
如何引入自定义类? 在分区的main方法中引入相关的自定义分区类即可;
prop.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "自定义分区类的全类名");
// 自定义的分区类的实现
package producer2;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class OEMPartition implements Partitioner {
/**
*org.apache.kafka.clients.producer.Partitioner
* producer2.OEMPartition
*
* @param topic 主题
* @param key 消息的key
* @param keyBytes 消息的key序列化后的字节数组
* @param value 消息的value
* @param valueBytes 消息的value序列化后的字节数组
* @param cluster 集群元数据可以查看分区信息
* @return partition 返回的是指定的分区号
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//1. 获取消息
String msg = value.toString();
//2. 判断(过滤)
int partition;
if(msg.contains("\\d+")){
partition = 1;
}else{
partition = 2;
}
return partition;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
///
//在main方法中引入这个类
//5. 关联自定义的分区器类
prop.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "producer2.OEMPartition");
3.1.6 提高生产者数据吞吐量的四个参数
3.2 Consumer API
1. 独立消费者案例(订阅主题)
package consumer2;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public class CustomConsumer {
public static void main(String[] args) {
//单消费者demo
//1. 创建配置对象
Properties prop = new Properties();
//1.1. 设置服务器
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.182.100:9092");
//1.2 设置key和value的反序列化类
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
/重要的来了,
//1.3 设置消费者组. 必须
prop.put(ConsumerConfig.GROUP_ID_CONFIG, "testG");
//2. 创建消费者对象
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(prop);
//3. 指定(也就是订阅topic)要消费的目标topics !! 注意是topics, 是一个集合
ArrayList<String> list = new ArrayList<>();
list.add("first");
kafkaConsumer.subscribe(list);
while(true){
//4. 拉取(pull)打印到的数据
//参数是每批次之间的间隔时间
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
//5. 打印读取到的数据
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
//6.关闭资源
//kafkaConsumer.close();
}
}
2. 独立消费者案例(订阅分区)
// 消费某个主题的某个分区数据
ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
topicPartitions.add(new TopicPartition("first", 0));
kafkaConsumer.assign(topicPartitions);
3. 消费者组案例
1) 复制一份基础消费者的代码,在 IDEA 中同时启动,即可启动同一个消费者组中的两个消费者。
- 启动代码中的生产者发送消息, 在 IDEA 控制台即可看到两个消费者在消费不同分区的数据(如果只发生到一个分区,可以在发送时增加延迟代码 Thread.sleep(2);。
3)重新发送到一个全新的主题中,由于默认创建的主题分区数为 1,可以看到只能
有一个消费者消费到数据。
3.2.1 自动提交 offset
3.2.2 手动提交 offset
3.2.2 自定义存储 offset
3.2 自定义 Interceptor
3.2.1 拦截器原理
3.2.2 拦截器案例
更多推荐
所有评论(0)