1.Kafka客户端操作

  • AdminClientAPI:允许管理和检测Topic,broker以及其他kafka对象
  • ProducerAPI:发布消息到1个或者多个topic
  • ConsumerAPI:订阅一个或者多个topic,并处理产生的消息
    上述三类API为我们生产中主要使用的API
    在这里插入图片描述

2.AdminClientAPI

在这里插入图片描述
第一个对象AdminClient是一个入口对象,其余所有对象都需要依赖AdminClient创建成功,通过AdminClient来实现方法

1.2实例演示

1.2.1maven配置

在这里插入代码片

1.2.3API调用代码演示

  • 代码实现:
    创建topic
    获取topic
    删除topic
    topic描述信息查询
    topic配置信息查询
package test02;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;

import java.util.*;
import java.util.concurrent.ExecutionException;

public class admin {
    public final static String TOPIC_NAME = "yuge_topic";


    public static void main(String[] args) throws Exception{

        //创建topic实例
        createTopic();

        //查看topic列表
        topicLists();

        //删除topic
        delTopics();

        //查询topic描述信息
        descriptionTopics();

        //查询topic配置信息
        configTopics();

        //修改topic配置
        alterTopics();

        //增加partition数量:kafka的partition只能增加不能减少
        createpartitions(2);

    }


    //############ 创建topic  ##################
    public static void createTopic(){
        AdminClient adminclient = adminclient();
        //副本集数据,一定是一个short类型
        Short rs = 3;
        NewTopic newTopic = new NewTopic(TOPIC_NAME,3,rs); //名称,分区数量,副本数量
        CreateTopicsResult topics = adminclient.createTopics(Collections.singletonList(newTopic));
        System.out.println("CreateTopicsResult"+topics);
    }
    //############ 获取topic列表  ################
    public static void topicLists() throws Exception {
        AdminClient adminclient = adminclient();
        //是否查看internal选项,internal,就是kafka内置的topic
        ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
        listTopicsOptions.listInternal(true);
        ListTopicsResult listTopicsResult = adminclient.listTopics(listTopicsOptions); //有传入参数的方法,通过参数控制是否显示kafka内置topic
        //ListTopicsResult listTopicsResult = adminclient.listTopics(); 无传入参数的
        KafkaFuture<Map<String, TopicListing>> namesToListings = listTopicsResult.namesToListings();//就是一个map,k为名称,v为topic列表
        Set<Map.Entry<String, TopicListing>> namesToListingsSet = namesToListings.get().entrySet();
        namesToListingsSet.stream().forEach((topicList)-> System.out.println(topicList));

        Set<String> names = listTopicsResult.names().get();
        names.stream().forEach((name)->System.out.println(name));
    }
    //############ 删除topic  ################
    public static void delTopics(){
        AdminClient adminclient = adminclient();
        adminclient.deleteTopics(Collections.singletonList(TOPIC_NAME));
    }

    //############ 查询topic描述信息  ################
    public static void descriptionTopics() throws ExecutionException, InterruptedException {
        AdminClient adminclient = adminclient();
        DescribeTopicsResult describeTopicsResult = adminclient.describeTopics(Collections.singletonList(TOPIC_NAME));
        Map<String, TopicDescription> topicDescriptionMap = describeTopicsResult.all().get();
        Set<Map.Entry<String, TopicDescription>> set = topicDescriptionMap.entrySet();
        set.stream().forEach((topic)-> System.out.println("name:"+topic.getKey()+"desc:"+topic.getValue()));
    }

    //############ 查询topic配置信息  ################
    public static void configTopics() throws ExecutionException, InterruptedException {
        AdminClient adminclient = adminclient();
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC,TOPIC_NAME);
        Set<Map.Entry<ConfigResource, Config>> configDesc = adminclient.describeConfigs(Collections.singletonList(configResource)).all().get().entrySet();
        configDesc.stream().forEach((config)-> System.out.println(config));
    }
    //############ 修改topic配置  ################
    public static void alterTopics(){  //老版的api
        AdminClient adminclient = adminclient();
        HashMap<ConfigResource, Config> resourceConfigHashMap = new HashMap<>();
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC,TOPIC_NAME);
        Config config = new Config(Collections.singletonList(new ConfigEntry("prcallocate","true")));//键值对,你k对应的value要改成什么值
        resourceConfigHashMap.put(configResource,config);
        adminclient.alterConfigs(resourceConfigHashMap);
    }
    //新的2.3以上版本API修改topic方式
    public static void newAlterTopics(){
        AdminClient adminclient = adminclient();
        HashMap<ConfigResource, Collection<AlterConfigOp>> resourceConfigHashMap = new HashMap<>();
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC,TOPIC_NAME);
        AlterConfigOp alterConfigOp = new AlterConfigOp(new ConfigEntry("prcallocate","fasle"),AlterConfigOp.OpType.SET);
        resourceConfigHashMap.put(configResource,Collections.singletonList(alterConfigOp));
        adminclient.incrementalAlterConfigs(resourceConfigHashMap);
    }
    //############ partition增加  ################
    public static void createpartitions(int amount){
        AdminClient adminclient = adminclient();
        HashMap<String, NewPartitions> partitionsHashMap = new HashMap<>();
        NewPartitions newPartitions = NewPartitions.increaseTo(amount);//控制增加partition数量
        partitionsHashMap.put(TOPIC_NAME,newPartitions);
        adminclient.createPartitions(partitionsHashMap);
    }
    //############ 创建admin客户端:客户端是所有的操作入口  ################
    public static AdminClient adminclient(){

        Properties properties = new Properties();
        //admin基本不需要什么配置,能连接服务器就可以了
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.83.120:9092");
        AdminClient adminClient = AdminClient.create(properties);

        return adminClient;
    }
}
Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐