RabbitMQ简介并在SpringBoot项目中使用RabbitMQ
rabbitMQ是一个开源的消息代理和队列服务器,通过普通协议在完全不同的应用之间共享数据。RabbitMQ使用Erlang语言编写,并基于AMQP协议Erlang语言 数据传输延迟低(利于承载高并发) socket也一样rabbitMQ可以与SpringAMQP完美整合,SpringAMQP框架提供了原生的rabbitMQ api 也提供了丰富的拓展APIrabbitMQ集群模式丰富,表达式配置
文章目录
写在前面
本文根据慕课视频RabbitMQ消息中间件极速入门与实战整理而来,有兴趣的可以去看一下这个视频,可能会比只看文章更清晰一点,写下这篇文章只是因为个人更喜欢看文字版的东西,也供有相同需求的人参考。
一、简介
1、什么是RabbitMQ?
RabbitMQ是一个开源的消息代理和队列服务器,通过普通协议在完全不同的应用之间共享数据。RabbitMQ使用Erlang语言编写,并基于AMQP协议。
Erlang语言 数据传输延迟低(利于承载高并发) socket也一样
RabbitMQ可以与SpringAMQP完美整合,SpringAMQP框架提供了原生的rabbitMQ api 也提供了丰富的拓展API
AMQP:高级消息队列协议 是一套规范
2、RabbitMQ相关概念解释和联系
①producer和consumer
消息的生产者和消费者
②virtual host、exchange和message queue
virtual host 虚拟地址,用于进行逻辑隔离,最上层的消息路由。
一个virtual host里面可以有若干个exchange和queue,同一个virtual host里面不能有相同名称的exchange和queue
exchange 交换机 接收消息,根据路由键转发消息到绑定的队列
queue 也称为message queue 消息队列 保存消息并将它们转发给消费者
MQ: message queue
③binding和routing key
binding 绑定 exchange和queue之间的虚拟连接,binding中可以包含routing key
routing key 一个路由规则,虚拟机可以用它来确定如何路由一个特定消息
④消息流转
virtual host中的exchange交换机和message queue有绑定关系,通过路由key(routine key)进行关联。
生产者producer将消息投递到server上,指定对应的exchange和路由key。经过virtual host(虚拟主机)和exchange(交换机)后消息被分发到对应的mq。
消费者只需要监听message queue即可取到消息。
⑤其他概念
server 又称为broker 接受客户端的连接,实现AMQP实体服务
connection 连接 应用程序与broker的网络连接
channel 网络信道 进行消息读写的通道 几乎所有的操作都在channel中进行
客户端可建立多个channel 每个channel代表一个会话任务
message 消息 服务器和应用程序之间传送的数据 由properties和body组成
properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性
body就是消息体内容
二、安装
这个已经有很多博客写了,在这里就不赘述,可参考:
Windows下安装RabbitMQ
简述一下就是安装ERLANG,并配置环境变量;安装rabbitmq,并配置环境变量。
但是在这里我也踩了一些坑,尽管和博主写的输入rabbitmqctl status后的结果一样,但是我并不需要做他的任何操作。在配置完两个环境变量后,只需要直接切换路径并输入rabbitmq-server.bat,即可启动。
不在cmd中操作也是完全没问题的,到sbin目录下双击rabbitmq-server.bat文件即可打开http://localhost:15672。
另:换盘符切换路径时,需要加上/d,例如:cd /d D:\rabbitmq-server\rabbitmq_server-3.7.7
三、在SpringBoot项目中简单使用
1、新建Spring项目
并配置server.port等,略。
2、在pom文件中引入相关依赖
引入amqp相关依赖即可
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3、新增application.properties文件相关配置
rabbitmq的端口默认是5672,management是15672
#rabbitmq基本配置
spring.rabbitmq.addresses=localhost:5672
spring.rabbitmq.virtual-host=/user-host
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#超时时间
spring.rabbitmq.connection-timeout=15000
#rabbitmq消费端配置
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10
#手动/自动签收 这里配为手动 也可配xxx.simple.prefetch属性 限流
spring.rabbitmq.listener.simple.acknowledge-mode=manual
4、编写消息实体类
注意需要住要implements Serializable接口
且需要加入一个标识消息的属性 比如叫messageId
这里我们简单地用user为例,省略getter、setter方法和构造方法
public class MyUser implements Serializable {
private static final long serialVersionUID = 4405661797632868642L;
private String username;
private String password;
private String messageId;
}
5、编写consumer类
这里为什么我们先选择编写consumer类呢,因为它的监听注解可以直接在控制台生成对应的queue和exchange以及绑定关系,不需要自己手动去控制台新建。
但是注意,之前在properties里写过的virtual-host,还是需要去http://localhost:15672配的,去admin->virtual hosts中新增,不然会报异常。
写完consumer类之后就可以跑了,就可以在控制台中看到生成的queue和exchange了。
对这里的user.*
做一下解释,user.*
可以匹配user.xxx,但是注意后面只能有一个.,如果是user.a.b这种,需要用user.#进行匹配
注意需要加上@Component注解
package com.yogi.example.consumer;
import com.rabbitmq.client.Channel;
import com.yogi.example.entity.MyUser;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
@Component
public class UserReceiver {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "user-queue",durable = "true"),
exchange = @Exchange(name = "user-exchange", durable = "true", type = "topic"),
key = "user.*"
))
@RabbitHandler
public void onUserMessage(@Payload MyUser myUser,
@Headers Map<String,Object> headers,
Channel channel) throws IOException {
//消费者操作
System.out.println("---------收到消息,开始消费----------");
System.out.println(myUser.getUsername());
//因为配置文件中设置了手动签收 所以需要以下代码 如果不签收 则rabbitmq会认为你未读消息
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(deliveryTag,false);
}
}
6、编写producer类
使用RabbitTemplate需要引入amqp,刚刚在依赖的地方已经引入了,可以reload一下
记得exchange和routing key和consumer中对应
package com.yogi.example.producer;
import com.yogi.example.entity.MyUser;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class UserSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendUser(MyUser myUser) throws Exception{
CorrelationData correlationData = new CorrelationData();
correlationData.setId(myUser.getMessageId());
rabbitTemplate.convertAndSend("user-exchange",//exchange
"user.key",//routing key
myUser,//消息体内容
correlationData);//消息唯一ID
}
}
7、其他说明
在这里我把consumer和producer写到了一个工程里,但是前面也说了rabbitMQ是个夸应用的消息代理和队列服务器,所以其实应该写到不同工程里做示例的,但是我这边为了方便就写到了一起。在写在不同的应用时,producer可以不需要监听相关的配置,实体类都是要创建的,server.port可以不同,实际上在一台机器上时也不可以一样,因为会端口冲突。
8、测试
去生成的xxxApplicationTests.java中编写测试用例,这个文件在test下
一般来说messageId可以用业务逻辑拼接,这里就简单地写一下了。
写完后右键run “testSend1()”即可在控制台中看到consumer的输出。
注意如果主程序的application也在跑,输出需要看主程序的console,如果没在跑,就可以直接在testApllication的console中看到输出结果。
package com.yogi.example;
import com.yogi.example.entity.MyUser;
import com.yogi.example.producer.UserSender;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.UUID;
@SpringBootTest
class ExampleApplicationTests {
@Test
void contextLoads() {
}
@Autowired
private UserSender userSender;
@Test
public void testSend1() throws Exception{
MyUser myUser = new MyUser();
myUser.setUsername("yogi");
myUser.setPassword("000000");
myUser.setMessageId(System.currentTimeMillis()+"$"+ UUID.randomUUID().toString());
userSender.sendUser(myUser);
}
}
更多推荐
所有评论(0)