写在前面

本文根据慕课视频RabbitMQ消息中间件极速入门与实战整理而来,有兴趣的可以去看一下这个视频,可能会比只看文章更清晰一点,写下这篇文章只是因为个人更喜欢看文字版的东西,也供有相同需求的人参考。

一、简介

1、什么是RabbitMQ?

RabbitMQ是一个开源的消息代理和队列服务器,通过普通协议在完全不同的应用之间共享数据。RabbitMQ使用Erlang语言编写,并基于AMQP协议。

Erlang语言 数据传输延迟低(利于承载高并发) socket也一样
RabbitMQ可以与SpringAMQP完美整合,SpringAMQP框架提供了原生的rabbitMQ api 也提供了丰富的拓展API

AMQP:高级消息队列协议 是一套规范

2、RabbitMQ相关概念解释和联系

①producer和consumer

消息的生产者和消费者

②virtual host、exchange和message queue

virtual host 虚拟地址,用于进行逻辑隔离,最上层的消息路由。
一个virtual host里面可以有若干个exchange和queue,同一个virtual host里面不能有相同名称的exchange和queue

exchange 交换机 接收消息,根据路由键转发消息到绑定的队列

queue 也称为message queue 消息队列 保存消息并将它们转发给消费者
MQ: message queue

③binding和routing key

binding 绑定 exchange和queue之间的虚拟连接,binding中可以包含routing key

routing key 一个路由规则,虚拟机可以用它来确定如何路由一个特定消息

④消息流转

virtual host中的exchange交换机和message queue有绑定关系,通过路由key(routine key)进行关联。
生产者producer将消息投递到server上,指定对应的exchange和路由key。经过virtual host(虚拟主机)和exchange(交换机)后消息被分发到对应的mq。
消费者只需要监听message queue即可取到消息。

⑤其他概念

server 又称为broker 接受客户端的连接,实现AMQP实体服务
connection 连接 应用程序与broker的网络连接
channel 网络信道 进行消息读写的通道 几乎所有的操作都在channel中进行
客户端可建立多个channel 每个channel代表一个会话任务

message 消息 服务器和应用程序之间传送的数据 由properties和body组成
properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性
body就是消息体内容

二、安装

这个已经有很多博客写了,在这里就不赘述,可参考:
Windows下安装RabbitMQ

简述一下就是安装ERLANG,并配置环境变量;安装rabbitmq,并配置环境变量。

但是在这里我也踩了一些坑,尽管和博主写的输入rabbitmqctl status后的结果一样,但是我并不需要做他的任何操作。在配置完两个环境变量后,只需要直接切换路径并输入rabbitmq-server.bat,即可启动。
不在cmd中操作也是完全没问题的,到sbin目录下双击rabbitmq-server.bat文件即可打开http://localhost:15672。

另:换盘符切换路径时,需要加上/d,例如:cd /d D:\rabbitmq-server\rabbitmq_server-3.7.7

三、在SpringBoot项目中简单使用

1、新建Spring项目

并配置server.port等,略。

2、在pom文件中引入相关依赖

引入amqp相关依赖即可

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

3、新增application.properties文件相关配置

rabbitmq的端口默认是5672,management是15672

#rabbitmq基本配置
spring.rabbitmq.addresses=localhost:5672
spring.rabbitmq.virtual-host=/user-host
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#超时时间
spring.rabbitmq.connection-timeout=15000

#rabbitmq消费端配置
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10
#手动/自动签收 这里配为手动 也可配xxx.simple.prefetch属性 限流
spring.rabbitmq.listener.simple.acknowledge-mode=manual

4、编写消息实体类

注意需要住要implements Serializable接口
且需要加入一个标识消息的属性 比如叫messageId

这里我们简单地用user为例,省略getter、setter方法和构造方法

public class MyUser implements Serializable {

    private static final long serialVersionUID = 4405661797632868642L;

    private String username;
    private String password;
    private String messageId;
}

5、编写consumer类

这里为什么我们先选择编写consumer类呢,因为它的监听注解可以直接在控制台生成对应的queue和exchange以及绑定关系,不需要自己手动去控制台新建。
但是注意,之前在properties里写过的virtual-host,还是需要去http://localhost:15672配的,去admin->virtual hosts中新增,不然会报异常。
写完consumer类之后就可以跑了,就可以在控制台中看到生成的queue和exchange了。

对这里的user.*做一下解释,user.*可以匹配user.xxx,但是注意后面只能有一个.,如果是user.a.b这种,需要用user.#进行匹配

注意需要加上@Component注解

package com.yogi.example.consumer;

import com.rabbitmq.client.Channel;
import com.yogi.example.entity.MyUser;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

@Component
public class UserReceiver {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "user-queue",durable = "true"),
            exchange = @Exchange(name = "user-exchange", durable = "true", type = "topic"),
            key = "user.*"
    ))
    @RabbitHandler
    public void onUserMessage(@Payload MyUser myUser,
                              @Headers Map<String,Object> headers,
                              Channel channel) throws IOException {

        //消费者操作
        System.out.println("---------收到消息,开始消费----------");
        System.out.println(myUser.getUsername());

        //因为配置文件中设置了手动签收 所以需要以下代码 如果不签收 则rabbitmq会认为你未读消息
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        channel.basicAck(deliveryTag,false);

    }
}

6、编写producer类

使用RabbitTemplate需要引入amqp,刚刚在依赖的地方已经引入了,可以reload一下
记得exchange和routing key和consumer中对应

package com.yogi.example.producer;

import com.yogi.example.entity.MyUser;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class UserSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendUser(MyUser myUser) throws Exception{

        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(myUser.getMessageId());
        rabbitTemplate.convertAndSend("user-exchange",//exchange
                "user.key",//routing key
                myUser,//消息体内容
                correlationData);//消息唯一ID
    }

}

7、其他说明

在这里我把consumer和producer写到了一个工程里,但是前面也说了rabbitMQ是个夸应用的消息代理和队列服务器,所以其实应该写到不同工程里做示例的,但是我这边为了方便就写到了一起。在写在不同的应用时,producer可以不需要监听相关的配置,实体类都是要创建的,server.port可以不同,实际上在一台机器上时也不可以一样,因为会端口冲突。

8、测试

去生成的xxxApplicationTests.java中编写测试用例,这个文件在test下
一般来说messageId可以用业务逻辑拼接,这里就简单地写一下了。

写完后右键run “testSend1()”即可在控制台中看到consumer的输出。
注意如果主程序的application也在跑,输出需要看主程序的console,如果没在跑,就可以直接在testApllication的console中看到输出结果。

package com.yogi.example;

import com.yogi.example.entity.MyUser;
import com.yogi.example.producer.UserSender;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.UUID;

@SpringBootTest
class ExampleApplicationTests {

    @Test
    void contextLoads() {
    }

    @Autowired
    private UserSender userSender;

    @Test
    public void testSend1() throws Exception{
        MyUser myUser = new MyUser();
        myUser.setUsername("yogi");
        myUser.setPassword("000000");
        myUser.setMessageId(System.currentTimeMillis()+"$"+ UUID.randomUUID().toString());
        userSender.sendUser(myUser);
    }
}

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐