消息中间件之ActiveMQ — 02
常用API事务事务采用commit和rollback来提交和回滚事务:session.commit();session.rollback();签收模式签收代表接收端的session已收到消息的一次确认,反馈给broker,分别为以下三种:Session.AUTO_ACKNOWLEDGE:当客户端从receiver或onMessage成功返回时,Session自动签收客户端的这条消息的收条。Sess
常用API
事务
事务采用commit和rollback来提交和回滚事务:
session.commit();
session.rollback();
签收模式
签收代表接收端的session已收到消息的一次确认,反馈给broker,分别为以下三种:
-
Session.AUTO_ACKNOWLEDGE:当客户端从receiver或onMessage成功返回时,Session自动签收客户端的这条消息的收条。
-
Session.CLIENT_ACKNOWLEDGE:客户端通过调用消息(Message)的acknowledge方法签收消息。在这种情况下,签收发生在Session层面:签收一个已经消费的消息会自动地签收这个Session所有已消费的收条。
-
Session.DUPS_OK_ACKNOWLEDGE:Session不必确保对传送消息的签收,这个模式可能会引起消息的重复,但是降低了Session的开销,所以只有客户端能容忍重复的消息,才可使用。
持久化
默认持久化是开启的,producer端可以在设置持久化:
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)
优先级
可以设置信息的优先级,但是必须配置activemq.xml文件,使其支持优先级:
<policyEntry queue="queue1" prioritizedMessages="true" />
然后就可以在代码中设置优先级:
producer.setPriority
消息超时/过期
设置了消息超时的消息,消费端在超时后无法在消费到此消息。
producer.setTimeToLive
死信
此类消息会进入到ActiveMQ.DLQ
队列且不会自动清除,称为死信。如果一直不关注这个队列,可能导致OOM
1、修改死信队列名称
<policyEntry queue="test" prioritizedMessages="true" >
<deadLetterStrategy>
<individualDeadLetterStrategy queuePrefix="DLxxQ." useQueueForQueueMessages="true" />
</deadLetterStrategy>
</policyEntry>
useQueueForQueueMessages: 设置使用队列保存死信,还可以设置useQueueForTopicMessages,使用Topic来保存死信
2、让非持久化的消息也进入死信队列
默认情况,对于非持久化的消息,不会进入死信队列,可以通过如下配置让非持久化的消息也进入死信队列
<individualDeadLetterStrategy queuePrefix="DLxxQ." useQueueForQueueMessages="true" processNonPersistent="true" />
3、设置过期消息不进死信队列
<individualDeadLetterStrategy processExpired="false" />
独占消费者
通过在创建队列的时候,指定参数consumer.exclusive为true:
Queue queue = session.createQueue("test?consumer.exclusive=true");
还可以设置优先级
Queue queue = session.createQueue("xxoo?consumer.exclusive=true&consumer.priority=10");
消息类型
object
1、发送端
Person person = new Person("qiqi", 25, 398.0);
Message message = session.createObjectMessage(girl);
2、接受端
if(message instanceof ActiveMQObjectMessage) {
Person person = (Person)((ActiveMQObjectMessage)message).getObject();
System.out.println(person);
}
注意,当使用对象时,需要提前添加信任:
connectionFactory.setTrustedPackages(Arrays.asList("com.zjw.entity", "com.zjw.model"));
bytesMessage
1、发送端
BytesMessage bytesMessage = session.createBytesMessage();
bytesMessage.writeBytes("Hello".getBytes());
bytesMessage.writeUTF("你好!");
2、接受端
if(message instanceof BytesMessage) {
BytesMessage bm = (BytesMessage)message;
byte[] b = new byte[1024];
int len = -1;
while ((len = bm.readBytes(b)) != -1) {
System.out.println(new String(b, 0, len));
}
}
还可以使用ActiveMQ给提供的便捷方法,但要注意读取和写入的顺序
bm.readBoolean()
bm.readUTF()
3、写入文件
FileOutputStream out = new FileOutputStream("/tmp/test.txt");
byte[] bytes = new byte[1024];
int len = 0 ;
while((len = bm.readBytes(bytes))!= -1){
out.write(bytes,0,len);
}
MapMessage
1、发送端
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("name","lucy");
mapMessage.setBoolean("yihun",false);
mapMessage.setInt("age", 17);
producer.send(mapMessage);
2、接收端
Message message = consumer.receive();
MapMessage mes = (MapMessage) message;
System.out.println(mes);
System.out.println(mes.getString("name"));
消息发送原理
同步与异步
- | 开启事务 | 关闭事务 |
---|---|---|
持久化 | 异步 | 同步 |
非持久化 | 异步 | 异步 |
我们可以通过以下几种方式来设置异步发送:
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_HOST);
// 通过factory设置
factory.setUseAsyncSend(true);
ActiveMQConnection conn = (ActiveMQConnection)factory.createConnection();
// 通过连接设置
conn.setUseAsyncSend(true);
消息堆积
producer
每发送一个消息,统计一下发送的字节数,当字节数达到ProducerWindowSize
值时,需要等待broker
的确认,才能继续发送。
-
可以在
brokerUrl
中设置:tcp://localhost:61616?jms.producerWindowSize=1048576
-
可以
destinationUri
中设置:test?producer.windowSize=1048576
延迟消息投递配置
要是用该功能,首先在配置文件中开启延迟和调度项:schedulerSupport="true"
如下:
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">
延迟发送
消息发送端,对消息进行属性设置:
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 10*1000); // 10s
带间隔的重复发送
long delay = 10 * 1000;
long period = 2 * 1000;
int repeat = 9;
// 这里需要注意写入的数据类型,否则不生效
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
createProducer.send(message);
Cron表达式定时发送
Cron表达式是一个字符串,字符串以5或6个空格隔开,分为6或7个域,每一个域代表一个含义,Cron有如下两种语法格式:
Seconds Minutes Hours DayofMonth Month DayofWeek Year
或 Seconds Minutes Hours DayofMonth Month DayofWeek
每一个域可出现的字符如下:
-
Seconds
: 可出现, - * /
四个字符,有效范围为0-59的整数 -
Minutes
: 可出现, - * /
四个字符,有效范围为0-59的整数 -
Hours
: 可出现, - * /
四个字符,有效范围为0-23的整数 -
DayofMonth
: 可出现, - * / ? L W C
八个字符,有效范围为0-31的整数 -
Month
: 可出现, - * /
四个字符,有效范围为1-12
的整数或JAN-DEC
-
DayofWeek
: 可出现, - * / ? L C #
四个字符,有效范围为1-7
的整数或SUN-SAT
两个范围。1
表示星期天,2
表示星期一, 依次类推 -
Year
: 可出现, - * /
四个字符,有效范围为1970-2099
年
每一个域都使用数字,但还可以出现如下特殊字符,它们的含义是:
-
*
:表示匹配该域的任意值,假如在Minutes域使用*, 即表示每分钟都会触发事件。 -
?
:只能用在DayofMonth和DayofWeek两个域。它也匹配域的任意值,但实际不会。因为DayofMonth和 DayofWeek会相互影响。例如想在每月的20日触发调度,不管20日到底是星期几,则只能使用如下写法: 13 13 15 20 * ?, 其中最后一位只能用?,而不能使用*,如果使用*表示不管星期几都会触发,实际上并不是这样。 -
-
:表示范围,例如在Minutes域使用5-20,表示从5分到20分钟每分钟触发一次 -
/
:表示起始时间开始触发,然后每隔固定时间触发一次,例如在Minutes域使用5/20,则意味着5分钟触发一次,而25,45等分别触发一次. -
,
:表示列出枚举值值。例如:在Minutes域使用5,20,则意味着在5和20分每分钟触发一次。 -
L
:表示最后,只能出现在DayofWeek和DayofMonth域,如果在DayofWeek域使用5L,意味着在最后的一个星期四触发。 -
W
:表示有效工作日(周一到周五),只能出现在DayofMonth域,系统将在离指定日期的最近的有效工作日触发事件。例如:在 DayofMonth使用5W,如果5日是星期六,则将在最近的工作日:星期五,即4日触发。如果5日是星期天,则在6日(周一)触发;如果5日在星期一 到星期五中的一天,则就在5日触发。另外一点,W的最近寻找不会跨过月份 -
LW
:这两个字符可以连用,表示在某个月最后一个工作日,即最后一个星期五。 -
#
:用于确定每个月第几个星期几,只能出现在DayofMonth域。例如在4#2,表示某月的第二个星期三。
举几个例子:
0 0 2 1 * ? *
表示在每月的1日的凌晨2点调度任务
0 15 10 ? * MON-FRI
表示周一到周五每天上午10:15执行作业
0 15 10 ? 6L 2002-2006
表示2002-2006年的每个月的最后一个星期五上午10:15执行作
0 0 10,14,16 * * ?
每天上午10点,下午2点,4点
0 0/30 9-17 * * ?
朝九晚五工作时间内每半小时
0 0 12 ? * WED
表示每个星期三中午12点
0 0 12 * * ?
每天中午12点触发
0 15 10 ? * *
每天上午10:15触发
0 15 10 * * ?
每天上午10:15触发
0 15 10 * * ? *
每天上午10:15触发
0 15 10 * * ? 2005
2005年的每天上午10:15触发
0 * 14 * * ?
在每天下午2点到下午2:59期间的每1分钟触发
0 0/5 14 * * ?
在每天下午2点到下午2:55期间的每5分钟触发
0 0/5 14,18 * * ?
在每天下午2点到2:55期间和下午6点到6:55期间的每5分钟触发
0 0-5 14 * * ?
在每天下午2点到下午2:05期间的每1分钟触发
0 10,44 14 ? 3 WED
每年三月的星期三的下午2:10和2:44触发
0 15 10 ? * MON-FRI
周一至周五的上午10:15触发
0 15 10 15 * ?
每月15日上午10:15触发
0 15 10 L * ?
每月最后一日的上午10:15触发
0 15 10 ? * 6L
每月的最后一个星期五上午10:15触发
0 15 10 ? * 6L 2002-2005
2002年至2005年的每月的最后一个星期五上午10:15触发
0 15 10 ? * 6#3
每月的第三个星期五上午10:15触发
监听器
可以使用监听器来处理消息接收
// 设定消费者的监听器
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
// 当是Map 消息的情况
if (message instanceof MapMessage) {
try {
MapMessage msg = (MapMessage) message;
//接受消息
msg.acknowledge();
String username = msg.getString("username");
String nickname = msg.getString("nickname");
int age = msg.getInt("age");
System.out.printf("获取消费者信息\t%s:%s:%d\r\n", username, nickname, age);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
当收到消息后会调起onMessage方法
消息过滤
消息发送
Session session = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE);
MessageProducer producter = session.createProducer(null);
MapMessage msg1 = session.createMapMessage();
msg1.setString("username", "张三");
msg1.setInt("age", 12);
msg1.setString("nickname", "zhangsan");
// meta data
msg1.setIntProperty("age", 12);
msg1.setStringProperty("nickname", "zhangsan");
MapMessage msg2 = session.createMapMessage();
msg2.setString("username", "李四");
msg2.setInt("age", 17);
msg2.setString("nickname", "lisi");
msg2.setIntProperty("age", 18);
msg2.setStringProperty("nickname", "lisi");
MapMessage msg3 = session.createMapMessage();
msg3.setString("username", "王五");
msg3.setInt("age", 22);
msg3.setString("nickname", "wangwu");
msg3.setIntProperty("age", 122);
msg3.setStringProperty("nickname", "wangwu");
MapMessage msg4 = session.createMapMessage();
msg4.setString("username", "赵六");
msg4.setInt("age", 24);
msg4.setString("nickname", "zhaoliu");
msg4.setIntProperty("age", 24);
msg4.setStringProperty("nickname", "zhaoliu");
MapMessage msg5 = session.createMapMessage();
msg5.setString("username", "田七");
msg5.setInt("age", 24);
msg5.setString("nickname", "jerry");
msg5.setIntProperty("age", 24);
msg5.setStringProperty("nickname", "jerry");
Destination destination = session.createQueue("test");
// 发送消息
producter.send(destination, msg1, DeliveryMode.NON_PERSISTENT, 4, 1000 * 60);
producter.send(destination, msg2, DeliveryMode.NON_PERSISTENT, 4, 1000 * 60);
producter.send(destination, msg3, DeliveryMode.NON_PERSISTENT, 4, 1000 * 60);
producter.send(destination, msg4, DeliveryMode.NON_PERSISTENT, 4, 1000 * 60);
producter.send(destination, msg5, DeliveryMode.NON_PERSISTENT, 4, 1000 * 60);
session.commit();
消息接收
//查询年大于 18的数据, 字符串的过滤,需要有''
MessageConsumer consumer = session.createConsumer(destination, "age >18 OR nickname = 'jerry'");
更多推荐
所有评论(0)