常用API

事务

事务采用commit和rollback来提交和回滚事务:

session.commit();
session.rollback();

签收模式

签收代表接收端的session已收到消息的一次确认,反馈给broker,分别为以下三种:

  • Session.AUTO_ACKNOWLEDGE:当客户端从receiver或onMessage成功返回时,Session自动签收客户端的这条消息的收条。

  • Session.CLIENT_ACKNOWLEDGE:客户端通过调用消息(Message)的acknowledge方法签收消息。在这种情况下,签收发生在Session层面:签收一个已经消费的消息会自动地签收这个Session所有已消费的收条。

  • Session.DUPS_OK_ACKNOWLEDGE:Session不必确保对传送消息的签收,这个模式可能会引起消息的重复,但是降低了Session的开销,所以只有客户端能容忍重复的消息,才可使用。

持久化

默认持久化是开启的,producer端可以在设置持久化:

producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)

优先级

可以设置信息的优先级,但是必须配置activemq.xml文件,使其支持优先级:

<policyEntry queue="queue1" prioritizedMessages="true" />

然后就可以在代码中设置优先级:

producer.setPriority

消息超时/过期

设置了消息超时的消息,消费端在超时后无法在消费到此消息。

producer.setTimeToLive
死信

此类消息会进入到ActiveMQ.DLQ队列且不会自动清除,称为死信。如果一直不关注这个队列,可能导致OOM

1、修改死信队列名称

<policyEntry queue="test" prioritizedMessages="true" >
  <deadLetterStrategy> 
    <individualDeadLetterStrategy queuePrefix="DLxxQ." useQueueForQueueMessages="true" /> 
  </deadLetterStrategy> 
</policyEntry>

useQueueForQueueMessages: 设置使用队列保存死信,还可以设置useQueueForTopicMessages,使用Topic来保存死信

2、让非持久化的消息也进入死信队列

默认情况,对于非持久化的消息,不会进入死信队列,可以通过如下配置让非持久化的消息也进入死信队列

<individualDeadLetterStrategy queuePrefix="DLxxQ." useQueueForQueueMessages="true" processNonPersistent="true" /> 

3、设置过期消息不进死信队列

<individualDeadLetterStrategy processExpired="false" /> 

独占消费者

通过在创建队列的时候,指定参数consumer.exclusive为true:

Queue queue = session.createQueue("test?consumer.exclusive=true");

还可以设置优先级

Queue queue = session.createQueue("xxoo?consumer.exclusive=true&consumer.priority=10");

消息类型

object

1、发送端

Person person = new Person("qiqi", 25, 398.0);
Message message = session.createObjectMessage(girl);

2、接受端

if(message instanceof ActiveMQObjectMessage) {
  Person person = (Person)((ActiveMQObjectMessage)message).getObject();
  System.out.println(person);
}

注意,当使用对象时,需要提前添加信任:

connectionFactory.setTrustedPackages(Arrays.asList("com.zjw.entity", "com.zjw.model"));

bytesMessage

1、发送端

BytesMessage bytesMessage = session.createBytesMessage();
bytesMessage.writeBytes("Hello".getBytes());
bytesMessage.writeUTF("你好!");

2、接受端

if(message instanceof BytesMessage) {
  BytesMessage bm = (BytesMessage)message;
  
  byte[] b = new byte[1024];
  int len = -1;
  while ((len = bm.readBytes(b)) != -1) {
    System.out.println(new String(b, 0, len));
  }
}

还可以使用ActiveMQ给提供的便捷方法,但要注意读取和写入的顺序

bm.readBoolean()
bm.readUTF()

3、写入文件

FileOutputStream out = new FileOutputStream("/tmp/test.txt");

byte[] bytes = new byte[1024];
int len = 0 ;
while((len = bm.readBytes(bytes))!= -1){
    out.write(bytes,0,len);
}

MapMessage

1、发送端

MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("name","lucy");
mapMessage.setBoolean("yihun",false);
mapMessage.setInt("age", 17);

producer.send(mapMessage);

2、接收端

Message message = consumer.receive();
MapMessage mes = (MapMessage) message;

System.out.println(mes);
System.out.println(mes.getString("name"));

消息发送原理

同步与异步

-开启事务关闭事务
持久化异步同步
非持久化异步异步

我们可以通过以下几种方式来设置异步发送:

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_HOST);
// 通过factory设置
factory.setUseAsyncSend(true);

ActiveMQConnection conn = (ActiveMQConnection)factory.createConnection();
// 通过连接设置
conn.setUseAsyncSend(true);

消息堆积

producer每发送一个消息,统计一下发送的字节数,当字节数达到ProducerWindowSize值时,需要等待broker的确认,才能继续发送。

  • 可以在brokerUrl中设置: tcp://localhost:61616?jms.producerWindowSize=1048576

  • 可以destinationUri中设置: test?producer.windowSize=1048576

延迟消息投递配置

要是用该功能,首先在配置文件中开启延迟和调度项:schedulerSupport="true"

如下:

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">

延迟发送

消息发送端,对消息进行属性设置:

message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 10*1000); // 10s

带间隔的重复发送

long delay = 10 * 1000;
long period = 2 * 1000;
int repeat = 9;
// 这里需要注意写入的数据类型,否则不生效
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);

createProducer.send(message);

Cron表达式定时发送

Cron表达式是一个字符串,字符串以5或6个空格隔开,分为6或7个域,每一个域代表一个含义,Cron有如下两种语法格式:

Seconds Minutes Hours DayofMonth Month DayofWeek YearSeconds Minutes Hours DayofMonth Month DayofWeek

每一个域可出现的字符如下:

  • Seconds: 可出现, - * /四个字符,有效范围为0-59的整数

  • Minutes: 可出现, - * /四个字符,有效范围为0-59的整数

  • Hours: 可出现, - * /四个字符,有效范围为0-23的整数

  • DayofMonth: 可出现, - * / ? L W C八个字符,有效范围为0-31的整数

  • Month: 可出现, - * /四个字符,有效范围为1-12的整数或JAN-DEC

  • DayofWeek: 可出现, - * / ? L C #四个字符,有效范围为1-7的整数或SUN-SAT两个范围。1表示星期天,2表示星期一, 依次类推

  • Year: 可出现, - * /四个字符,有效范围为1970-2099

每一个域都使用数字,但还可以出现如下特殊字符,它们的含义是:

  • *:表示匹配该域的任意值,假如在Minutes域使用*, 即表示每分钟都会触发事件。

  • ?:只能用在DayofMonth和DayofWeek两个域。它也匹配域的任意值,但实际不会。因为DayofMonth和 DayofWeek会相互影响。例如想在每月的20日触发调度,不管20日到底是星期几,则只能使用如下写法: 13 13 15 20 * ?, 其中最后一位只能用?,而不能使用*,如果使用*表示不管星期几都会触发,实际上并不是这样。

  • -:表示范围,例如在Minutes域使用5-20,表示从5分到20分钟每分钟触发一次

  • /:表示起始时间开始触发,然后每隔固定时间触发一次,例如在Minutes域使用5/20,则意味着5分钟触发一次,而25,45等分别触发一次.

  • ,:表示列出枚举值值。例如:在Minutes域使用5,20,则意味着在5和20分每分钟触发一次。

  • L:表示最后,只能出现在DayofWeek和DayofMonth域,如果在DayofWeek域使用5L,意味着在最后的一个星期四触发。

  • W:表示有效工作日(周一到周五),只能出现在DayofMonth域,系统将在离指定日期的最近的有效工作日触发事件。例如:在 DayofMonth使用5W,如果5日是星期六,则将在最近的工作日:星期五,即4日触发。如果5日是星期天,则在6日(周一)触发;如果5日在星期一 到星期五中的一天,则就在5日触发。另外一点,W的最近寻找不会跨过月份

  • LW:这两个字符可以连用,表示在某个月最后一个工作日,即最后一个星期五。

  • #:用于确定每个月第几个星期几,只能出现在DayofMonth域。例如在4#2,表示某月的第二个星期三。

举几个例子:

0 0 2 1 * ? * 表示在每月的1日的凌晨2点调度任务

0 15 10 ? * MON-FRI 表示周一到周五每天上午10:15执行作业

0 15 10 ? 6L 2002-2006 表示2002-2006年的每个月的最后一个星期五上午10:15执行作

0 0 10,14,16 * * ? 每天上午10点,下午2点,4点

0 0/30 9-17 * * ? 朝九晚五工作时间内每半小时

0 0 12 ? * WED 表示每个星期三中午12点

0 0 12 * * ? 每天中午12点触发

0 15 10 ? * * 每天上午10:15触发

0 15 10 * * ? 每天上午10:15触发

0 15 10 * * ? * 每天上午10:15触发

0 15 10 * * ? 2005 2005年的每天上午10:15触发

0 * 14 * * ? 在每天下午2点到下午2:59期间的每1分钟触发

0 0/5 14 * * ? 在每天下午2点到下午2:55期间的每5分钟触发

0 0/5 14,18 * * ? 在每天下午2点到2:55期间和下午6点到6:55期间的每5分钟触发

0 0-5 14 * * ? 在每天下午2点到下午2:05期间的每1分钟触发

0 10,44 14 ? 3 WED 每年三月的星期三的下午2:10和2:44触发

0 15 10 ? * MON-FRI 周一至周五的上午10:15触发

0 15 10 15 * ? 每月15日上午10:15触发

0 15 10 L * ? 每月最后一日的上午10:15触发

0 15 10 ? * 6L 每月的最后一个星期五上午10:15触发

0 15 10 ? * 6L 2002-2005 2002年至2005年的每月的最后一个星期五上午10:15触发

0 15 10 ? * 6#3 每月的第三个星期五上午10:15触发

监听器

可以使用监听器来处理消息接收

// 设定消费者的监听器
consumer.setMessageListener(new MessageListener() {
  public void onMessage(Message message) {
  // 当是Map 消息的情况
      if (message instanceof MapMessage) {
      try {
        MapMessage msg = (MapMessage) message;
        //接受消息
        msg.acknowledge();
        String username = msg.getString("username");
        String nickname = msg.getString("nickname");
        int age = msg.getInt("age");
        System.out.printf("获取消费者信息\t%s:%s:%d\r\n", username, nickname, age);
      } catch (JMSException e) {
        e.printStackTrace();
      }
    }
  }
});

当收到消息后会调起onMessage方法

消息过滤

消息发送

Session session = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE);
MessageProducer producter = session.createProducer(null);

MapMessage msg1 = session.createMapMessage();
msg1.setString("username", "张三");
msg1.setInt("age", 12);
msg1.setString("nickname", "zhangsan");
// meta data
msg1.setIntProperty("age", 12);
msg1.setStringProperty("nickname", "zhangsan");

MapMessage msg2 = session.createMapMessage();
msg2.setString("username", "李四");
msg2.setInt("age", 17);
msg2.setString("nickname", "lisi");
msg2.setIntProperty("age", 18);
msg2.setStringProperty("nickname", "lisi");

MapMessage msg3 = session.createMapMessage();
msg3.setString("username", "王五");
msg3.setInt("age", 22);
msg3.setString("nickname", "wangwu");
msg3.setIntProperty("age", 122);
msg3.setStringProperty("nickname", "wangwu");

MapMessage msg4 = session.createMapMessage();
msg4.setString("username", "赵六");
msg4.setInt("age", 24);
msg4.setString("nickname", "zhaoliu");
msg4.setIntProperty("age", 24);
msg4.setStringProperty("nickname", "zhaoliu");

MapMessage msg5 = session.createMapMessage();
msg5.setString("username", "田七");
msg5.setInt("age", 24);
msg5.setString("nickname", "jerry");
msg5.setIntProperty("age", 24);
msg5.setStringProperty("nickname", "jerry");

Destination destination = session.createQueue("test");

// 发送消息
producter.send(destination, msg1, DeliveryMode.NON_PERSISTENT, 4, 1000 * 60);
producter.send(destination, msg2, DeliveryMode.NON_PERSISTENT, 4, 1000 * 60);
producter.send(destination, msg3, DeliveryMode.NON_PERSISTENT, 4, 1000 * 60);
producter.send(destination, msg4, DeliveryMode.NON_PERSISTENT, 4, 1000 * 60);
producter.send(destination, msg5, DeliveryMode.NON_PERSISTENT, 4, 1000 * 60);

session.commit();

消息接收

//查询年大于 18的数据, 字符串的过滤,需要有''
MessageConsumer consumer = session.createConsumer(destination, "age >18 OR nickname = 'jerry'");
Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐