学会RabbitMQ这一篇就够了
文章目录基础篇简单代码-生存者消费者Work Queue 工作队列Pub/Sub 订阅模式Fanout 广播Direct 路由方式Topic 通配符SpringBoot整合Demo高级篇1、消息的可靠性投递confirmCallback 确认发布ReturnCallback 回退模式Ack 消费者确认模式MessageRecoverer捕捉异常死信队列基础篇简单代码-生存者消费者Maven<
文章目录
基础命令
/sbin/service rabbitmq-server status 查看状态
/sbin/service rabbitmq-server start 启动
/sbin/service rabbitmq-server restart 重启
/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins 查看插件位置
rabbitmq-plugins enable rabbitmq_delayed_message_exchange 开启插件
要使用rabbitmqctl命令,需要映射好 /etc/hosts 中主机名
rabbitmqctl reset 重置用户
rabbitmqctl add_user admin 123 添加用户
rabbitmqctl set_user_tags admin administrator 设置用户角色
rabbitmqctl set_permissions -p “/” admin “." ".” “.*” 设置用户权限
rabbitmqctl list_users 查看所有用户
基础篇
简单代码-生存者消费者
Maven
<dependencies>
<!--rabbitmq 依赖客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
<!--操作文件流的一个依赖-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
</dependencies>
生成者
package com.dmbjz.one;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/*HelloWord模型 生产者代码*/
public class Producer {
public static final String QUEUE_NAME = "hello"; // 队列名称
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("120.77.39."); // 设置MQ所在机器IP进行连接
factory.setPort(5672); // 指定MQ服务端口 默认也是5672
//factory.setVirtualHost("-"); // 指定使用的VirtualHost
factory.setUsername("admin"); // 指定MQ账号名
factory.setPassword("123"); // 指定MQ密码
Connection connection = factory.newConnection(); // 创建连接
Channel channel = connection.createChannel(); // 创建信道
/* 队列设置(创建队列)
*参数1:队列名称,名称不存在就自动创建
*参数2:定义队列是否持久化(重启MQ后是队列否存在),true开启,false关闭
*参数3:exclusive 是否独占队列(设置是否只能有一个消费者使用),true独占,false非独占
*参数4:autoelete 是否在消费完成后是否自动删除队列 ,true删除,false不删除
*参数5:额外附加参数
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message = "Hello RabbitMQ"; // 需要发送的消息
/* 交换机&队列设置(指定消息使用的交换机和队列)
* 参数1: exchange交换机名称(简单队列无交换机,这里不写)
* 参数2: 有交换机就是路由key。没有交换机就是队列名称,意为往该队列里存放消息
* 参数3: 传递消息的额外设置 (设置消息是否持久化) MessageProperties.PERSISTENT_TEXT_PLAIN设置消息持久化
* 参数4: 消息具体内容(要为 Byte类型)
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
/*关闭资源*/
channel.close();
connection.close();
System.out.println("消息生产完毕");
}
}
消费者
package com.dmbjz.one;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*HelloWord模型 消费者案例*/
public class Consumer {
public static final String QUEUE_NAME = "hello"; // 队列名称
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("120.77.39."); // 设置MQ所在机器IP进行连接
factory.setPort(5672); // 指定MQ服务端口
//factory.setVirtualHost("study"); // 指定使用的VirtualHost
factory.setUsername("admin"); // 指定MQ账号名
factory.setPassword("123"); // 指定MQ密码
Connection connection = factory.newConnection(); // 创建连接
Channel channel = connection.createChannel(); // 创建信道
/*消费者成功消费时的回调接口,这里为打印获取到的消息*/
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(new String(message.getBody()));
};
/*消费者取消消费的回调*/
CancelCallback callback = consumerTag -> {
System.out.println("消息者取消消费接口回调逻辑");
};
/* 消费消息
* 参数1 : 消费队列的名称
* 参数2 : 消息的自动确认机制(一获得消息就通知 MQ 消息已被消费) true打开,false关闭 (接收到消息并消费后也不通知 MQ ,常用)
* 参数3 : 消费者成功消费时的回调接口
* 参数4 : 消费者取消消费的回调
*/
channel.basicConsume(QUEUE_NAME, true, deliverCallback, callback);
System.out.println("消费者执行完毕");
}
}
Work Queue 工作队列
默认工作模式,多个消费者是竞争关系,顺序执行
Pub/Sub 订阅模式
1、引入了交换机概念,对应三种类型:Fanout 广播、Direct 定向、Topic 主题
Fanout :群发,不考虑routing key ,群发到所有绑定改交换机的队列
Direct:定向,需要指明routing key,发送到指定队列
Topic:通配符,符合对应规则的routing key ,对应队列都会接收到消息
连接工具类
package com.suqi.rabbitmq.controller.practice.Utils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitUtils {
private static ConnectionFactory connectionFactory; //放到静态代码块中,在类加载时执行,只执行一次。达到工厂只创建一次,每次获取是新连接的效果
static {
//创建连接工厂
connectionFactory = new ConnectionFactory();
connectionFactory.setHost("120.77.--"); //设置MQ的主机地址
connectionFactory.setPort(5672); //设置MQ服务端口
// connectionFactory.setVirtualHost("study"); //设置Virtual Hosts(虚拟主机)
connectionFactory.setUsername("-"); //设置MQ管理人的用户名(要在Web版先配置,保证该用户可以管理设置的虚拟主机)
connectionFactory.setPassword("-"); //设置MQ管理人的密码
}
//定义提供连接对象的方法,封装
public static Connection getConnection() {
try {
//创建连接对象并返回
return connectionFactory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
//关闭通道和关闭连接工具类的方法
public static void closeConnectionAndChanle(Channel channel, Connection connection) {
try {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
Fanout 广播
生成者:
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.suqi.rabbitmq.controller.practice.Utils.RabbitUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
// 广播
public class Producer {
public static final String QUEUE_NAME_1 = "fanout_1"; // 队列名称
public static final String QUEUE_NAME_2 = "fanout_2"; // 队列名称
public static final String EXCHANGE_NAME = "fanout_exchange"; // 队列名称
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
// 声明交换机
/**
* (String exchange,
* BuiltinExchangeType type,
* boolean durable,
* boolean autoDelete,
* boolean internal,
* Map<String, Object> arguments)
*/
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT,true,false,false,null);
channel.queueDeclare(QUEUE_NAME_1, true, false, false, null);
channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);
// 绑定
channel.queueBind(QUEUE_NAME_1,EXCHANGE_NAME,"" );
channel.queueBind(QUEUE_NAME_2,EXCHANGE_NAME,"" );
String message = "Fanout RabbitMQ"; // 需要发送的消息
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
/*关闭资源*/
RabbitUtils.closeConnectionAndChanle(channel,connection);
System.out.println("消息生产完毕");
}
}
消费者
/*HelloWord模型 消费者案例*/
public class Consumer_1 {
public static final String QUEUE_NAME_1 = "fanout_1"; // 队列名称
public static final String EXCHANGE_NAME = "fanout_exchange"; // 队列名称
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
/*消费者成功消费时的回调接口,这里为打印获取到的消息*/
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(new String(message.getBody()));
};
/*消费者取消消费的回调*/
CancelCallback callback = consumerTag -> {
System.out.println("消息者取消消费接口回调逻辑");
};
/* 消费消息
* 参数1 : 消费队列的名称
* 参数2 : 消息的自动确认机制(一获得消息就通知 MQ 消息已被消费) true打开,false关闭 (接收到消息并消费后也不通知 MQ ,常用)
* 参数3 : 消费者成功消费时的回调接口
* 参数4 : 消费者取消消费的回调
*/
channel.basicConsume(QUEUE_NAME_1, true, deliverCallback, callback);
System.out.println("消费者执行完毕");
}
}
Direct 路由方式
直接路由模式:
通过routingKey 绑定好 交换机和队列的关系,消息发送时也需要指定发送到哪个交换机的哪个routingKey上。
// 直接定向
public class Producer {
public static final String QUEUE_NAME_1 = "Direct_1"; // 队列名称
public static final String QUEUE_NAME_2 = "Direct_2"; // 队列名称
public static final String ROUTING_NAME_1 = "routing_key_1"; // 队列名称
public static final String ROUTING_NAME_2 = "routing_key_2"; // 队列名称
public static final String EXCHANGE_NAME = "routing_exchange"; // 队列名称
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,true,false,false,null);
channel.queueDeclare(QUEUE_NAME_1, true, false, false, null);
channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);
// 绑定
channel.queueBind(QUEUE_NAME_1,EXCHANGE_NAME,ROUTING_NAME_1 );
channel.queueBind(QUEUE_NAME_2,EXCHANGE_NAME,ROUTING_NAME_2 );
String message1 = "Direct RabbitMQ routing_key_1 发送到队列routing_1 "; // 需要发送的消息
String message2 = "Direct RabbitMQ routing_key_2 发送到队列routing_2 "; // 需要发送的消息
channel.basicPublish(EXCHANGE_NAME, ROUTING_NAME_1, null, message1.getBytes(StandardCharsets.UTF_8));
channel.basicPublish(EXCHANGE_NAME, ROUTING_NAME_2, null, message2.getBytes(StandardCharsets.UTF_8));
/*关闭资源*/
RabbitUtils.closeConnectionAndChanle(channel,connection);
System.out.println("消息生产完毕");
}
}
Topic 通配符
相当于Direct模式添加了模糊匹配,可以让消费者在队列绑定时对 Routingkey 使用通配符,Routingkey 一般都是由一个或多个单词组成,多个单词之间以"."分割
如果一个消息满足同一队列的多个绑定,只会被消费一次
当一个队列绑定键是#,那么这个队列将接收所有数据,就有点类似 fanout 了;如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 了
匹配规则
# 统配符
* (star) can substitute for exactly one word. 匹配不多不少恰好1个词
# (hash) can substitute for zero or more words. 匹配一个或多个词
# 如:
orange.# 匹配 orange 开头的词
audit.* 只能匹配 audit 后面只有一个词语的
#.audit.# 匹配中间夹杂audit的词语
生产者
package com.suqi.rabbitmq.controller.practice.PubSub.Topic;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.suqi.rabbitmq.controller.practice.Utils.RabbitUtils;
import java.nio.charset.StandardCharsets;
/*Topic模式 生产者案例*/
public class Provider {
private static final String EXCHANGE_NAME = "TopicExchange";
private static final String QUEUE_NAME_1 = "Topic_1"; // 队列名称
private static final String QUEUE_NAME_2 = "Topic_2"; // 队列名称
private static final String ORANGE_KEY= "*.order";
private static final String RABBIT_KEY = "baidu.#";
private static final String LAZY_KEY = "lazy.user.admin";
public static void main(String[] args) throws Exception {
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
/*声明交换机
* 参数一:交换机名称
* 参数二:交换机类型
*/
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
channel.queueDeclare(QUEUE_NAME_1, true, false, false, null);
channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);
// 绑定
channel.queueBind(QUEUE_NAME_1,EXCHANGE_NAME,ORANGE_KEY );
channel.queueBind(QUEUE_NAME_2,EXCHANGE_NAME,RABBIT_KEY );
channel.queueBind(QUEUE_NAME_2,EXCHANGE_NAME,LAZY_KEY );
/*模拟不同会员等级接收到的不同消息内容*/
for (int i = 0; i < 9; i++) {
String message = "当前是待发送的消息,序号:"+i;
if(i%3==0){
channel.basicPublish(EXCHANGE_NAME,"abc.order",null,message.getBytes(StandardCharsets.UTF_8)); //发送ORANGE消息
}
if(i%3==1){
channel.basicPublish(EXCHANGE_NAME,"baidu.sad.q",null,message.getBytes(StandardCharsets.UTF_8)); //发送RABBIT消息
}
if(i%3==2){
channel.basicPublish(EXCHANGE_NAME,"lazy.user.admin",null,message.getBytes(StandardCharsets.UTF_8)); //发送LAZY消息
}
System.out.println("消息发送: " + message);
}
}
}
SpringBoot整合
Demo
核心注解:@RabbitListener、RabbitTemplate
依赖<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency>
配置文件application.yml
spring: rabbitmq: host: 120.77.*.* port: 5672 username: * password: *
配置config ,定义交换机、队列、绑定关系
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
public static final String QUEUE_NAME = "springboot_Direct"; // 队列名称
public static final String ROUTING_KEY = "springboot_routing_key"; // 队列名称
public static final String EXCHANGE_NAME = "springboot_direct_exchange"; // 队列名称
//交换机
@Bean("bootExchange")
public Exchange bootExchange(){
return ExchangeBuilder.directExchange(EXCHANGE_NAME).durable(true).build();
}
// 队列
@Bean("bootQueue")
public Queue bootQueue(){
return QueueBuilder.durable(QUEUE_NAME).build();
}
// 绑定
@Bean
public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue,@Qualifier("bootExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY).noargs();
}
}
生产者发送消息
import com.suqi.rabbitmq.conf.RabbitMQ.RabbitMQConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProviderTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendMessage(){
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,RabbitMQConfig.ROUTING_KEY,"今天很开心呀!");
}
}
消费者监听
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class ConsumerController {
@RabbitListener(queues = "springboot_Direct")
public void getRabbitMessage(Message message){
System.out.println(new String(message.getBody()));
}
}
高级篇
1、消息的可靠性投递
从消息的发送方入手, RabbitMQ为完美提供了两种方式用来控制消息的投递可靠性模式
- Confirm 确认模式
- return 回退模式
Rabbit整个消息投递过程:
producer— rabbitmq broker— exchange— queue— consumer
- 消息从producer到exchange 则会返回一个confirmCallback。
- 消息从exchange到queue 投递失败会返回一个 returnCallbac。
利用这个回调控制消息的可靠性投递
confirmCallback 确认发布
1、编写开启确认发布
配置文件yml开始确认发布rabbitmq: publisher-confirm-type: correlated
2、在rabbitTemplate中编写confirmCallback 回调函数
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProviderConfirm {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendMessage(){
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* @param correlationData 相关配置信息
* @param ack exchange交换机是否接收到消息 true 成功, false失败
* @param cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("******confirm回调函数被执行");
if (ack){
System.out.println("******成功接收到消息");
} else {
System.out.println("*******接收消息失败" + cause);
}
}
});
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,RabbitMQConfig.ROUTING_KEY,"今天很开心呀!");
}
}
ReturnCallback 回退模式
1、编写开启确认发布
配置文件yml开始确认发布rabbitmq: publisher-returns: true template mandatory: true
2、在rabbitTemplate中编写ReturnCallback 回调函数
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProviderReturn {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendMessage(){
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* 回退模式:当消息发送给exchange后,如果路由routingKey匹配不到,才会执行
* 步骤:1、开启回退
* 2、设置回调函数
* 3、设置exchange处理消息的模式
* 1、丢弃(默认)
* 2、执行回调函数
* @param message 消息对象
* @param replyCode 错误码
* @param replyText 错误信息
* @param exchange 交换机
* @param routingKey 路由key
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("****执行return方法");
System.out.println(message);
System.out.println(replyCode);
System.out.println(replyText);
System.out.println(exchange);
System.out.println(routingKey);
}
});
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,RabbitMQConfig.ROUTING_KEY+"111","今天很开心呀!");
}
}
Ack 消费者确认模式
确保消息正常消费,不丢失在消费者端可以有两种解决方法
- 1、捕捉异常,手动确认,失败重试basicNack方法的 requeue 设置为trun
- 2、利用spring管理客户端的错误重试,加配置MessageRecoverer,然消息达到错误次数后发送到错误队列中
- 3、利用死信队列,将取消确认的消息,放到死信队列中 (第二种是spring管理,这种是rabbitmq管理)
MessageRecoverer
1、利用spring管理的失败重试,达到次数后,利用MessageRecoverer 发送到错误队列,进行人工补偿
- 配置文件yml 失败重试次数、间隔时间等,注意确认方式建议使用auto,
因为自动提交了,所以basicNack可以不写,消息也能在队列中消失,并发送到错误队列中去等待消费# ----------- 这其实spring 管理的消费端,没有重新发回队列 rabbitmq: listener: simple: retry: enabled: true max-attempts: 3 max-interval: 10000 acknowledge-mode: auto
- 编写 MessageRecoverer Bean,配置好错误队列的交换机和路由
/**
* 以上已经可以解决正常消息的消费, 下面是为了引入确认进行的,就是消息失败后达到错误次数后,不再进行重复,改将消息发送到错误队列中
* 注意:spring.rabbitmq.listener.retry配置的重发是在消费端应用内处理的,不是rabbitqq重发
*/
public static final String QUEUE_NAME_ERROR = "queuemsxferror"; // 错误队列名称
public static final String ROUTING_KEY_ERROR = "routingkeymsxferror"; //
public static final String EXCHANGE_NAME_ERROR = "exchangemsxferror"; //
@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "exchangemsxferror", "routingkeymsxferror");
}
//交换机
@Bean("exchangemsxferror")
public Exchange exchangemsxferror(){
return ExchangeBuilder.directExchange(EXCHANGE_NAME_ERROR).durable(true).build();
}
// 队列
@Bean("queuemsxferror")
public Queue queuemsxferror(){
return QueueBuilder.durable(QUEUE_NAME_ERROR).build();
}
// 绑定
@Bean
public Binding bindQueueExchangeMaxfError(@Qualifier("queuemsxferror") Queue queue,@Qualifier("exchangemsxferror") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ERROR).noargs();
}
捕捉异常
2、捕捉异常,手动确认,失败重新入队尾 (该方式可能导致短时间内一直重复失败)
@Component
public class ConsumerACK {
@RabbitListener(queues = "springboot_Direct")
public void getRabbitMessage(Message message , Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
Thread.sleep(1000);
System.out.println(new String(message.getBody()));
int a=1/0;
channel.basicAck(deliveryTag,false);
} catch (Exception e){
/**
* multiple: 是否批量, true 批量 ,false 不批量
* requeue : 拒绝后是否重写入队,true 重写入队, false 丢弃
*/
channel.basicNack(deliveryTag,false,true);
System.out.println("------------------");
throw e;
}
}
}
死信队列
由RabbitMQ自身管理,实现消息转发,也可以衍生出延时队列,原理是设置TTL过去时间(MQ只会检查第一个是否过期,因此要引入插件),达到延时处理的目的
达成条件:
- 消息TTL过期
- 队列达到最大长度
- 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false
// 队列
@Bean("bootQueue")
public Queue bootQueue(){
// 配置死信队列
Map<String,Object> args = new HashMap<>(3);
args.put("x-dead-letter-exchange",EXCHANGE_NAM_DIE);
args.put("x-dead-letter-routing-key",ROUTING_KEY_DIE);
//声明队列的 TTL
args.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_NAME).withArguments(args).build();
}
/**
* 通过死信队列的方式保证消费者不会丢失消息
*/
public static final String QUEUE_NAME_DIE = "queuedie"; // 错误队列名称
public static final String ROUTING_KEY_DIE = "routingkeydie"; //
public static final String EXCHANGE_NAM_DIE = "exchangedie"; //
//交换机
@Bean("exchangeDie")
public Exchange exchangeDie(){
return ExchangeBuilder.directExchange(EXCHANGE_NAM_DIE).durable(true).build();
}
// 队列
@Bean("queueDie")
public Queue queueDie(){
return QueueBuilder.durable(QUEUE_NAME_DIE).build();
}
// 绑定
@Bean
public Binding bindQueueExchangeDie(@Qualifier("queueDie") Queue queue,@Qualifier("exchangeDie") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_DIE).noargs();
}
2、消费端限流
1、通过配置手动确认方式,修改每次接收的预取值,达到限流的目的,以下例子表示一次拉去4条消息
# ----------- 这其实spring 管理的消费端,没有重新发回队列 rabbitmq: listener: simple: acknowledge-mode: manual prefetch: 4
3、TTL
设置消息过期时间有两种方式,优先以过期时间短的为准
消息过期后,只有消息在队列顶端,才会判断其是否过期(移除)
1、队列过期时间
2、单词发送消息过期时间
过期后的消息一般会自动删除, 或者转到死信队列种
@Bean("bootQueue")
public Queue bootQueue(){
// // 配置死信队列
Map<String,Object> args = new HashMap<>(3);
args.put("x-dead-letter-exchange",EXCHANGE_NAM_DIE);
args.put("x-dead-letter-routing-key",ROUTING_KEY_DIE);
//声明队列的 TTL
args.put("x-message-ttl", 50000);
return QueueBuilder.durable(QUEUE_NAME).withArguments(args).build();
}
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProviderTTL {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendMessage(){
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,RabbitMQConfig.ROUTING_KEY,"今天很开心呀!",message -> {
message.getMessageProperties().setExpiration("10000");
return message;
});
}
}
4、延迟队列(插件)
延迟队列的实现有两种方式:
1、TTL+死信队列的方式
2、延迟队列插件 (rabbitmq_delayed_message_exchange-3.8.0)
注意:RabbitMQ 只会检查第一个消息是否过期,基于死信队列的发送方式存在顺序问题,在设置单个消息过期时间时,存在过期时间失效延后问题
以下重点讲述 延迟插件的方式 (基于交换机作为控制方)
1、负责插件到指定文件加
cp rabbitmq_delayed_message_exchange-3.8.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins/
2、开启插件,按重启RabbitMQ ,
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
1、配置好延迟交换机 x-delayed-message
2、设置好消息的延迟时间 message.getMessageProperties().setDelay(2000);
/**
* 基于插件的放方式,实现延迟消息,控制方在交换器
*/
public static final String DELAYED_QUEUE_NAME = "delayed_Direct"; // 队列名称
public static final String DELAYED_ROUTING_KEY = "delayed_routing_key"; // 队列名称
public static final String DELAYED_EXCHANGE_NAME = "delayed_direct_exchange"; // 队列名称
//交换机
@Bean("delayedExchange")
public CustomExchange delayedExchange(){
// 是一个延迟类型的交换机,且是直接类型作为routingKey
Map<String,Object> args = new HashMap<>(3);
args.put("x-delayed-type","direct");
return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,args);
}
// 队列
@Bean("delayedQueue")
public Queue delayedQueue(){
return QueueBuilder.durable(DELAYED_QUEUE_NAME).build();
}
// 绑定
@Bean
public Binding bindDelayedQueueExchange(@Qualifier("delayedQueue") Queue queue,@Qualifier("delayedExchange") CustomExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DELAYED_ROUTING_KEY).noargs();
}
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProviderDelayed {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendMessage(){
// 基于插件的延迟消息, 配置好延迟交换机,设置消息延迟时间
rabbitTemplate.convertAndSend(RabbitMQConfig.DELAYED_EXCHANGE_NAME,RabbitMQConfig.DELAYED_ROUTING_KEY,"今天很开心呀5000!",message -> {
// 设置好延迟发送时间
message.getMessageProperties().setDelay(5000);
return message;
});
rabbitTemplate.convertAndSend(RabbitMQConfig.DELAYED_EXCHANGE_NAME,RabbitMQConfig.DELAYED_ROUTING_KEY,"今天很开心呀2000!",message -> {
// 设置好延迟发送时间
message.getMessageProperties().setDelay(2000);
return message;
});
rabbitTemplate.convertAndSend(RabbitMQConfig.DELAYED_EXCHANGE_NAME,RabbitMQConfig.DELAYED_ROUTING_KEY,"今天很开心呀10000!",message -> {
// 设置好延迟发送时间
message.getMessageProperties().setDelay(10000);
return message;
});
}
结果:解决了利用死信队列bug,即延迟时间短的因后发送而后出队被消费问题。
日志 集群这里不进行追溯
更多推荐
所有评论(0)