Kafka-API使用以及案例实操
Kafka APIProducer API消息发送流程Kafka 的 Producer 发送消息采用的是异步发送的方式(拉取到了数据就发送,不会等待上一次发送之后的ack的结果,ack只是保证数据丢不丢,不是保证数据是否按照正确的顺序发送和接收)。在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程,以及一个线程共享变量——RecordAccumulator。main 线程
Kafka API
Producer API
消息发送流程
Kafka 的 Producer 发送消息采用的是异步发送的方式(拉取到了数据就发送,不会等待上一次发送之后的ack的结果,ack只是保证数据丢不丢,不是保证数据是否按照正确的顺序发送和接收)。在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程,以及一个线程共享变量——RecordAccumulator。
main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka broker。
KafkaProducer 发送消息流程
相关参数:
batch.size:只有数据积累到 batch.size 之后,sender 才会发送数据。
linger.ms:如果数据迟迟未达到 batch.size,sender 等待 linger.time 之后就会发送数据。
发送前会先经过拦截器、序列化器,最后走分区器进行分区发送。
异步发送 API
导入依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>
编写代码 需要用到的类:
KafkaProducer:需要创建一个生产者对象,用来发送数据
ProducerConfig:获取所需的一系列配置参数
ProducerRecord:每条数据都要封装成一个 ProducerRecord 对象
1.不带回调函数的 API
public class MyProducer {
public static void main(String[] args) {
// 1、创建Kafka生产者的配置信息
Properties properties = new Properties();
// 指定连接的kafka集群,生产环境中需要配置多个,并且用英文逗号隔开且不能有空格
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop113:9092");
// ACK重试的次数
properties.put(ProducerConfig.ACKS_CONFIG, "all");
// 重试次数
properties.put(ProducerConfig.RETRIES_CONFIG, 3);
// 批次大小,到了16k发送
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
// 等待时间,到了1ms发送
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// RecordAccumulator 缓冲区大小,32M
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
// key序列化类,如果是Long或者Integer需要替换成对应的序列化类
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
// value序列化类
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
// 2、创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
// 3、发送数据
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<String, String>(
"first",
"hello-" + i));
}
// 4、关闭资源
// 会进行资源的回收,并且将未发送完的数据发送完成
producer.close();
}
}
启动消费者
bin/kafka-console-consumer.sh --zookeeper hadoop113:2181 --topic first
启动程序,结果如下:
hello-0
hello-2
hello-4
hello-6
hello-8
hello-1
hello-3
hello-5
hello-7
hello-9
2.带回调函数的 API
回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是RecordMetadata 和 Exception,如果 Exception 为 null,说明消息发送成功,如果Exception 不为 null,说明消息发送失败。
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。
public class CallBackProducer {
public static void main(String[] args) {
// 1、创建Kafka生产者的配置信息
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop113:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
// 2、创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
// 3、发送数据
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<String, String>(
"first",
"hello-" + i),
new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
// offset从零开始
System.out.println(recordMetadata.partition() + "--" + recordMetadata.offset());
} else {
e.printStackTrace();
}
}
});
}
// 4、关闭资源
// 会进行资源的回收,并且将未发送完的数据发送完成
producer.close();
}
}
运行程序,代码结果如下:
1--20
1--21
1--22
1--23
1--24
0--20
0--21
0--22
0--23
0--24
测试分区
// 直接指定分区为0号分区
producer.send(new ProducerRecord<String, String>(
"first",
0,
"hello",
"hello-" + i),
new Callback());
代码结果如下:
0--25
0--26
0--27
0--28
0--29
0--30
0--31
0--32
0--33
0--34
消费者打印如下:
hello-0
hello-1
hello-2
hello-3
hello-4
hello-5
hello-6
hello-7
hello-8
hello-9
// 不指定分区为0号分区,固定按照hello的hash进行分区
producer.send(new ProducerRecord<String, String>(
"first",
0,
"hello",
"hello-" + i),
new Callback());
程序结果如下:
1--25
1--26
1--27
1--28
1--29
1--30
1--31
1--32
1--33
1--34
。。。
自定义分区器
public class MyPartitions implements Partitioner {
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 具体的业务逻辑,可以参考它默认怎么实现的
return 1;
}
public void close() {
// 关闭的逻辑
}
public void configure(Map<String, ?> map) {
}
}
public class MyPartitionProducer {
public static void main(String[] args) {
// 1、创建Kafka生产者的配置信息
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop113:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
// 设置分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
"com.starnet.kafka.partitions.MyPartitions");
// 2、创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
// 3、发送数据
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<String, String>("first","hello-" + i), new CallBack());
}
// 4、关闭资源
// 会进行资源的回收,并且将未发送完的数据发送完成
producer.close();
}
}
同步发送 API
同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回 ack。
由于 send 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,我们也可以实现同步发送的效果,只需在调用 Future 对象的 get 方发即可。
生产环境中同步发送使用的比较少。
Future<RecordMetadata> first = producer.send(new ProducerRecord<String, String>("first", "hello-" + i));
first.get();
Consumer API
Consumer 消费数据时的可靠性是很容易保证的,因为数据在 Kafka 中是持久化的,故不用担心数据丢失问题。
由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故障前的位置的继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。
所以 offset 的维护是 Consumer 消费数据是必须考虑的问题。
需要用到的类:
KafkaConsumer:需要创建一个消费者对象,用来消费数据
ConsumerConfig:获取所需的一系列配置参数
ConsuemrRecord:每条数据都要封装成一个 ConsumerRecord 对象
为了使我们能够专注于自己的业务逻辑,Kafka 提供了自动提交 offset 的功能。自动提交 offset 的相关参数:
enable.auto.commit:是否开启自动提交 offset 功能
auto.commit.interval.ms:自动提交 offset 的时间间隔
public class MyConsumer {
public static void main(String[] args) {
// 创建消费者配置信息
Properties properties = new Properties();
// 连接的集群
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop113:9092");
// 开启自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// 自动提交的延迟
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
// Key, Value的反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
// 消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "wei");
// 创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
// 订阅主题们可以订阅多个主题,second不存在也不会报错,会有警告
consumer.subscribe(Arrays.asList("first", "second"));
// 获取数据,一次拉去多个
while (true) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
// 解析并打印consumerRecords
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.key() + "--" + consumerRecord.value());
}
}
}
}
// 使用上面写的生产者的代码作为生产者:
// 其中topic为first,key为caocao,value为hello-i
producer.send(new ProducerRecord<String, String>("first", "caocao","hello-" + i));
结果如下:
caocao--hello-0
caocao--hello-1
caocao--hello-2
caocao--hello-3
caocao--hello-4
caocao--hello-5
caocao--hello-6
caocao--hello-7
caocao--hello-8
caocao--hello-9
重置offset
即获取该topic从0开始的数据。
在以上代码的基础上,增加和修改如下代码即可获取topic从0开始的数据。
// 消费者组没有消费过(换一个组),消费者组消费的对应的offset被删掉了(7天后被删除了)
// 重置offset用earliest,默认是lastest,从最后位置开始
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 消费者组名字改掉
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "wei1");
手动提交 offset
虽然自动提交 offset 十分简介便利,但由于其是基于时间提交的,开发人员难以把握offset 提交的时机。因此 Kafka 还提供了手动提交 offset 的 API。
手动提交 offset 的方法有两种:分别是 commitSync(同步提交)和 commitAsync(异步提交)。两者的相同点是,都会将本次 poll 的一批数据最高的偏移量提交;不同点是,commitSync 阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而 commitAsync 则没有失败重试机制,故有可能提交失败。
如果消费了数据不提交的话,数据会被消费,但是offset始终不会自动改变,导致消费者下次启动时依旧会从保存的offset开始消费;并且消费者只有在启动时才会读取磁盘中保存的offset,然后自己在内存中维护自己最新的offset,不提交的话永远不会更新到磁盘当中。
public class MyConsumer {
public static void main(String[] args) {
// 创建消费者配置信息
Properties properties = new Properties();
// 连接的集群
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop113:9092");
// 关闭自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// Key, Value的反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
// 消费者组没有消费过(换一个组),消费者组消费的对应的offset被删掉了(7天后被删除了)
// 重置offset用earliest,默认是lastest,从最后位置开始
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "wei");
// 创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
// 订阅主题们可以订阅多个主题,second不存在也不会报错,会有警告
consumer.subscribe(Arrays.asList("first", "second"));
// 获取数据,一次拉去多个
while (true) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
// 解析并打印consumerRecords
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.key() + "--" + consumerRecord.value());
}
// 同步提交,当前线程会阻塞直到 offset 提交成功
// consumer.commitSync();
// 异步提交
consumer.commitAsync(new OffsetCommitCallback() {
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null) {
System.err.println("Commit failed for" + offsets);
}
}
});
}
}
}
由于同步提交 offset 有失败重试机制,故更加可靠。
虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。
offset将会提交到kk本地。
无论是同步提交还是异步提交 offset,都有可能会造成数据的漏消费或者重复消费。
先提交 offset 后消费,有可能造成数据的漏消费;而先消费后提交 offset,有可能会造成数据的重复消费。
自定义存储 offset
Kafka 0.9 版本之前, offset 存储在 zookeeper, 0.9 版本及之后,默认将 offset 存储在 Kafka的一个内置的 topic 中。除此之外,Kafka 还可以选择自定义存储 offset。
offset 的维护是相当繁琐的,因为需要考虑到消费者的 Rebalace。
当有新的消费者加入消费者组、已有的消费者推出消费者组或者所订阅的主题的分区发生变化,就会触发到分区的重新分配,重新分配的过程叫做 Rebalance。
消费者发生 Rebalance 之后,每个消费者消费的分区就会发生变化。因此消费者要首先获取到自己被重新分配到的分区,并且定位到每个分区最近提交的 offset 位置继续消费。
要实现自定义存储 offset,需要借助 ConsumerRebalanceListener,以下为示例代码,其中提交和获取 offset 的方法,需要根据所选的 offset 存储系统自行实现。
自定义存储可以考虑与mysql一起做事务操作。
public class MyOffset {
private static Map<TopicPartition, Long> currentOffset = new HashMap();
public static void main(String[] args) {
// 创建消费者配置信息
Properties properties = new Properties();
// 连接的集群
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop113:9092");
// 关闭自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// Key, Value的反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
// 消费者组没有消费过(换一个组),消费者组消费的对应的offset被删掉了(7天后被删除了)
// 重置offset用earliest,默认是lastest,从最后位置开始
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "wei");
// 创建消费者
final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
// 订阅主题们可以订阅多个主题,second不存在也不会报错,会有警告
consumer.subscribe(Arrays.asList("first", "second"), new ConsumerRebalanceListener() {
//该方法会在 Rebalance 之前调用
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
commitOffset(currentOffset);
}
//该方法会在 Rebalance 之后调用
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
currentOffset.clear();
for (TopicPartition partition : partitions) {
consumer.seek(partition, getOffset(partition)); // 定位到最近提交的 offset 位置继续消费
}
}
});
// 获取数据,一次拉去多个
while (true) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
// 解析并打印consumerRecords
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.key() + "--" + consumerRecord.value());
}
// 异步提交
commitOffset(currentOffset);
}
}
//获取某分区的最新 offset
private static long getOffset (TopicPartition partition) {
return 0;
}
//提交该消费者所有分区的 offset
private static void commitOffset(Map<TopicPartition, Long> offset) {
}
}
自定义 Interceptor
拦截器原理
Producer 拦截器(interceptor)是在 Kafka 0.10 版本被引入的,主要用于实现 clients 端的定制化控制逻辑。
对于 producer 而言,interceptor 使得用户在消息发送前以及 producer 回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer 允许用户指定多个 interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor 的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:
(1)configure(configs)
获取配置信息和初始化数据时调用。
(2)onSend(ProducerRecord):
该方法封装进 KafkaProducer.send 方法中,即它运行在用户主线程中。Producer 确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的 topic 和分区,否则会影响目标分区的计算。
(3)onAcknowledgement(RecordMetadata, Exception):
该方法会在消息从 RecordAccumulator 成功发送到 Kafka Broker 之后,或者在发送过程中失败时调用。并且通常都是在 producer 回调逻辑触发之前。onAcknowledgement 运行在producer 的 IO 线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢 producer 的消息发送效率。
(4)close:
关闭 interceptor,主要用于执行一些资源清理工作
如前所述,interceptor 可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个 interceptor,则 producer 将按照指定顺序调用它们,并仅仅是捕获每个 interceptor 可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。
拦截器案例
需求: 实现一个简单的双 interceptor 组成的拦截链。第一个 interceptor 会在消息发送前将时间戳信息加到消息 value 的最前部;第二个 interceptor 会在消息发送后更新成功发送消息数或失败发送消息数。
public class TimeInterceptor implements ProducerInterceptor<String, String> {
public void configure(Map<String, ?> map) {
}
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
// 取出数据
String value = record.value();
// 创对象并返回
return new ProducerRecord<String, String>(
record.topic(),
record.partition(),
record.key(),
System.currentTimeMillis() + "," + record.value());
}
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
}
public void close() {
}
}
public class CounterInterceptor implements ProducerInterceptor<String, String> {
private int success = 0;
private int error = 0;
public void configure(Map<String, ?> map) {
}
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return record;
}
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
success++;
} else {
error++;
}
}
public void close() {
System.out.println("success: " + success);
System.out.println("error: " + error);
}
}
public class InterceptorProducer {
public static void main(String[] args) {
// 1、创建Kafka生产者的配置信息
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop113:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
// 添加拦截器
List<String> interceptors = new ArrayList<String>();
interceptors.add("com.starnet.kafka.interceptor.TimeInterceptor");
interceptors.add("com.starnet.kafka.interceptor.CounterInterceptor");
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
// 2、创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
// 3、发送数据
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<String, String>("first", "hello-" + i));
}
// 4、关闭资源
// 会进行资源的回收,并且将未发送完的数据发送完成
producer.close();
}
}
程序结果如下:
success: 10
error: 0
消费者消费如下:
1630057877696,hello-1
1630057877697,hello-3
1630057877697,hello-5
1630057877697,hello-7
1630057877697,hello-9
1630057877580,hello-0
1630057877697,hello-2
1630057877697,hello-4
1630057877697,hello-6
1630057877697,hello-8
如果producer.close()没有被调用的话,拦截器是不会自己触发close方法的。
更多推荐
所有评论(0)