1.Kafka客户端操作

  • AdminClientAPI:允许管理和检测Topic,broker以及其他kafka对象
  • ProducerAPI:发布消息到1个或者多个topic
  • ConsumerAPI:订阅一个或者多个topic,并处理产生的消息
    上述三类API为我们生产中主要使用的API

在这里插入图片描述

2.producer发送模式

  • 异步发送
  • 同步发送
  • 回调发送
    kafka的Future类型,就是发出去不管了

producer发送过程大致可以分成两个部分,一个是构建我们的Kafkaproducer,第二步部分是send出去
在这里插入图片描述
在这里插入图片描述

package test;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * ProducerUtils类:实现的是producer的
 * 1.异步发送
 * 2.异步阻塞发送
 * 3.异步回调发送
 */
public class ProducerUtils {
    private final static String TOPIC_NAME = "yuge_topic";


    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //Producer异步发送
        producerSend();

        //Prducer异步阻塞发送
        producerSyncSend();

        //异步回调发送
        callBackProducerSend();




    }
    //############## producer异步向指定topic发送数据 ##################
    public static void producerSend(){
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.83.120");
        properties.put(ProducerConfig.ACKS_CONFIG,"all");
        properties.put(ProducerConfig.RETRIES_CONFIG,"0");
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
        properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");

        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, Object> producer = new KafkaProducer<String, Object>(properties);
        for (int i = 0; i < 10; i++) {
            //消息对象 ProducerRecord
            ProducerRecord producerRecord = new ProducerRecord<String,String>(TOPIC_NAME,"key_"+i,"value_"+i);
            producer.send(producerRecord);
        }
        //所有的通道打卡都需要关闭
        producer.close();
    }
    //############## producer同步(或者叫异步阻塞)向指定topic发送数据 ##################
    public static void producerSyncSend() throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.83.120");
        properties.put(ProducerConfig.ACKS_CONFIG,"all");
        properties.put(ProducerConfig.RETRIES_CONFIG,"0");
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
        properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");

        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(properties);
        for (int i = 0; i < 10; i++) {
            //消息对象 ProducerRecord
            ProducerRecord<String,String> producerRecord = new ProducerRecord<>(TOPIC_NAME,"key_"+i,"value_"+i);
            Future<RecordMetadata> send = producer.send(producerRecord);
            RecordMetadata recordMetadata = send.get();//发送一次你一get就把它阻塞子这里了
            System.out.println("partition:"+recordMetadata.partition()+",offset"+recordMetadata.offset());
        }
        //所有的通道打卡都需要关闭
        producer.close();
    }
    //############## producer异步回调发送 ##################
    public static void callBackProducerSend(){
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.83.120");
        properties.put(ProducerConfig.ACKS_CONFIG,"all");
        properties.put(ProducerConfig.RETRIES_CONFIG,"0");
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
        properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");

        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, Object> producer = new KafkaProducer<String, Object>(properties);
        for (int i = 0; i < 10; i++) {
            //消息对象 ProducerRecord
            ProducerRecord producerRecord = new ProducerRecord<String,String>(TOPIC_NAME,"key_"+i,"value_"+i);
            producer.send(producerRecord, new Callback() { //回调:当kafka执行完成send后就会触发此函数的执行
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    System.out.println("partition:"+metadata.partition()+",offset"+metadata.offset());
                }
            });
        }
        //所有的通道打卡都需要关闭
        producer.close();
    }
}

3.自定义partition负载均衡

package test;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * customPartitionLoadBalancing类:实现的是自定义partition的负载均衡
 */
public class customPartitionLoadBalancing implements Partitioner {
    //###############这是最核心的部分:什么样的数据进行什么样的partition#####################
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        /**
         * key-1
         * key-2
         * key-3   这个可以还有结合业务做一个调整
         */
        String keyStr = key + "";
        String keyInt = keyStr.substring(4);//4是开始索引.我们取key-后面的数字
        System.out.println("keyStr:"+keyStr+"keyInt"+keyInt);
        int i = Integer.parseInt(keyInt);
        return i%2;  //模2的话他就是0101(2个分区),具体模几要看你自己的分区数是多少
    }
    //################下面的内容不用管它###################
    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐