Kafka工具类-ConsumerAPI工具类(手动提交offset、手动指定offset,限流工具类,多线程并发处理工具类)
1.Kafka客户端操作
- AdminClientAPI:允许管理和检测Topic,broker以及其他kafka对象
- ProducerAPI:发布消息到1个或者多个topic
- ConsumerAPI:订阅一个或者多个topic,并处理产生的消息
上述三类API为我们生产中主要使用的API
producer的作用是向topic发送数据(在broker开启自动创建topic时,向不存在的topic发消息也会创建该topic),而consumer的作用是从topic拉取数据,进行相应的业务处理.
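作为补充,下面给出AdminClient的一个最小示意(非原文代码,broker地址为假设值),用来列出集群中已有的topic:
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import java.util.Properties;
import java.util.Set;
public class AdminClientSample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.83.120:9092"); //假设的broker地址
        AdminClient adminClient = AdminClient.create(props);
        //列出集群中所有topic的名字
        Set<String> topics = adminClient.listTopics().names().get();
        System.out.println("topics = " + topics);
        adminClient.close();
    }
}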
手动提交,手动指定offset起始位置
在真正的生产环境中,消费到一条数据后还要对其做业务处理,业务处理可能失败,也可能很耗时。而对于同一个消费组,已提交offset之前的消息默认不会再被消费。如果使用自动提交,就会出现数据还没处理完(甚至处理失败),offset却已经被自动提交、无法重新消费的情况,所以我们要进行手动提交。
- 为什么数据消费过后,默认就不能再次消费了呢?
原因是kafka的offset机制:offset记录了消费组在某个topic的每个partition上消费到的偏移量,下次就从这个位置继续消费,逻辑上和读文件时记住读到第几个字节是一样的。可以用下面的示意代码直观地查看offset。
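一个最小示意(非原文代码,broker地址、topic名沿用本文的假设值):通过committed()和position()可以看到某个partition上已提交的offset和下一条要拉取的位置。
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Collections;
import java.util.Properties;
public class OffsetInspect {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.83.120:9092"); //假设的broker地址
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        TopicPartition p0 = new TopicPartition("yuge_topic", 0);
        consumer.assign(Collections.singletonList(p0));
        //该消费组在partition 0上已提交的offset(从未提交过则为null)
        OffsetAndMetadata committed = consumer.committed(p0);
        //下一条将要拉取的消息的位置
        long position = consumer.position(p0);
        System.out.println("committed = " + committed + ", position = " + position);
        consumer.close();
    }
}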
- 1.kafka手动提交offset
- 2.kafka手动提交offset并且手动控制partition,实现一个partition对应一个offset的最优效率架构
- 3.手动订阅某个或者某些分区,并提交offset
- 4.手动指定offset起始位置,及手动提交offset
- 5.流量控制,限流
package test;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
/**
* ConsumerUtil:实现的是:
* 1.kafka手动提交offset
* 2.kafka手动提交offset并且手动控制partition,实现一个partition对应一个offset的最优效率架构
* 3.手动订阅某个或者某些分区,并提交offset
* 4.手动指定offset起始位置,及手动提交offset
* 5.流量控制,限流
*/
public class ConsumerUtil {
private final static String TOPIC_NAME = "yuge_topic";
public static void main(String[] args) {
//注意:下面每个示例方法内部都是while(true)死循环,一次只演示一个,其余的先注释掉
//手动提交offset
commitedOffset();
//手动对每个partition进行提交
//commitedOffsetWithPartition();
//手动订阅某个或者某些分区,并提交offset
//commitedOffsetWithPartition2();
//手动指定offset起始位置,及手动提交offset
//controlOffset();
//流量控制,限流
//flowControl();
}
//######### consumer手动提交offset #########
public static void commitedOffset(){
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.83.120");
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"test");
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.common.kafka.serialization.StringDeserializer");
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.common.kafka.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
while (true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
//把数据保存到数据库中
//如果数据落库的操作失败了,我们就进行回滚,不要提交offset
}
//上面完成了数据的业务操作了,我们进行手动提交offset
consumer.commitSync();
}
}
//######### consumer手动提交offset并且手动控制partition #########
public static void commitedOffsetWithPartition(){
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.83.120");
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"test");
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.common.kafka.serialization.StringDeserializer");
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.common.kafka.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
while (true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
/**
* 实现的是:一个topic有多个partition,普通的是:
* 一个consumer对应一个topic里面多个partition
* 而我们要实现的是:使一个partition对应一个consumer,
* 以达到一个效率最优的架构逻辑
*/
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> pRecord = records.records(partition);//获取到一个单独的partition里面内容
/**
* 这就变为循环这一个partition里面内容,这就相当于单个partition做单独处理
* 单独处理的目的是: 对每一个partition单独的去提交offset
*/
for (ConsumerRecord<String, String> record : pRecord) {
//对拉取的数据进行业务操作
//.....
}
//上面完成了业务操作了,我们进行手动提交offset
long lastOffset = pRecord.get(pRecord.size() - 1).offset();//本批次该partition最后一条消息的offset
//构造单个partition对应的offset,并进行提交
HashMap<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
offset.put(partition,new OffsetAndMetadata(lastOffset+1)); //提交的是"下一条要消费的位置",所以要+1,防止最后一条被重复消费
//提交offset
consumer.commitSync(offset);
System.out.println("==============partition-"+partition+"-end================");
}
}
}
//######### consumer手动提交offset并且手动控制partition,更高级 #########
public static void commitedOffsetWithPartition2(){
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.83.120");
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"test");
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.common.kafka.serialization.StringDeserializer");
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.common.kafka.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
//yuge_topic有0、1两个partition
TopicPartition p0 = new TopicPartition(TOPIC_NAME,0);
TopicPartition p1 = new TopicPartition(TOPIC_NAME,1); //p1仅作示意,本例只订阅p0
//消费订阅某个topic的某个分区
consumer.assign(Collections.singletonList(p0)); //assign指定订阅
while (true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
/**
* 实现的是:一个topic有多个partition,普通的是:
* 一个consumer对应一个topic里面多个partition
* 而我们要实现的是:使一个partition对应一个consumer,
* 以达到一个效率最优的架构逻辑
*/
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> pRecord = records.records(partition);//获取到一个单独的partition里面内容
/**
* 这就变为循环这一个partition里面内容,这就相当于单个partition做单独处理
* 单独处理的目的是: 对每一个partition单独的去提交offset
*/
for (ConsumerRecord<String, String> record : pRecord) {
//对拉取的数据进行业务操作
//.....
}
//上面完成了业务操作了,我们进行手动提交offset
long lastOffset = pRecord.get(pRecord.size() - 1).offset();//本批次该partition最后一条消息的offset
//构造单个partition对应的offset,并进行提交
HashMap<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
offset.put(partition,new OffsetAndMetadata(lastOffset+1)); //提交的是"下一条要消费的位置",所以要+1,防止最后一条被重复消费
//提交offset
consumer.commitSync(offset);
System.out.println("==============partition-"+partition+"-end================");
}
}
}
//######### 手动指定offset起始位置,及手动提交offset #########
public static void controlOffset(){
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.83.120");
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"test");
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.common.kafka.serialization.StringDeserializer");
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.common.kafka.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
//yuge_topic有0、1两个partition,这里只订阅partition 0
TopicPartition p0 = new TopicPartition(TOPIC_NAME,0);
//消费订阅某个topic的某个分区
consumer.assign(Collections.singletonList(p0)); //assign指定订阅
while (true){
/**
* 1.开始从0消费
* 2.比如一次消费了100条,把offset置为101,并且存入redis/hbase
* 3.每次poll之前,从redis/hbase获取最新的offset位置,使用.seek指定其起始,
* 然后开始消费
*/
//手动指定offset起始位置(演示用,这里写死为400;实际应像上面注释说的那样,从redis/hbase读出上次保存的offset)
consumer.seek(p0,400);
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
/**
* 实现的是:一个topic有多个partition,普通的是:
* 一个consumer对应一个topic里面多个partition
* 而我们要实现的是:使一个partition对应一个consumer,
* 以达到一个效率最优的架构逻辑
*/
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> pRecord = records.records(partition);//获取到一个单独的partition里面内容
/**
* 这就变为循环这一个partition里面内容,这就相当于单个partition做单独处理
* 单独处理的目的是: 对每一个partition单独的去提交offset
*/
for (ConsumerRecord<String, String> record : pRecord) {
//对拉取的数据进行业务操作
//.....
}
//上面完成了业务操作了,我们进行手动提交offset
long lastOffset = pRecord.get(pRecord.size() - 1).offset();//本批次该partition最后一条消息的offset
//构造单个partition对应的offset,并进行提交
HashMap<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
offset.put(partition,new OffsetAndMetadata(lastOffset+1)); //提交的是"下一条要消费的位置",所以要+1,防止最后一条被重复消费
//提交offset
consumer.commitSync(offset);
System.out.println("==============partition-"+partition+"-end================");
}
}
}
//######### 流量控制,限流 #########
public static void flowControl(){
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.83.120");
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"test");
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.common.kafka.serialization.StringDeserializer");
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.common.kafka.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
//yuge_topic有0、1两个partition,这里两个都订阅,便于演示pause/resume限流
TopicPartition p0 = new TopicPartition(TOPIC_NAME,0);
TopicPartition p1 = new TopicPartition(TOPIC_NAME,1);
//消费订阅某个topic的某些分区
consumer.assign(Arrays.asList(p0,p1)); //assign指定订阅
long totalNum = 40; //限流阈值:单批次最多处理的消息条数
while (true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
/**
* 实现的是:一个topic有多个partition,普通的是:
* 一个consumer对应一个topic里面多个partition
* 而我们要实现的是:使一个partition对应一个consumer,
* 以达到一个效率最优的架构逻辑
*/
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> pRecord = records.records(partition);//获取到一个单独的partition里面内容
/**
* 这就变为循环这一个partition里面内容,这就相当于单个partition做单独处理
* 单独处理的目的是: 对每一个partition单独的去提交offset
*/
long num = 0;
for (ConsumerRecord<String, String> record : pRecord) {
//对拉取的数据进行业务操作
//.....
//限流
num++;
if (record.partition()==0){
if (num>=totalNum){
consumer.pause(Collections.singletonList(p0)); //partition 0本批次处理条数达到阈值,暂停拉取p0
}
}
if (record.partition()==1){
if (num>=totalNum){
consumer.resume(Collections.singletonList(p0)); //partition 1消费到一定数量后,恢复拉取p0
}
}
}
//上面完成了业务操作了,我们进行手动提交offset
long lastOffset = pRecord.get(pRecord.size() - 1).offset();//本批次该partition最后一条消息的offset
//构造单个partition对应的offset,并进行提交
HashMap<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
offset.put(partition,new OffsetAndMetadata(lastOffset+1)); //提交的是"下一条要消费的位置",所以要+1,防止最后一条被重复消费
//提交offset
consumer.commitSync(offset);
System.out.println("==============partition-"+partition+"-end================");
}
}
}
}
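针对上面controlOffset()注释里提到的"offset存入redis/hbase,poll前seek到上次位置"的思路,下面给出一个最小示意(非原文代码,其中OffsetStore是为演示而假设的接口,实际可以用Redis/HBase等实现):
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
public class ExternalOffsetSample {
    //假设的外部offset存储接口,实际可用Redis/HBase等实现
    interface OffsetStore {
        long load(TopicPartition tp);               //读出上次保存的"下一条要消费"的位置
        void save(TopicPartition tp, long offset);  //处理完一批后保存
    }
    public static void consume(KafkaConsumer<String, String> consumer,
                               TopicPartition p0, OffsetStore store) {
        consumer.assign(Collections.singletonList(p0));
        //启动时seek一次即可,之后poll会从该位置顺序往下消费
        consumer.seek(p0, store.load(p0));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            List<ConsumerRecord<String, String>> pRecord = records.records(p0);
            for (ConsumerRecord<String, String> record : pRecord) {
                //业务处理......
            }
            if (!pRecord.isEmpty()) {
                long lastOffset = pRecord.get(pRecord.size() - 1).offset();
                store.save(p0, lastOffset + 1); //+1表示下一条要消费的位置
            }
        }
    }
}
这样offset完全由外部存储管理,即使消费组在kafka里的offset丢失,也能从上次保存的位置继续消费。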
Consumer多线程并发处理工具类
1.第一种多线程处理架构(生产使用)
传统经典模式:消费成功才提交offset,消费失败就不提交(相当于回滚),能保证数据一致性,即只有业务处理成功的消息才算消费成功
package test;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
public class ConsumerThreadUtil {
private final static String TOPIC_NAME="jiangzh-topic";
/**
*这种类型是经典模式,每一个线程单独创建一个KafkaConsumer,用于保证线程安全
*/
public static void main(String[] args) throws InterruptedException {
KafkaConsumerRunner r1 = new KafkaConsumerRunner();
Thread t1 = new Thread(r1);
t1.start();
Thread.sleep(15000);
r1.shutdown();
}
public static class KafkaConsumerRunner implements Runnable{
private final AtomicBoolean closed = new AtomicBoolean(false); //状态开关
private final KafkaConsumer<String, String> consumer;//每一个线程创建一个consumer,保证线程安全
public KafkaConsumerRunner() { //kafka的初始化->生成一个kafka的客户端
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.220.128:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<>(props);
TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1);
consumer.assign(Arrays.asList(p0,p1));//指定消费的partition
}
public void run() {
try {
while(!closed.get()) { //控制器
//处理消息
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> pRecord = records.records(partition);
// 处理每个分区的消息
for (ConsumerRecord<String, String> record : pRecord) {
System.out.printf("patition = %d , offset = %d, key = %s, value = %s%n",
record.partition(),record.offset(), record.key(), record.value());
}
// 返回去告诉kafka新的offset
long lastOffset = pRecord.get(pRecord.size() - 1).offset();
// 注意加1
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
}catch(WakeupException e) {
if(!closed.get()) {
throw e;
}
}finally {
consumer.close();
}
}
public void shutdown() {
closed.set(true);
consumer.wakeup();
}
}
}
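在这种"一个线程一个consumer"的经典模式下,通常会按partition数启动多个runner。下面是一个扩展示意(非原文代码):假设给KafkaConsumerRunner增加一个接受partition编号的构造函数(原文代码中没有),构造时只assign对应的那个partition,然后按partition数启动线程:
public class MultiRunnerSample {
    public static void main(String[] args) throws InterruptedException {
        int partitionNum = 2; //假设topic有2个partition
        ConsumerThreadUtil.KafkaConsumerRunner[] runners = new ConsumerThreadUtil.KafkaConsumerRunner[partitionNum];
        Thread[] threads = new Thread[partitionNum];
        for (int i = 0; i < partitionNum; i++) {
            runners[i] = new ConsumerThreadUtil.KafkaConsumerRunner(i); //假设的构造函数:只assign第i个partition
            threads[i] = new Thread(runners[i], "consumer-runner-" + i);
            threads[i].start();
        }
        Thread.sleep(15000);
        for (ConsumerThreadUtil.KafkaConsumerRunner runner : runners) {
            runner.shutdown(); //wakeup()会让阻塞中的poll抛出WakeupException,线程随即close并退出
        }
        for (Thread t : threads) {
            t.join();
        }
    }
}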
2.第二种多线程架构模式
用于快速处理数据,适合流式数据;但对准确性无法保证,因为这里采用自动提交,offset的提交与业务是否处理成功没有关联,消息可能丢失或被重复处理
每条消息会被包装成一个ConsumerRecordWorker任务提交到线程池,由单独的线程执行
它的考量是:如果像第一种模式那样为每个处理线程都创建一个consumer,线程会太重,所以这里只用一个consumer拉取数据,交给线程池并发处理
package test;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.*;
public class ConsumerRecordThreadSample {
private final static String TOPIC_NAME = "jiangzh-topic";
public static void main(String[] args) throws InterruptedException {
String brokerList = "192.168.220.128:9092";
String groupId = "test";
int workerNum = 5;
ConsumerExecutor consumers = new ConsumerExecutor(brokerList, groupId, TOPIC_NAME);
consumers.execute(workerNum);
Thread.sleep(1000000);
consumers.shutdown();
}
// Consumer处理
public static class ConsumerExecutor{//单个consumer拉取消息,交给线程池处理
private final KafkaConsumer<String, String> consumer;
private ExecutorService executors;
public ConsumerExecutor(String brokerList, String groupId, String topic) {
Properties props = new Properties();
props.put("bootstrap.servers", brokerList);
props.put("group.id", groupId);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
}
public void execute(int workerNum) {//workerNum核心线程数
executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
//executors就是我们的线程池
//注意:这里是while(true)死循环,execute()不会返回,实际使用时需要自行增加退出条件
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
for (final ConsumerRecord<String, String> record : records) {
executors.submit(new ConsumerRecordWorker(record));
}
}
}
public void shutdown() {
if (consumer != null) {
consumer.close();
}
if (executors != null) {
executors.shutdown();
}
try {
if (!executors.awaitTermination(10, TimeUnit.SECONDS)) {
System.out.println("Timeout.... Ignore for this case");
}
} catch (InterruptedException ignored) {
System.out.println("Other thread interrupted this shutdown, ignore for this case.");
Thread.currentThread().interrupt();
}
}
}
// 记录处理
public static class ConsumerRecordWorker implements Runnable {//运行这个类就会执行run中的逻辑
private ConsumerRecord<String, String> record;
public ConsumerRecordWorker(ConsumerRecord<String, String> record) {
this.record = record;
}
@Override
public void run() {
//业务逻辑代码
System.out.println("Thread - "+ Thread.currentThread().getName());
System.err.printf("patition = %d , offset = %d, key = %s, value = %s%n",
record.partition(), record.offset(), record.key(), record.value());
}
}
}
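如果既想用线程池并发处理、又想保证至少一次(at-least-once)语义,可以在这个模式上做一点改动:关闭自动提交,等一批消息对应的Future全部执行完,再手动commitSync。下面是一个最小示意(非原文代码,线程数、poll超时等数值均为假设值):
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class BatchCommitSample {
    //前提:consumer的properties里已设置enable.auto.commit=false,并且已subscribe/assign
    public static void consume(KafkaConsumer<String, String> consumer) throws Exception {
        ExecutorService executors = Executors.newFixedThreadPool(5); //线程数为假设值
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
            List<Future<?>> futures = new ArrayList<>();
            for (ConsumerRecord<String, String> record : records) {
                futures.add(executors.submit(() -> {
                    //业务处理......
                    System.out.println(Thread.currentThread().getName() + " -> " + record.value());
                }));
            }
            for (Future<?> f : futures) {
                f.get(); //等这一批全部处理完;任何一个任务失败都会在这里抛异常,从而不会提交offset
            }
            if (!records.isEmpty()) {
                consumer.commitSync(); //这一批全部成功后再统一提交
            }
        }
    }
}
代价是每一批要等最慢的任务完成后才会继续poll,吞吐量会低于纯流式的第二种模式,属于准确性和速度之间的取舍。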