Spring-Kafka配置读取与KafkaTemplate多实例实现
Spring-Kafka配置读取与默认实现的初始化Spring-Kafka版本信息Spring-Kafka配置读取类默认Bean的实现KafkaTemplate多实例使用Spring-Kafka版本信息工作也好多年了,头一次写博客,内容也比较简单,希望大家多多支持,多提建议。本篇文章根据以下版本依赖进行说明<dependency><...
Spring-Kafka配置读取与默认实现的初始化
Spring-Kafka版本信息
工作也好多年了,头一次写博客,内容也比较简单,希望大家多多支持,多提建议。
本篇文章根据以下版本依赖进行说明
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.4.3.RELEASE</version>
</dependency>
Spring-Kafka配置读取类
KafkaProperties类为加载配置信息类,在初始化的时候,可以无需自己编写代码读取kafka相关配置,代码中可以直接注入KafkaProperties,在根据需要,覆盖ProducerFactory的属性。
其中consumer、producer 、admin、streams、listener分别对应不同的配置信息,如consumer对应spring.kafka.consumer开头的配置信息。
buildConsumerProperties方法则表示获取consumer的配置信息,返回值为Map。
server:
port: 9006
servlet:
context-path: /kafka
spring:
kafka:
# Kafka服务端监听地址端口,集群用逗号分隔
bootstrap-servers: 192.168.31.249:9092
consumer:
# 消费者组ID,在消费者实例没有指定消费者组的时候生效
group-id: test01
# 如果为真,consumer所fetch的消息的offset将会自动的同步到zookeeper。
enable-auto-commit: true
# 每次自动提交offset的时间间隔,当enable-auto-commit设置为true时生效,默认值为5000,单位ms
auto-commit-interval: 500
# kafka服务(实际是zookeeper)中没有初始化的offset时,如果offset是以下值的回应:
# earliest:自动复位offset为smallest的offset
# latest:自动复位offset为largest的offset
# anything else:向consumer抛出异常
# none:如果整个消费者组中没有以往的offset,则抛出异常
auto-offset-reset: latest
# message的key的解码类
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# message的value的解码类
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 单次消费获取数据的最大条数
max-poll-records: 500
# 每次fetch请求时,server应该返回的最小字节数。
# 如果没有足够的数据返回,请求会等待,直到足够的数据才会返回。默认值为1,单位bytes
fetch-min-size: 1
# 如果没有足够的数据能够满足fetch.min.bytes(fetch-min-size),
# 则此项配置是指在应答fetch请求之前,server会阻塞的最大时间,默认值为100,单位ms
fetch-max-wait: 100
# 如果设置为read_committed,则consumer会缓存消息,直到收到消息对应的事务控制消息。
# 若事务commit,则对外发布这些消息;若事务abort,则丢弃这些消息
# 默认值为read_uncommitted
isolation-level: read_uncommitted
producer:
# producer需要server接收到数据之后发出的确认接收的信号
# acks=0:设置为0表示producer不需要等待任何确认收到的信息。副本将立即加到socket buffer并认为已经发送。没有任何保障可以保证此种情况下server已经成功接收数据,同时重试配置不会发生作用(因为客户端不知道是否失败)回馈的offset会总是设置为-1;
# acks=1: 这意味着至少要等待leader已经成功将数据写入本地log,但是并没有等待所有follower是否成功写入。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。
# acks=all: 这意味着leader需要等待所有备份都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的保证。
acks: 1
# 设置大于0的值将使客户端重新发送任何数据。
# 注意,这些重试与客户端接收到发送错误时的重试没有什么不同。允许重试将潜在的改变数据的顺序,如果这两个消息记录都是发送到同一个partition,则第一个消息失败第二个发送成功,则第二条消息会比第一条消息出现要早。
retries: 4
# producer将试图批处理消息记录,以减少请求次数,这项配置控制默认的批量处理消息字节数,默认值16384,单位bytes
batch-size: 16384
properties:
# producer发送消息的延时,与batch-size配合使用,默认值0,单位ms
linger:
ms: 100
# producer可以用来缓存数据的内存大小。如果数据产生速度大于向broker发送的速度,producer会阻塞或者抛出异常,
# 默认值33554432,单位bytes
buffer-memory: 33554432
# key的序列化类
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# value的序列化类
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 生产者生成的所有数据的压缩类型,此配置接受标准压缩编解码器('gzip','snappy','lz4','zstd')
# 默认为none
compression-type: none
@ConfigurationProperties(prefix = "spring.kafka")
public class KafkaProperties {
/**
* Comma-delimited list of host:port pairs to use for establishing the initial
* connections to the Kafka cluster. Applies to all components unless overridden.
*/
private List<String> bootstrapServers = new ArrayList<>(Collections.singletonList("localhost:9092"));
/**
* ID to pass to the server when making requests. Used for server-side logging.
*/
private String clientId;
/**
* Additional properties, common to producers and consumers, used to configure the
* client.
*/
private final Map<String, String> properties = new HashMap<>();
private final Consumer consumer = new Consumer();
private final Producer producer = new Producer();
private final Admin admin = new Admin();
private final Streams streams = new Streams();
private final Listener listener = new Listener();
... ...
/**
* Create an initial map of consumer properties from the state of this instance.
* <p>
* This allows you to add additional properties, if necessary, and override the
* default kafkaConsumerFactory bean.
* @return the consumer properties initialized with the customizations defined on this
* instance
*/
public Map<String, Object> buildConsumerProperties() {
Map<String, Object> properties = buildCommonProperties();
properties.putAll(this.consumer.buildProperties());
return properties;
}
/**
* Create an initial map of producer properties from the state of this instance.
* <p>
* This allows you to add additional properties, if necessary, and override the
* default kafkaProducerFactory bean.
* @return the producer properties initialized with the customizations defined on this
* instance
*/
public Map<String, Object> buildProducerProperties() {
Map<String, Object> properties = buildCommonProperties();
properties.putAll(this.producer.buildProperties());
return properties;
}
/**
* Create an initial map of admin properties from the state of this instance.
* <p>
* This allows you to add additional properties, if necessary, and override the
* default kafkaAdmin bean.
* @return the admin properties initialized with the customizations defined on this
* instance
*/
public Map<String, Object> buildAdminProperties() {
Map<String, Object> properties = buildCommonProperties();
properties.putAll(this.admin.buildProperties());
return properties;
}
/**
* Create an initial map of streams properties from the state of this instance.
* <p>
* This allows you to add additional properties, if necessary.
* @return the streams properties initialized with the customizations defined on this
* instance
*/
public Map<String, Object> buildStreamsProperties() {
Map<String, Object> properties = buildCommonProperties();
properties.putAll(this.streams.buildProperties());
return properties;
}
}
默认Bean的实现
KafkaAutoConfiguration类,提供了Kafka常用Bean的默认实现,包括KafkaTemplate、ProducerListener、ConsumerFactory、ProducerFactory、KafkaAdmin 等,每个实现的Bean都使用了@ConditionalOnMissingBean注解,表示当开发人员没有自己单独实现的时候,使用默认实现,当开发人员单独实现的时候,默认实现不起作用,不会初始化默认的Bean实现。
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class })
public class KafkaAutoConfiguration {
private final KafkaProperties properties;
public KafkaAutoConfiguration(KafkaProperties properties) {
this.properties = properties;
}
@Bean
@ConditionalOnMissingBean(KafkaTemplate.class)
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
ProducerListener<Object, Object> kafkaProducerListener,
ObjectProvider<RecordMessageConverter> messageConverter) {
KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
kafkaTemplate.setProducerListener(kafkaProducerListener);
kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
return kafkaTemplate;
}
@Bean
@ConditionalOnMissingBean(ProducerListener.class)
public ProducerListener<Object, Object> kafkaProducerListener() {
return new LoggingProducerListener<>();
}
@Bean
@ConditionalOnMissingBean(ConsumerFactory.class)
public ConsumerFactory<?, ?> kafkaConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties());
}
@Bean
@ConditionalOnMissingBean(ProducerFactory.class)
public ProducerFactory<?, ?> kafkaProducerFactory() {
DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(
this.properties.buildProducerProperties());
String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
if (transactionIdPrefix != null) {
factory.setTransactionIdPrefix(transactionIdPrefix);
}
return factory;
}
@Bean
@ConditionalOnProperty(name = "spring.kafka.producer.transaction-id-prefix")
@ConditionalOnMissingBean
public KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) {
return new KafkaTransactionManager<>(producerFactory);
}
@Bean
@ConditionalOnProperty(name = "spring.kafka.jaas.enabled")
@ConditionalOnMissingBean
public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException {
KafkaJaasLoginModuleInitializer jaas = new KafkaJaasLoginModuleInitializer();
Jaas jaasProperties = this.properties.getJaas();
if (jaasProperties.getControlFlag() != null) {
jaas.setControlFlag(jaasProperties.getControlFlag());
}
if (jaasProperties.getLoginModule() != null) {
jaas.setLoginModule(jaasProperties.getLoginModule());
}
jaas.setOptions(jaasProperties.getOptions());
return jaas;
}
@Bean
@ConditionalOnMissingBean
public KafkaAdmin kafkaAdmin() {
KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties());
kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast());
return kafkaAdmin;
}
}
KafkaTemplate多实例使用
以KafkaTemplate为例,在某些使用场景向,可以在一个工程中,创建多个KafkaTemplate,比如当key的序列化类不同的时候,这个时候需要注意的是,由于Spring-Kafka的默认装配的Bean使用了@ConditionalOnMissingBean的注解,如果原工程使用的是默认的Bean实现,此处需要重新编写Bean的默认实现,并修改原有的KafkaTemplate的自动注入使用代码,使用@Qualifier注解,指定默认实现的Bean名称,新的代码使用新的Bean名称,就能做到同时存在多个KafkaTemplate了。
package com.example.kafka.producer.config;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import java.util.Map;
@EnableKafka
@Configuration
public class KafkaTemplateConfig {
@Autowired
private KafkaProperties kafkaProperties;
@Bean(name="defaultKafkaTemplate")
public KafkaTemplate<?, ?> kafkaTemplate(@Qualifier("defaultKafkaProducerFactory") ProducerFactory<Object, Object> kafkaProducerFactory,
ProducerListener<Object, Object> kafkaProducerListener,
ObjectProvider<RecordMessageConverter> messageConverter) {
KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
kafkaTemplate.setProducerListener(kafkaProducerListener);
kafkaTemplate.setDefaultTopic(kafkaProperties.getTemplate().getDefaultTopic());
return kafkaTemplate;
}
@Bean
public ProducerListener<Object, Object> kafkaProducerListener() {
return new LoggingProducerListener<>();
}
@Bean(name="defaultKafkaProducerFactory")
public ProducerFactory<Object, Object> kafkaProducerFactory() {
DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(
kafkaProperties.buildProducerProperties());
String transactionIdPrefix = kafkaProperties.getProducer().getTransactionIdPrefix();
if (transactionIdPrefix != null) {
factory.setTransactionIdPrefix(transactionIdPrefix);
}
return factory;
}
/**
* 获取生产者工厂
*/
@Bean(name="newKafkaProducerFactory")
public ProducerFactory<Object, Object> newProducerFactory() {
Map<String, Object> producerProperties = kafkaProperties.buildProducerProperties();
// 修改参数名称
producerProperties.put(ProducerConfig.ACKS_CONFIG,"all");
DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(
producerProperties);
String transactionIdPrefix = kafkaProperties.getProducer().getTransactionIdPrefix();
if (transactionIdPrefix != null) {
factory.setTransactionIdPrefix(transactionIdPrefix);
}
return factory;
}
@Bean(name="newKafkaTemplate")
public KafkaTemplate<?, ?> newKafkaTemplate(@Qualifier("newKafkaProducerFactory") ProducerFactory<Object, Object> kafkaProducerFactory,
ProducerListener<Object, Object> kafkaProducerListener,
ObjectProvider<RecordMessageConverter> messageConverter) {
KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
kafkaTemplate.setProducerListener(kafkaProducerListener);
kafkaTemplate.setDefaultTopic(kafkaProperties.getTemplate().getDefaultTopic());
return kafkaTemplate;
}
}
import com.example.kafka.producer.entity.ProduceEntity;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
/**
* Kafka生产者服务
*/
@RestController
public class KafkaProducerController {
@Autowired
@Qualifier("defaultKafkaTemplate")
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
@Qualifier("newKafkaTemplate")
private KafkaTemplate<String, String> newKafkaTemplate;
@PostMapping("produce")
public void produce(@RequestBody ProduceEntity produceEntity) {
for(int i = 0; i<12; i++) {
kafkaTemplate.send(produceEntity.getTopic(), i+ " " + produceEntity.getMessage());
newKafkaTemplate.send(produceEntity.getTopic(), "new "+ i +" "+produceEntity.getMessage());
}
}
}
更多推荐
所有评论(0)