Kafka工具类 - KafkaProducer API工具类
本文介绍Kafka客户端的三类API(AdminClient、Producer、Consumer)以及Producer的三种发送模式(异步、同步、回调),并给出对应的工具类示例代码。
1.Kafka客户端操作
- AdminClientAPI:允许管理和检测Topic,broker以及其他kafka对象
- ProducerAPI:发布消息到1个或者多个topic
- ConsumerAPI:订阅一个或者多个topic,并处理产生的消息
上述三类API为我们生产中主要使用的API
2.producer发送模式
- 异步发送
- 同步发送
- 回调发送
异步发送返回Kafka的Future类型,若不调用get()就是"发出去不管"(fire-and-forget)模式
producer发送过程大致可以分成两个部分:第一部分是构建KafkaProducer,第二部分是调用send发送出去
package test;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
* ProducerUtils类:实现的是producer的
* 1.异步发送
* 2.异步阻塞发送
* 3.异步回调发送
*/
public class ProducerUtils {
private final static String TOPIC_NAME = "yuge_topic";
public static void main(String[] args) throws ExecutionException, InterruptedException {
//Producer异步发送
producerSend();
//Prducer异步阻塞发送
producerSyncSend();
//异步回调发送
callBackProducerSend();
}
//############## producer异步向指定topic发送数据 ##################
public static void producerSend(){
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.83.120");
properties.put(ProducerConfig.ACKS_CONFIG,"all");
properties.put(ProducerConfig.RETRIES_CONFIG,"0");
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
Producer<String, Object> producer = new KafkaProducer<String, Object>(properties);
for (int i = 0; i < 10; i++) {
//消息对象 ProducerRecord
ProducerRecord producerRecord = new ProducerRecord<String,String>(TOPIC_NAME,"key_"+i,"value_"+i);
producer.send(producerRecord);
}
//所有的通道打卡都需要关闭
producer.close();
}
//############## producer同步(或者叫异步阻塞)向指定topic发送数据 ##################
public static void producerSyncSend() throws ExecutionException, InterruptedException {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.83.120");
properties.put(ProducerConfig.ACKS_CONFIG,"all");
properties.put(ProducerConfig.RETRIES_CONFIG,"0");
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(properties);
for (int i = 0; i < 10; i++) {
//消息对象 ProducerRecord
ProducerRecord<String,String> producerRecord = new ProducerRecord<>(TOPIC_NAME,"key_"+i,"value_"+i);
Future<RecordMetadata> send = producer.send(producerRecord);
RecordMetadata recordMetadata = send.get();//发送一次你一get就把它阻塞子这里了
System.out.println("partition:"+recordMetadata.partition()+",offset"+recordMetadata.offset());
}
//所有的通道打卡都需要关闭
producer.close();
}
//############## producer异步回调发送 ##################
public static void callBackProducerSend(){
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.83.120");
properties.put(ProducerConfig.ACKS_CONFIG,"all");
properties.put(ProducerConfig.RETRIES_CONFIG,"0");
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
Producer<String, Object> producer = new KafkaProducer<String, Object>(properties);
for (int i = 0; i < 10; i++) {
//消息对象 ProducerRecord
ProducerRecord producerRecord = new ProducerRecord<String,String>(TOPIC_NAME,"key_"+i,"value_"+i);
producer.send(producerRecord, new Callback() { //回调:当kafka执行完成send后就会触发此函数的执行
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
System.out.println("partition:"+metadata.partition()+",offset"+metadata.offset());
}
});
}
//所有的通道打卡都需要关闭
producer.close();
}
}
3.自定义partition负载均衡
package test;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
* customPartitionLoadBalancing类:实现的是自定义partition的负载均衡
*/
/**
 * customPartitionLoadBalancing: custom partition load balancing.
 * Routes records whose keys carry a numeric suffix after 4 prefix characters
 * (e.g. "key_1", "key-2") to partition (number % partitionCount).
 *
 * NOTE(review): class name kept lowercase for compatibility — producers
 * register this class by its fully-qualified name in the config; Java
 * convention would be UpperCamelCase.
 */
public class customPartitionLoadBalancing implements Partitioner {

    // ############### core logic: which record goes to which partition #####################
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Use the topic's real partition count instead of the hard-coded 2,
        // so the partitioner keeps working for any number of partitions.
        Integer count = cluster.partitionCountForTopic(topic);
        int numPartitions = (count == null || count == 0) ? 1 : count;
        String keyStr = key + ""; // null-safe: a null key becomes "null"
        try {
            // keys look like "key_<n>"; index 4 is where the numeric suffix starts
            int n = Integer.parseInt(keyStr.substring(4));
            return n % numPartitions;
        } catch (IndexOutOfBoundsException | NumberFormatException e) {
            // key does not match the expected pattern (originally this threw and
            // killed the send) — fall back to hashing the whole key
            return Math.abs(keyStr.hashCode() % numPartitions);
        }
    }

    // ################ no resources to release ###################
    @Override
    public void close() {
    }

    // ################ no extra configuration needed ###################
    @Override
    public void configure(Map<String, ?> configs) {
    }
}
更多推荐
已为社区贡献4条内容
所有评论(0)