1.Admin API

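API                    Purpose
AdminClient            AdminClient client object
NewTopic               Describes a topic to create
CreateTopicsResult     Result returned when creating topics
ListTopicsResult       Result of listing topics
ListTopicsOptions      Options for listing topics
DescribeTopicsResult   Result of describing topics
DescribeConfigsResult  Result of describing topic configuration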

2.Creating the AdminClient
(1).Add the dependency

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.3.0</version>
</dependency>

(2).Code example

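import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class AdminSample {
    /**
     * Creates an AdminClient that bootstraps from the local broker.
     */
    public static AdminClient adminClient() {
        Properties properties = new Properties();
        // Broker address used to discover the rest of the cluster
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        return AdminClient.create(properties);
    }

    public static void main(String[] args) {
        AdminClient adminClient = AdminSample.adminClient();
        // Printing the object only confirms that the client was constructed
        System.out.println("AdminClient:" + adminClient);
    }
}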
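
The example above sets only the bootstrap address; every other setting falls back to the defaults that the client logs at startup. As a minimal sketch, the settings could be collected in a small helper before being handed to AdminClient.create. The class name AdminConfigSketch, the method adminProperties, the client id "admin-sample", and the timeout and retry values are all hypothetical and chosen for illustration; the key names are the ones that appear in the logged AdminClientConfig values. The snippet uses only java.util.Properties, so it compiles without the Kafka dependency.

```java
import java.util.Properties;

// Hypothetical helper (not part of Kafka): collects settings for an AdminClient.
final class AdminConfigSketch {

    static Properties adminProperties(String bootstrapServers, String clientId) {
        if (bootstrapServers == null || bootstrapServers.trim().isEmpty()) {
            throw new IllegalArgumentException("bootstrapServers must not be empty");
        }
        Properties properties = new Properties();
        // Key names match the entries in the logged AdminClientConfig values.
        properties.setProperty("bootstrap.servers", bootstrapServers);
        properties.setProperty("client.id", clientId);
        // Illustrative overrides (assumptions); the logged defaults are 120000 and 5.
        properties.setProperty("request.timeout.ms", "30000");
        properties.setProperty("retries", "3");
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = adminProperties("127.0.0.1:9092", "admin-sample");
        System.out.println(properties.getProperty("bootstrap.servers"));
        System.out.println(properties.getProperty("client.id"));
    }
}
```

With the Kafka dependency on the classpath, the returned Properties object can be passed to AdminClient.create(properties) in place of the one built inline in AdminSample.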

(3).Run output

23:52:56.389 [main] INFO org.apache.kafka.clients.admin.AdminClientConfig - AdminClientConfig values: 
	bootstrap.servers = [127.0.0.1:9092]
	client.dns.lookup = default
	client.id = 
	connections.max.idle.ms = 300000
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 120000
	retries = 5
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS

23:52:56.424 [main] DEBUG org.apache.kafka.clients.admin.internals.AdminMetadataManager - [AdminClient clientId=adminclient-1] Setting bootstrap cluster metadata Cluster(id = null, nodes = [127.0.0.1:9092 (id: -1 rack: null)], partitions = [], controller = null).
23:52:56.663 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name connections-closed:
23:52:56.666 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name connections-created:
23:52:56.666 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name successful-authentication:
23:52:56.667 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name successful-reauthentication:
23:52:56.667 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name successful-authentication-no-reauth:
23:52:56.667 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name failed-authentication:
23:52:56.667 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name failed-reauthentication:
23:52:56.667 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name reauthentication-latency:
23:52:56.668 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-sent-received:
23:52:56.669 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-sent:
23:52:56.670 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-received:
23:52:56.670 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name select-time:
23:52:56.670 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name io-time:
23:52:56.680 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.4.0
23:52:56.680 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 77a89fcf8d7fa018
23:52:56.680 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1606405976678
23:52:56.681 [main] DEBUG org.apache.kafka.clients.admin.KafkaAdminClient - [AdminClient clientId=adminclient-1] Kafka admin client initialized
AdminClient:org.apache.kafka.clients.admin.KafkaAdminClient@6a4f787b