1. Kafka Client Operations

  • AdminClient API: manage and inspect topics, brokers, and other Kafka objects
  • Producer API: publish messages to one or more topics
  • Consumer API: subscribe to one or more topics and process the messages produced to them
    These three API families are the ones we mainly use in production (a minimal AdminClient sketch is shown below).
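The code in this article only exercises the Consumer API, so the following is a minimal AdminClient sketch for completeness. It is only a sketch under assumptions: the broker address reuses the placeholder used later in this article, and the topic name, partition count, and replication factor are values made up for the example.

package test;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class AdminClientDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.83.120:9092"); // placeholder broker address

        try (AdminClient adminClient = AdminClient.create(props)) {
            // Create a topic with 2 partitions and replication factor 1 (values chosen only for the example)
            NewTopic newTopic = new NewTopic("yuge_topic", 2, (short) 1);
            adminClient.createTopics(Collections.singletonList(newTopic)).all().get();

            // List the topics on the cluster to verify
            System.out.println(adminClient.listTopics().names().get());
        }
    }
}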

The producer's job is to create the topic and send data to it, while the consumer's job is to pull data from the topic and run the corresponding business logic on it.
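Since the rest of this article focuses on consumers, here is a minimal producer sketch for context. It is a sketch only: the broker address, topic name, and the keys/values passed to send() are assumptions made up for the example.

package test;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.83.120:9092"); // placeholder broker address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                // Key and value are made up for the example
                producer.send(new ProducerRecord<>("yuge_topic", "key-" + i, "value-" + i));
            }
            producer.flush(); // make sure buffered records are actually sent before closing
        }
    }
}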
Messages in a single partition can only be consumed by one Consumer within a ConsumerGroup.

Manual commits, and manually specifying the starting offset

In real production, after consuming a record we still have to run business logic on it, and that logic may fail or take a long time. Kafka is used in exactly these scenarios, and one of its characteristics is that a record whose offset has been committed will not be delivered again. With auto-commit this creates a problem: Kafka may commit the offset before our processing has finished, and since the record cannot be consumed again we are stuck. That is why we commit offsets manually.

  • Why can't data be consumed again once it has been consumed?
    This comes from Kafka's offset mechanism: the committed offset records how far the group has read in each partition of a topic, much like a read position when reading a file.
  • 1. Manually committing offsets
  • 2. Manually committing offsets while controlling partitions by hand, so that each partition maps to its own offset for the most efficient layout
  • 3. Manually subscribing to (assigning) one or more specific partitions and committing their offsets
  • 4. Manually specifying the starting offset, plus manual offset commits
  • 5. Flow control / rate limiting
package test;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;

/**
 * ConsumerUtil demonstrates:
 * 1. Manually committing offsets
 * 2. Manually committing offsets per partition, so each partition's offset is tracked and committed independently
 * 3. Manually assigning one or more specific partitions and committing their offsets
 * 4. Manually specifying the starting offset, plus manual commits
 * 5. Flow control / rate limiting
 */
public class ConsumerUtil {
    private final static String TOPIC_NAME = "yuge_topic";


    public static void main(String[] args) {
        // NOTE: each of these methods runs an endless poll loop,
        // so in practice only the first call ever runs; run them one at a time.

        // Manually commit offsets
        commitedOffset();

        // Manually commit offsets per partition
        commitedOffsetWithPartition();

        // Manually assign one or more specific partitions and commit their offsets
        commitedOffsetWithPartition2();

        // Manually specify the starting offset and commit offsets manually
        controlOffset();

        // Flow control / rate limiting
        flowControl();
    }

    // ######### Consumer: manual offset commit #########
    public static void commitedOffset(){
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.83.120:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"test");
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        consumer.subscribe(Collections.singletonList(TOPIC_NAME));

        while (true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                // Save the record to the database here

                // If persisting fails, roll back the business operation and do not commit the offset,
                // so the record can be consumed again

            }
            // Business processing for this batch is done, so commit the offsets manually
            consumer.commitSync();
        }

    }


    // ######### Consumer: manual offset commit with per-partition control #########
    public static void commitedOffsetWithPartition(){
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.83.120:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"test");
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        consumer.subscribe(Collections.singletonList(TOPIC_NAME));

        while (true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            /**
             * A topic has multiple partitions. Normally a single consumer handles all of a
             * topic's partitions in one pass; here we instead treat each partition
             * separately (as if one partition mapped to one consumer), which is the
             * more efficient layout we are after.
             */
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> pRecord = records.records(partition); // the records belonging to this single partition

                /**
                 * Looping over one partition's records at a time means each partition is
                 * handled on its own, so its offset can be committed independently.
                 */
                for (ConsumerRecord<String, String> record : pRecord) {
                    // Business processing on the pulled records
                    //.....
                }
                // Business processing is done; now commit this partition's offset manually
                long lastOffset = pRecord.get(pRecord.size() - 1).offset(); // last offset consumed from this partition
                // Build the per-partition offset map to commit
                HashMap<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
                offset.put(partition,new OffsetAndMetadata(lastOffset+1));  // commit lastOffset + 1, i.e. the next record to read, to avoid re-consuming the last one
                // Commit the offset synchronously
                consumer.commitSync(offset);
                System.out.println("==============partition-"+partition+"-end================");

            }

        }

    }
    // ######### Consumer: manual offset commit with manually assigned partitions #########
    public static void commitedOffsetWithPartition2(){
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.83.120:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"test");
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // yuge_topic has two partitions: 0 and 1 (p1 is created only to show the second partition; only p0 is consumed here)
        TopicPartition p0 = new TopicPartition(TOPIC_NAME,0);
        TopicPartition p1 = new TopicPartition(TOPIC_NAME,1);

        // Consume a specific partition of the topic
        consumer.assign(Collections.singletonList(p0)); // assign() subscribes to explicit partitions


        while (true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            /**
             * A topic has multiple partitions. Normally a single consumer handles all of a
             * topic's partitions in one pass; here we instead treat each partition
             * separately (as if one partition mapped to one consumer), which is the
             * more efficient layout we are after.
             */
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> pRecord = records.records(partition); // the records belonging to this single partition

                /**
                 * Looping over one partition's records at a time means each partition is
                 * handled on its own, so its offset can be committed independently.
                 */
                for (ConsumerRecord<String, String> record : pRecord) {
                    // Business processing on the pulled records
                    //.....
                }
                // Business processing is done; now commit this partition's offset manually
                long lastOffset = pRecord.get(pRecord.size() - 1).offset(); // last offset consumed from this partition
                // Build the per-partition offset map to commit
                HashMap<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
                offset.put(partition,new OffsetAndMetadata(lastOffset+1));  // commit lastOffset + 1, i.e. the next record to read, to avoid re-consuming the last one
                // Commit the offset synchronously
                consumer.commitSync(offset);
                System.out.println("==============partition-"+partition+"-end================");

            }

        }

    }

    // ######### Manually specify the starting offset, and commit offsets manually #########
    public static void controlOffset(){
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.83.120:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"test");
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // yuge_topic has two partitions: 0 and 1; here we only consume partition 0
        TopicPartition p0 = new TopicPartition(TOPIC_NAME,0);

        // Consume a specific partition of the topic
        consumer.assign(Collections.singletonList(p0)); // assign() subscribes to explicit partitions


        while (true){
            /**
             * Typical pattern:
             * 1. Start consuming from offset 0.
             * 2. After consuming e.g. 100 records, store offset 101 in Redis/HBase.
             * 3. Before each poll, read the latest offset from Redis/HBase and use
             *    seek() to set the starting position, then continue consuming.
             */

            // Manually specify the starting offset (hard-coded here for the demo)
            consumer.seek(p0,400);
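
            // A minimal sketch of the Redis/HBase pattern described above (assumption only:
            // 'offsetStore' is a hypothetical external lookup, not part of this article's code):
            //   long nextOffset = offsetStore.get(TOPIC_NAME, p0.partition()); // last committed position + 1
            //   consumer.seek(p0, nextOffset);
            // After processing a batch, the new position would be written back to the store.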

            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            /**
             * A topic has multiple partitions. Normally a single consumer handles all of a
             * topic's partitions in one pass; here we instead treat each partition
             * separately (as if one partition mapped to one consumer), which is the
             * more efficient layout we are after.
             */
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> pRecord = records.records(partition); // the records belonging to this single partition

                /**
                 * Looping over one partition's records at a time means each partition is
                 * handled on its own, so its offset can be committed independently.
                 */
                for (ConsumerRecord<String, String> record : pRecord) {
                    // Business processing on the pulled records
                    //.....
                }
                // Business processing is done; now commit this partition's offset manually
                long lastOffset = pRecord.get(pRecord.size() - 1).offset(); // last offset consumed from this partition
                // Build the per-partition offset map to commit
                HashMap<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
                offset.put(partition,new OffsetAndMetadata(lastOffset+1));  // commit lastOffset + 1, i.e. the next record to read, to avoid re-consuming the last one
                // Commit the offset synchronously
                consumer.commitSync(offset);
                System.out.println("==============partition-"+partition+"-end================");

            }

        }

    }
    // ######### Flow control / rate limiting #########
    public static void flowControl(){
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.83.120:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"test");
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // yuge_topic has two partitions: 0 and 1
        TopicPartition p0 = new TopicPartition(TOPIC_NAME,0);
        TopicPartition p1 = new TopicPartition(TOPIC_NAME,1);

        // Assign both partitions, since the pause/resume logic below refers to partition 0 and partition 1
        consumer.assign(Arrays.asList(p0, p1));

        long totalNum = 40; // throughput threshold per batch

        while (true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            /**
             * A topic has multiple partitions. Normally a single consumer handles all of a
             * topic's partitions in one pass; here we instead treat each partition
             * separately (as if one partition mapped to one consumer), which is the
             * more efficient layout we are after.
             */
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> pRecord = records.records(partition); // the records belonging to this single partition

                /**
                 * Looping over one partition's records at a time means each partition is
                 * handled on its own, so its offset can be committed independently.
                 */
                long num = 0;
                for (ConsumerRecord<String, String> record : pRecord) {
                    // Business processing on the pulled records
                    //.....


                    // Rate limiting via pause()/resume()
                    num++;
                    if (record.partition()==0){
                        if (num>=totalNum){
                            // Partition 0 has reached the threshold; stop fetching from it for now
                            consumer.pause(Collections.singletonList(p0));
                        }
                    }
                    if (record.partition()==1){
                        if (num>=totalNum){
                            // Once partition 1 has also worked through a full batch, resume partition 0
                            consumer.resume(Collections.singletonList(p0));
                        }
                    }

                }

                // Business processing is done; now commit this partition's offset manually
                long lastOffset = pRecord.get(pRecord.size() - 1).offset(); // last offset consumed from this partition
                // Build the per-partition offset map to commit
                HashMap<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
                offset.put(partition,new OffsetAndMetadata(lastOffset+1));  // commit lastOffset + 1, i.e. the next record to read, to avoid re-consuming the last one
                // Commit the offset synchronously
                consumer.commitSync(offset);
                System.out.println("==============partition-"+partition+"-end================");
            }
        }
    }
}

Multi-threaded concurrent Consumer utility classes

1. First multi-threading architecture (used in production)

The traditional, classic pattern: commit the offset only after a record has been processed successfully, and roll back (skip the commit) on failure. This preserves data consistency, since a record only counts as consumed once its processing has actually succeeded.

package test;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

public class ConsumerThreadUtil {
    private final static String TOPIC_NAME="jiangzh-topic";

    /**
     * Classic pattern: each thread creates its own KafkaConsumer,
     * because KafkaConsumer is not thread-safe.
     */
    public static void main(String[] args) throws InterruptedException {
        KafkaConsumerRunner r1 = new KafkaConsumerRunner();
        Thread t1 = new Thread(r1);

        t1.start();

        Thread.sleep(15000);

        r1.shutdown();
    }

    public static class KafkaConsumerRunner implements Runnable{
        private final AtomicBoolean closed = new AtomicBoolean(false); // shutdown flag
        private final KafkaConsumer<String, String> consumer; // one consumer per thread, for thread safety

        public KafkaConsumerRunner() { // initialize the Kafka client
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.220.128:9092");
            props.put("group.id", "test");
            props.put("enable.auto.commit", "false");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            consumer = new KafkaConsumer<>(props);

            TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
            TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1);


            consumer.assign(Arrays.asList(p0,p1)); // explicitly assign the partitions to consume
        }


        public void run() {
            try {
                while(!closed.get()) { // loop until shutdown() flips the flag
                    // Process messages
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));

                    for (TopicPartition partition : records.partitions()) {
                        List<ConsumerRecord<String, String>> pRecord = records.records(partition);
                        // Process this partition's records
                        for (ConsumerRecord<String, String> record : pRecord) {
                            System.out.printf("partition = %d , offset = %d, key = %s, value = %s%n",
                                    record.partition(),record.offset(), record.key(), record.value());
                        }

                        // Report the new offset back to Kafka
                        long lastOffset = pRecord.get(pRecord.size() - 1).offset();
                        // Note the +1: commit the position of the next record to read
                        consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                    }
                    }

                }
            }catch(WakeupException e) {
                if(!closed.get()) {
                    throw e;
                }
            }finally {
                consumer.close();
            }
        }

        public void shutdown() {
            closed.set(true);
            consumer.wakeup();
        }
    }

}

2. Second multi-threading architecture

This pattern is built for fast processing and suits streaming data, but it cannot guarantee accuracy, because the consumer cannot commit offsets per processed record. Each event handler runs as a separate worker thread. The motivation is that creating too many consumers makes the threads heavyweight, so a single consumer dispatches records to a worker pool instead.

package test;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.*;

public class ConsumerRecordThreadSample {
    private final static String TOPIC_NAME = "jiangzh-topic";

    public static void main(String[] args) throws InterruptedException {
        String brokerList = "192.168.220.128:9092";
        String groupId = "test";
        int workerNum = 5;

        ConsumerExecutor consumers = new ConsumerExecutor(brokerList, groupId, TOPIC_NAME);
        consumers.execute(workerNum);

        Thread.sleep(1000000);

        consumers.shutdown();

    }

    // Consumer dispatcher: one consumer feeding a worker thread pool
    public static class ConsumerExecutor {
        private final KafkaConsumer<String, String> consumer;
        private ExecutorService executors;

        public ConsumerExecutor(String brokerList, String groupId, String topic) {
            Properties props = new Properties();
            props.put("bootstrap.servers", brokerList);
            props.put("group.id", groupId);
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList(topic));
        }

        public void execute(int workerNum) { // workerNum = number of worker threads
            executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
            // 'executors' is the worker thread pool; the single consumer below feeds it
            // NOTE: this loop never returns, so in this demo shutdown() is effectively unreachable
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
                for (final ConsumerRecord<String, String> record : records) {
                    executors.submit(new ConsumerRecordWorker(record));
                }
            }
        }

        public void shutdown() {
            if (consumer != null) {
                consumer.close();
            }
            if (executors != null) {
                executors.shutdown();
            }
            try {
                if (!executors.awaitTermination(10, TimeUnit.SECONDS)) {
                    System.out.println("Timeout.... Ignore for this case");
                }
            } catch (InterruptedException ignored) {
                System.out.println("Other thread interrupted this shutdown, ignore for this case.");
                Thread.currentThread().interrupt();
            }
        }


    }

    // Per-record worker: each submitted task runs the logic in run()
    public static class ConsumerRecordWorker implements Runnable {

        private final ConsumerRecord<String, String> record;

        public ConsumerRecordWorker(ConsumerRecord<String, String> record) {
            this.record = record;
        }

        @Override
        public void run() {
            // Business logic goes here
            System.out.println("Thread - "+ Thread.currentThread().getName());
            System.err.printf("partition = %d , offset = %d, key = %s, value = %s%n",
                    record.partition(), record.offset(), record.key(), record.value());
        }

    }
}
