Spring Boot + Kafka的使用
一、快速了解Kafka在把Kafka集成到spring之前,我们首先要了解Kafka是什么?由什么东西组成?主要的使用场景是哪些?Apache Kafka是一个开源消息系统,由Scala写成。是由Apache软件基金会开发的一个开源消息系统项目。Kafka是一个分布式消息队列。Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接受者称为Consumer,此外k...
一、快速了解Kafka
在把Kafka集成到spring之前,我们首先要了解Kafka是什么?由什么东西组成?主要的使用场景是哪些?
Apache Kafka是一个开源消息系统,由Scala写成。是由Apache软件基金会开发的一个开源消息系统项目。
Kafka是一个分布式消息队列。Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接受者称为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)称为broker。
无论是kafka集群,还是consumer都依赖于zookeeper集群保存一些meta信息,来保证系统可用性。
简单架构理解图
详细架构图
-
1)Producer :消息生产者,就是向kafka broker发消息的客户端;
-
2)Consumer :消息消费者,向kafka broker取消息的客户端;
-
3)Topic :可以理解为一个队列;
-
4) Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partion只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic;
-
5)Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic;
-
6)Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序;
-
7)Offset:kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka。
二、环境准备
这边的话,我们简单的在windows搭建一个环境即可
主要的就是以下三个环境:
- jdk
- zookeeper
- kafka
具体步骤这里就不详细介绍了,网上有很多案例,照着做一遍即可。
本地测试的话,搭建个单机的即可。
三、Spring Kafka集成
1、添加架包依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.0</version>
</dependency>
注意:这里有一个很大的坑,因为版本的问题,spring-kafka和kafka-clients的版本一定要按照下图对应。
2、简单配置
推荐使用spring-boot的项目,配置既简单又方便
直接在application.yml配置文件加入以下内容即可。
spring:
kafka:
# 消费者
consumer:
group-id: foo
auto-offset-reset: earliest
bootstrap-servers: localhost:9092
# 生产者
producer:
bootstrap-servers: localhost:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
如果你想了解更多的配置,可以参考文档 Apache Kafka Documentation
注意:运行项目之前一定要先开启zookeeper和kafka服务
3、简单的例子
监听器主要是使用@KafkaListenter注解即可,可以监听多个topic也可以监听单个。
@Component
public class SimpleListener {
@KafkaListener(topics = {"topic1", "topic2"})
public void listen1(String data) {
System.out.println(data);
}
}
消息发送主要是使用KafkaTemplate,它具有多个方法可以发送消息,这里我们用简单的。
@RestController
@AllArgsConstructor
public class SimpleController {
private final KafkaTemplate<Object, Object> kafkaTemplate;
@GetMapping("/send/{messge}")
public String send(@PathVariable String messge) {
kafkaTemplate.send("topic1", "topci1:" + messge);
kafkaTemplate.send("topic2", "topci2:" + messge);
return messge;
}
}
我们用postman测试一下,看看控制台有没有输出,有没有接受到消息。
4、发送实体类封装的消息
4.1实体类
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class Bar {
private Integer id;
private Integer age;
}
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class Foo {
private Integer id;
private String name;
}
4.2 配置文件
@Configuration
public class KafkaConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory,
KafkaTemplate<Object, Object> template) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.setErrorHandler(new SeekToCurrentErrorHandler(
new DeadLetterPublishingRecoverer(template), 3));
return factory;
}
// 当传输的是个实体类时,进行消息格式转换
@Bean
public RecordMessageConverter converter() {
StringJsonMessageConverter converter = new StringJsonMessageConverter();
DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
typeMapper.setTypePrecedence(TypePrecedence.TYPE_ID);
typeMapper.addTrustedPackages("com.lzx.kafka.example2");
Map<String, Class<?>> mappings = new HashMap<>();
mappings.put("foo", Foo.class);
mappings.put("bar", Bar.class);
typeMapper.setIdClassMapping(mappings);
converter.setTypeMapper(typeMapper);
return converter;
}
@Bean
public NewTopic foos() {
return new NewTopic("foo", 1, (short) 1);
}
@Bean
public NewTopic bars() {
return new NewTopic("bar", 1, (short) 1);
}
}
4.3 application.yml配置文件
spring:
kafka:
consumer:
group-id: foo
auto-offset-reset: earliest
bootstrap-servers: localhost:9092
producer:
bootstrap-servers: localhost:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
properties:
spring.json.type.mapping: foo:com.lzx.kafka.entity.Foo,bar:com.lzx.kafka.entity.Bar
4.4 代码
监听器
@Component
@KafkaListener(id = "handler", topics = {"foo", "bar"})
public class ListenHandler {
@Autowired
private KafkaTemplate<Object, Object> kafkaTemplate;
@KafkaHandler
public void foo(@Payload Foo foo, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
System.out.println("key:" + key);
System.out.println("foo:" + foo.toString());
}
@KafkaHandler
public void foo(Bar bar) {
System.out.println("bar:" + bar.toString());
}
}
Controlller
@RestController
@AllArgsConstructor
public class Example2Controller {
private final KafkaTemplate kafkaTemplate;
@PostMapping("/foo")
public void send(Foo foo){
kafkaTemplate.send("foo", "modelOne", foo);
}
@PostMapping("/bar")
public void send(Bar bar){
kafkaTemplate.send("bar", bar);
}
}
4.5结果
5、消息发送的同步方法和异步方法
方法
@Service
@AllArgsConstructor
public class SendService {
private final KafkaTemplate<Object, Object> template;
// 异步
public void sendAnsyc(final Bar bar) {
// ProducerRecord<Object, Object> producerRecord = new ProducerRecord<>("ansyc", bar);
ListenableFuture<SendResult<Object, Object>> future = template.send("ansyc",bar);
future.addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
@Override
public void onSuccess(SendResult<Object, Object> result) {
System.out.println("发送消息成功:" + result);
}
@Override
public void onFailure(Throwable ex) {
System.out.println("发送消息失败:"+ ex.getMessage());
}
});
}
// 同步
public void sendSync(final Bar bar) {
ProducerRecord<Object, Object> producerRecord = new ProducerRecord<>("sync", bar);
try {
template.send(producerRecord).get(10, TimeUnit.SECONDS);
System.out.println("发送成功");
}
catch (ExecutionException e) {
System.out.println("发送消息失败:"+ e.getMessage());
}
catch (TimeoutException | InterruptedException e) {
System.out.println("发送消息失败:"+ e.getMessage());
}
}
}
监听器
@Component
public class Example3Listenter {
@KafkaListener(topics = "ansyc")
public void listenAnsyc(Bar bar) {
System.out.println(bar);
}
@KafkaListener(topics = "sync")
public void listenSync(Bar bar) {
System.out.println(bar);
}
}
Controller
@RestController
@AllArgsConstructor
public class Example3Controller {
private final SendService sendService;
@PostMapping("/ansyc")
public void sendAnsyc(Bar bar){
sendService.sendAnsyc(bar);
}
@PostMapping("/sync")
public void sendSync(Bar bar){
sendService.sendSync(bar);
}
}
异步结果
同步结果
6、使用事务的消息发送方式
在4.3application.yml中的properties配置上方添加这样的一句配置即可
transaction-id-prefix: tx.
代码
@RestController
@AllArgsConstructor
public class Example1Controller {
private final KafkaTemplate<Object, Object> kafkaTemplate;
@PostMapping("/send/foo")
public void sendFoo(Foo foo) {
kafkaTemplate.executeInTransaction(kafkaTemplate -> {
kafkaTemplate.send("foo", foo);
return true;
});
}
}
四、相关资料连接
更多推荐
所有评论(0)