通过java方式使用Kafka
基于Java API方式使用Kafka
·
一、kafka基本概念
kafka将一个topic分为多个Partition,Partition在物理上由多个segment数据文件组成,每个segment数据文件都大小相等,按照顺序读写。每个Partition上的数据都均衡的分布在不同的broker上,partition的个数不能超过broker节点的个数。
一个Partition上的消息是时间有序的,多个Partition之间的顺序无法保证
kafka中很重要的特性,只需要一次消息,可以支持任意多的应用读取这个消息,consumer通过pull方式消费消息,kafka不删除已消费的消息,kafka中的数据的删除和其是否消息没有关系,只跟kafka broker上的两个配置有关系
- log.retention.hours=48 ===>数据最多保存48小时
- log.retention.byte=1073741824 ===>数据量最大1g
二、编写生产者客户端
2.1 引入pom
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
2.2 编写生产者客户端代码
public static void main(String[] args) {
Properties prop = new Properties();
prop.put("bootstrap.servers","192.168.221.131:9092");
prop.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
prop.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
prop.put("acks","all");
prop.put("retries",0);
prop.put("batch.size",16384);
prop.put("linger.ms",1);
prop.put("buffer.memory",33554432);
String topic ="hello";
KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
producer.send(new ProducerRecord<String,String>(topic,Integer.toString(2),"hello
kafka3"));
producer.close();
}
2.3 ack 消息确认机制:有三个值:0、1、all
- 如果acks=0:表示需要Leader节点回复收到消息,这样生产者才会发送下一条数据
- 如果acks=1:只要Partition Leader接收到消息而且写入本地磁盘了,就认为成功了,不管他其他的Follower有没有同步过去这条消息了。
- 如果acks=all:表示需要所有Leader+副本节点回复收到消息(acks=-1),这样生产者才会发送下一条数 据
2.4 retries
如果当前请求失败,则生产者可以自动重新连接,但是要是设置retries=0参数,则意味着请示失败不会重新连接,这样可以避免重复发送的可能
2.5 key.serializer 、value.serializer
数据在网络中传输需要进行序列化
2.6 send()
方法中有三个参数,第一个是指定发送的主题,第二个是设置消息的key,第三个是消息value
三、编写消费者客户端
Properties prop = new Properties();
prop.put("bootstrap.servers","192.168.221.131:9092");
prop.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
prop.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
prop.put("group.id","con-1");
prop.put("auto.offset.reset","latest");
//自动提交偏移量
prop.put("auto.commit.intervals.ms","true");
//自动提交时间
prop.put("auto.commit.interval.ms","1000")
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
ArrayList<String> topics = new ArrayList<>();
//可以订阅多个消息
topics.add("hello");
consumer.subscribe(topics);
while(true){
ConsumerRecords<String,String> poll = consumer.poll(Duration.ofSeconds(20));
for(ConsumerRecord<String,String> consumerRecord :poll){
System.out.println(consumerRecord);
}
}
}
3.1 prop.put("group.id","con-1");
指定消费者组id,在同一时刻消费组只有一个线程可以去消费一个分区的数据,不同的消费组可以消费同一个分区的消息
更多推荐
已为社区贡献1条内容
所有评论(0)