ActiveMQ-通过javax.jms调用
环境操作系统:Win10IDE:EclipseJDK:1.8构建工具:MavenActiveMQ服务IP及端口:192.168.142.154:61616创建工程创建一个Maven工程,工程类型为jar,工程名为jmsDemo队列模式(Queue)消息发布方消息消费方订阅模式(Topic)消息发布方消息消费方...
·
环境
操作系统:Win10
IDE:Eclipse
JDK:1.8
构建工具:Maven
ActiveMQ服务IP及端口:192.168.142.154:61616
创建工程 jmsDemo
-
创建一个Maven工程,工程类型为jar,工程名为 jmsDemo
-
引入相关依赖
pom.xml<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.13.4</version> </dependency> <dependency> <groupId>javax.jms</groupId> <artifactId>jms</artifactId> <version>1.1</version> </dependency> <dependency> <groupId>javax</groupId> <artifactId>j2ee</artifactId> <version>1.4</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.7</version> </dependency>
队列模式(Queue)-消息消费方
- 创建java类:QueueConsumer
package message;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class QueueConsumer {
public static void main(String[] args) {
try {
//1.创建连接工厂
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.142.154:61616");
//2.获取连接
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session (参数1:是否启动事务,参数2:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建队列对象
Queue queue = session.createQueue("test-queue");
//6.创建消息消费
MessageConsumer consumer = session.createConsumer(queue);
//7.监听消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage=(TextMessage)message;
try {
System.out.println("接收到消息:"+textMessage.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
//8.等待键盘输入
System.in.read();
//9.关闭资源
consumer.close();
session.close();
connection.close();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
- 执行main方法
- 查看ActiveMQ控制台:http://192.168.142.154:8161,确实新增加了一个消息消费方
队列模式(Queue)-消息生产方
- 新建一个java类
package message;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class QueueProducer {
public static void main(String[] args) {
try {
//1.创建连接工厂
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.142.154:61616");
//2.获取连接
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session (参数1:是否启动事务,参数2:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建队列对象
Queue queue = session.createQueue("test-queue");
//6.创建消息生产者
MessageProducer producer = session.createProducer(queue);
//7.创建消息
TextMessage textMessage = session.createTextMessage("Hello-队列测试消息");
//8.发送消息
producer.send(textMessage);
//9.关闭资源
producer.close();
session.close();
connection.close();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
- 执行main方法
- 查看之前启动的消息消费方QueueConsumer的控制台日志:
消息消费方已经接收到了消息
订阅模式(Topic)-消息消费方
-
创建java类TopicConsumer:
package message; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; public class TopicProducer { public static void main(String[] args) { try { //1.创建连接工厂 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.142.154:61616"); //2.获取连接 Connection connection = connectionFactory.createConnection(); //3.启动连接 connection.start(); //4.获取session (参数1:是否启动事务,参数2:消息确认模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建主题对象 Topic topic = session.createTopic("test-topic"); //6.创建消息生产者 MessageProducer producer = session.createProducer(topic); //7.创建消息 TextMessage textMessage = session.createTextMessage("Hello-订阅测试消息"); //8.发送消息 producer.send(textMessage); //9.关闭资源 producer.close(); session.close(); connection.close(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
-
执行main方法
-
查看ActiveMQ控制台,增加了一个消息消费方
订阅模式(Topic)-消息发布方
- 创建java类TopicProducer:
package message;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
public class TopicProducer {
public static void main(String[] args) {
try {
//1.创建连接工厂
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.142.154:61616");
//2.获取连接
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session (参数1:是否启动事务,参数2:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建主题对象
Topic topic = session.createTopic("test-topic");
//6.创建消息生产者
MessageProducer producer = session.createProducer(topic);
//7.创建消息
TextMessage textMessage = session.createTextMessage("Hello-订阅测试消息");
//8.发送消息
producer.send(textMessage);
//9.关闭资源
producer.close();
session.close();
connection.close();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
- 执行main方法
- 查看之前启动的消息消费方TopicConsumer的控制台日志:
消息消费方已经成功接收到了消息
更多推荐
已为社区贡献2条内容
所有评论(0)