返回 登录
1

Java中使用AMQ的简单实例

好了,接上一篇,在对JMS的概念以及AMQ的下载安装有了一定认识之后,本篇就来学习一下AMQ在Java中的Hello World级使用(下文我将着重把重点写在注释里面,请注意代码注释部分!!!)。

  1. 开发环境

AMQ 5.14.5 服务启动运行
准备Java Maven项目

依赖jar包在apache-activemq-5.14.5根目录下即可找到,或者直接Maven依赖:

org.apache.activemq
activemq-all
5.14.5

1
2
3
4
5
2. 点对点模型

2.1 队列消息发送者

编写 QueueProducer.java 如下:

package com.jastar.activemq.queue;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
* 队列消息-发送(生产)者
*


* ClassName: QueueProducer
*



* Copyright: (c)2017 JASTAR·WANG,All rights reserved.
*






/** 默认用户名 */
public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/** 默认密码 */
public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/** 默认连接地址(格式如:tcp://IP:61616) */
public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
/** 队列名称 */
public static final String QUEUE_NAME = "hello amq";

// 连接工厂(在AMQ中由ActiveMQConnectionFactory实现)
private ConnectionFactory connectionFactory;

// 连接对象
private Connection connection;

// 会话对象
private Session session;

// 消息目的地(对于点对点模型,是Queue对象;对于发布订阅模型,是Topic对象;它们都继承或实现了该接口)
private Destination destination;

// 消息发送(生产)者
private MessageProducer messageProducer;

public static void main(String[] args) {
    QueueProducer producer = new QueueProducer();
    producer.doSend();
}

public void doSend() {
    try {
        /**
         * 1.创建连接工厂<br>
         * 构造函数有多个重载,默认连接本地MQ服务器,也可以手动设置用户名、密码、连接地址信息<br>
         * new ActiveMQConnectionFactory(userName, password, brokerURL)
         */
        connectionFactory = new ActiveMQConnectionFactory();

        /**
         * 2.创建连接
         */
        connection = connectionFactory.createConnection();

        /**
         * 3.启动连接
         */
        connection.start();

        /**
         * 4.创建会话<br>
         * param1:是否支持事务,若为true,则会忽略第二个参数,默认为SESSION_TRANSACTED<br>
         * param2:确认消息模式,若第一个参数为false时,该参数有以下几种状态<br>
         * -Session.AUTO_ACKNOWLEDGE:自动确认。客户端发送和接收消息不需要做额外的工作,即使接收端发生异常,
         * 也会被当作正常发送成功 <br>
         * -Session.CLIENT_ACKNOWLEDGE:客户端确认。客户端接收到消息后,必须调用message.
         * acknowledge() 方法给予收到反馈,JMS服务器才会把该消息当做发送成功,并删除<br>
         * -Session.DUPS_OK_ACKNOWLEDGE:副本确认。一旦接收端应用程序的方法调用从处理消息处返回,
         * 会话对象就会确认消息的接收,而且允许重复确认。
         */
        session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

        /**
         * 5.创建(发送)消息目的地,即队列,参数为队列名称
         */
        destination = session.createQueue(QUEUE_NAME);

        /**
         * 6.创建一个消息生产者,并指定目的地
         */
        messageProducer = session.createProducer(destination);
        /**
         * 其他操作: 设置生产者的生产模式,默认为持久化<br>
         * 参数有以下两种状态:<br>
         * -DeliveryMode.NON_PERSISTENT:消息不持久化,消息被消费之后或者超时之后将从队列中删除
         * -DeliveryMode.PERSISTENT:消息会持久化,即使接收端消费消息之后仍然会保存
         */
        messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

        /**
         * 其他操作:设置消息的存活时间(单位:毫秒)
         */
        messageProducer.setTimeToLive(60000);

        for (int i = 0; i < 5; i++) {
            /**
             * 7.创建文本消息<br>
             * 此外,还有多种类型的消息如对象,字节……都可以通过session.createXXXMessage()方法创建
             */
            TextMessage message = session.createTextMessage("send content:"
                    + i);

            /**
             * 8. 发送
             */
            messageProducer.send(message);

        }
        System.out.println("消息发送完成!");
        /**
         * 如果有事务操作也可以提交事务
         */
        session.commit();

        /**
         * 9.关闭生产者对象(即使关闭了程序也在运行)
         */
        messageProducer.close();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        if (connection != null) {
            try {
                /**
                 * 10.关闭连接(将会关闭程序)
                 */
                connection.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
2.2 队列消息接收者

编写 QueueConsumer.java 其他同上,注意注释部分:

package com.jastar.activemq.queue;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
* 队列消息-接收(消费)者
*


* ClassName: QueueConsumer
*



* Copyright: (c)2017 JASTAR·WANG,All rights reserved.
*






private ConnectionFactory connectionFactory;
private Connection connection;
private Session session;
private Destination destination;
// 注意这里是消息接收(消费)者
private MessageConsumer messageConsumer;

public static void main(String[] args) {
    QueueConsumer consumer = new QueueConsumer();
    consumer.doReceive();
}

public void doReceive() {
    try {
        connectionFactory = new ActiveMQConnectionFactory();
        connection = connectionFactory.createConnection();
        connection.start();
        session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        destination = session.createQueue(QueueProducer.QUEUE_NAME);

        /**
         * 注意:这里要创建一个消息消费,并指定目的地(即消息源队列)
         */
        messageConsumer = session.createConsumer(destination);

        // 方式一:监听接收
        receiveByListener();

        // 方式二:阻塞接收
        // receiveByManual();

        /**
         * 注意:这里不能再关闭对象了
         */
        // messageConsumer.close();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        /**
         * 注意:这里不能再关闭Connection了
         */
        // connection.close();
    }

}

/**
 * 通过注册监听器的方式接收消息,属于被动监听
 */
private void receiveByListener() {
    try {
        messageConsumer.setMessageListener(new MessageListener() {

            @Override
            public void onMessage(Message message) {
                if (message instanceof TextMessage) {
                    try {
                        TextMessage msg = (TextMessage) message;
                        System.out.println("Received:“" + msg.getText()
                                + "”");
                        // 可以通过此方法反馈消息已收到
                        msg.acknowledge();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }

            }
        });
    } catch (Exception e) {
        e.printStackTrace();
    }
}

/**
 * 通过手动去接收消息的方式,属于主动获取
 */
private void receiveByManual() {
    while (true) {
        try {
            /**
             * 通过receive()方法阻塞接收消息,参数为超时时间(单位:毫秒)
             */
            TextMessage message = (TextMessage) messageConsumer
                    .receive(60000);
            if (message != null) {
                System.out.println("Received:“" + message.getText() + "”");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
3. 发布订阅模型

3.1 主题消息发送者

和点对点模型的发送者唯一不同的是——创建目的地Destination的时候是通过 session.createTopic(); 来创建,其他的使用套路如同 QueueProducer.java ,在此不再贴代码,最后我会留下示例代码地址,需要的可以去down下看看。

3.2 主题消息接收者

编写 TopicConsumer.java ,如下:

package com.jastar.activemq.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
* 主题消息-接收(消费)者
*


* ClassName: TopicConsumer
*



* Copyright: (c)2017 JASTAR·WANG,All rights reserved.
*











public static void main(String[] args) {
    /**
     * Pub/Sub模型中,消息可被多个对象接收,不同于P2P模型
     */
    TopicConsumer consumer1 = new TopicConsumer();
    consumer1.doReceive();
    TopicConsumer consumer2 = new TopicConsumer();
    consumer2.doReceive();
}

public void doReceive() {
    try {
        connectionFactory = new ActiveMQConnectionFactory();
        connection = connectionFactory.createConnection();
        connection.start();
        session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        destination = session.createTopic(TopicProducer.TOPIC_NAME);
        messageConsumer = session.createConsumer(destination);

        // 方式一:监听接收
        receiveByListener();

        // 方式二:阻塞接收
        // receiveByManual();

    } catch (Exception e) {
        e.printStackTrace();
    }

}

/**
 * 通过注册监听器的方式接收消息,属于被动监听
 */
private void receiveByListener() {
    try {
        messageConsumer.setMessageListener(new MessageListener() {

            @Override
            public void onMessage(Message message) {
                if (message instanceof TextMessage) {
                    try {
                        TextMessage msg = (TextMessage) message;
                        System.out.println("Received:“" + msg.getText()
                                + "”");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }

            }
        });
    } catch (Exception e) {
        e.printStackTrace();
    }
}

/**
 * 通过手动去接收消息的方式,属于主动获取
 */
private void receiveByManual() {
    while (true) {
        try {
            /**
             * 通过receive()方法阻塞接收消息,参数为超时时间(单位:毫秒)
             */
            TextMessage message = (TextMessage) messageConsumer
                    .receive(60000);
            if (message != null) {
                System.out.println("Received:“" + message.getText() + "”");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
4.最后说几句

以上代码全部经过我的测试,绝对保证可靠运行
浏览器访问ActiveMQ的Web控制台(http://localhost:8161/admin),即可看到消息、队列、主题等等信息,在此不再上图了,懒死了真是……
示例代码项目地址:https://gitee.com/jastar-wang/demo-amq

评论