Kafka工具类-AdminClientAPI工具类
1.Kafka客户端操作AdminClientAPI:允许管理和检测Topic,broker以及其他kafka对象ProducerAPI:发布消息到1个或者多个topicConsumerAPI:订阅一个或者多个topic,并处理产生的消息上述三类API为我们生产中主要使用的API...
·
1.Kafka客户端操作
- AdminClientAPI:允许管理和检测Topic,broker以及其他kafka对象
- ProducerAPI:发布消息到1个或者多个topic
- ConsumerAPI:订阅一个或者多个topic,并处理产生的消息
上述三类API为我们生产中主要使用的API
2.AdminClientAPI
第一个对象AdminClient是一个入口对象,其余所有对象都需要依赖AdminClient创建成功,通过AdminClient来实现方法
1.2实例演示
1.2.1maven配置
在这里插入代码片
1.2.3API调用代码演示
- 代码实现:
创建topic
获取topic
删除topic
topic描述信息查询
topic配置信息查询
package test02;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import java.util.*;
import java.util.concurrent.ExecutionException;
public class admin {
public final static String TOPIC_NAME = "yuge_topic";
public static void main(String[] args) throws Exception{
//创建topic实例
createTopic();
//查看topic列表
topicLists();
//删除topic
delTopics();
//查询topic描述信息
descriptionTopics();
//查询topic配置信息
configTopics();
//修改topic配置
alterTopics();
//增加partition数量:kafka的partition只能增加不能减少
createpartitions(2);
}
//############ 创建topic ##################
public static void createTopic(){
AdminClient adminclient = adminclient();
//副本集数据,一定是一个short类型
Short rs = 3;
NewTopic newTopic = new NewTopic(TOPIC_NAME,3,rs); //名称,分区数量,副本数量
CreateTopicsResult topics = adminclient.createTopics(Collections.singletonList(newTopic));
System.out.println("CreateTopicsResult"+topics);
}
//############ 获取topic列表 ################
public static void topicLists() throws Exception {
AdminClient adminclient = adminclient();
//是否查看internal选项,internal,就是kafka内置的topic
ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
listTopicsOptions.listInternal(true);
ListTopicsResult listTopicsResult = adminclient.listTopics(listTopicsOptions); //有传入参数的方法,通过参数控制是否显示kafka内置topic
//ListTopicsResult listTopicsResult = adminclient.listTopics(); 无传入参数的
KafkaFuture<Map<String, TopicListing>> namesToListings = listTopicsResult.namesToListings();//就是一个map,k为名称,v为topic列表
Set<Map.Entry<String, TopicListing>> namesToListingsSet = namesToListings.get().entrySet();
namesToListingsSet.stream().forEach((topicList)-> System.out.println(topicList));
Set<String> names = listTopicsResult.names().get();
names.stream().forEach((name)->System.out.println(name));
}
//############ 删除topic ################
public static void delTopics(){
AdminClient adminclient = adminclient();
adminclient.deleteTopics(Collections.singletonList(TOPIC_NAME));
}
//############ 查询topic描述信息 ################
public static void descriptionTopics() throws ExecutionException, InterruptedException {
AdminClient adminclient = adminclient();
DescribeTopicsResult describeTopicsResult = adminclient.describeTopics(Collections.singletonList(TOPIC_NAME));
Map<String, TopicDescription> topicDescriptionMap = describeTopicsResult.all().get();
Set<Map.Entry<String, TopicDescription>> set = topicDescriptionMap.entrySet();
set.stream().forEach((topic)-> System.out.println("name:"+topic.getKey()+"desc:"+topic.getValue()));
}
//############ 查询topic配置信息 ################
public static void configTopics() throws ExecutionException, InterruptedException {
AdminClient adminclient = adminclient();
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC,TOPIC_NAME);
Set<Map.Entry<ConfigResource, Config>> configDesc = adminclient.describeConfigs(Collections.singletonList(configResource)).all().get().entrySet();
configDesc.stream().forEach((config)-> System.out.println(config));
}
//############ 修改topic配置 ################
public static void alterTopics(){ //老版的api
AdminClient adminclient = adminclient();
HashMap<ConfigResource, Config> resourceConfigHashMap = new HashMap<>();
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC,TOPIC_NAME);
Config config = new Config(Collections.singletonList(new ConfigEntry("prcallocate","true")));//键值对,你k对应的value要改成什么值
resourceConfigHashMap.put(configResource,config);
adminclient.alterConfigs(resourceConfigHashMap);
}
//新的2.3以上版本API修改topic方式
public static void newAlterTopics(){
AdminClient adminclient = adminclient();
HashMap<ConfigResource, Collection<AlterConfigOp>> resourceConfigHashMap = new HashMap<>();
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC,TOPIC_NAME);
AlterConfigOp alterConfigOp = new AlterConfigOp(new ConfigEntry("prcallocate","fasle"),AlterConfigOp.OpType.SET);
resourceConfigHashMap.put(configResource,Collections.singletonList(alterConfigOp));
adminclient.incrementalAlterConfigs(resourceConfigHashMap);
}
//############ partition增加 ################
public static void createpartitions(int amount){
AdminClient adminclient = adminclient();
HashMap<String, NewPartitions> partitionsHashMap = new HashMap<>();
NewPartitions newPartitions = NewPartitions.increaseTo(amount);//控制增加partition数量
partitionsHashMap.put(TOPIC_NAME,newPartitions);
adminclient.createPartitions(partitionsHashMap);
}
//############ 创建admin客户端:客户端是所有的操作入口 ################
public static AdminClient adminclient(){
Properties properties = new Properties();
//admin基本不需要什么配置,能连接服务器就可以了
properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.83.120:9092");
AdminClient adminClient = AdminClient.create(properties);
return adminClient;
}
}
更多推荐
已为社区贡献4条内容
所有评论(0)