问题:我的ActiveMQ接收消息用的是topic模式,持久化订阅,问题是我用了JMS接收消息的代码每次重新启动总是会收到最后一次的消息,但这些消息是已经接收过了的,而且启动一次就收到一次,难道ActiveMQ不会清除缓存的吗?
- //创建JMS连接和会话
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
- connection = factory.createConnection();
- connection.setClientID(Constant.JMS_CLIENT_ID);
- session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
- // 创建消息发送主题和发送者
- Topic jmsSendTopic = session.createTopic(sendTopic);
- sendTopicProducer = session.createProducer(jmsSendTopic);
- sendTopicProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
- sendTopicProducer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);
- // 创建消息接收主题和接收者
- Topic jmsReceiveTopic = session.createTopic(receiveTopic);
- receiveTopicConsumer = session.createDurableSubscriber(jmsReceiveTopic,Constant.JMS_SUBSCRIBE_NAME);
- receiveTopicConsumer.setMessageListener(this);
- connection.start();
解答:问题原因在于这段代码在接收到JMS消息时不会向ActiveMQ服务器确认消息的接收,故而ActiveMQ服务器一直认为该消息没有成功发送给接收者,因而每次接收者重启之后就会收到ActiveMQ服务器发送过来的消息。在这里要解释一下session的创建。
- session = connection.createSession(true,Session.Auto_ACKNOWLEDGE);
当createSession第一个参数为true时,表示创建的session被标记为transactional的,确认消息就通过确认和校正来自动地处理,第二个参数应该是没用的。
- session = connection.createSession(false,Session.Auto_ACKNOWLEDGE);
当createSession的第一个参数为false时,表示创建的session没有标记为transactional,此时有三种用于消息确认的选项:
**AUTO_ACKNOWLEDGE session将自动地确认收到的一则消息;
**CLIENT_ACKNOWLEDGE 客户端程序将确认收到的一则消息,调用这则消息的确认方法;
**DUPS_OK_ACKNOWLEDGE 这个选项命令session“懒散的”确认消息传递,可以想到,这将导致消息提供者传递的一些复制消息可能出错。
JMS有两种消息传递方式。标记为NON_PERSISTENT的消息最多传递一次,而标记为PERSISTENT的消息将使用暂存后再转发的机理投递。如果一个JMS服务离线,那么持久性消息不会丢失,但是得等到这个服务恢复联机的时候才会被传递。所以默认的消息传递方式是非持久性的,虽然使用非持久性消息可能降低内存和需要的存储器,但这种传递方式只有当你不需要接收所有消息时才使用。
因此正确的代码只需改动一处就行了,即将true改为false
- //创建JMS连接和会话
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
- connection = factory.createConnection();
- connection.setClientID(Constant.JMS_CLIENT_ID);
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- // 创建消息发送主题和发送者
- Topic jmsSendTopic = session.createTopic(sendTopic);
- sendTopicProducer = session.createProducer(jmsSendTopic);
- sendTopicProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
- sendTopicProducer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);
- // 创建消息接收主题和接收者
- Topic jmsReceiveTopic = session.createTopic(receiveTopic);
- receiveTopicConsumer = session.createDurableSubscriber(jmsReceiveTopic,Constant.JMS_SUBSCRIBE_NAME);
- receiveTopicConsumer.setMessageListener(this);
- connection.start();
http://riddickbryant.iteye.com/blog/441890
相关推荐
springboot集成activemq实现消息接收demo
ActiveMQ队列消息过期时间设置和自动清除解决方案.docx
一个jms activemq Topic 消息实例 关于jms JMS 是接口,相当于jdbc ,要真正使用它需要某些厂商进行实现 ,即jms provider 常见的jms provider 有 ActiveMQ JBoss 社区所研发的 HornetQ (在jboss6 中默认即可以...
springboot整合 activeMq 消费者 消费接收消息 包含队列模式点对点发 以及 主题模式一对多 这是消费者的demo consumer 。 里面有消息重发机制,手动确认ACK模式。 配合 producer 生产者demo使用。
用C#实现的ActiveMQ发布/订阅消息传送源程序
go语言实现的读取、发送消息到activemq,使用的stomp协议。
activeMQ的发送消息后接收者返回信息
结合博客提供spring+activeMQ的demo源码
详细描述了ActiveMQ消息过期-时间设置和自动清除解决方案。
springboot整合activeMq的使用,队列,主题,消息手动确认,重发机制
ActiveMQ(包括消息生成端和andorid消息接受端),实现了点对点的消息推送,和广播消息推送,当然离线推送也实现了。
SpringBoot快速玩转ActiveMQ消息队列,jdk8下的简要版介绍。
activeMQ的测试工具,用于发送和接收activeMQ消息,jar包形式的,安装完jdk之后用java -jar xxx.jar命令运行
ActiveMQ 消息队列
activemq消息中间件-视频教程activemq消息中间件-视频教程activemq消息中间件-视频教程activemq消息中间件-视频教程
百度spring整合activemq 发现几乎都只是在xml文件配置固定的消息队列而且太麻烦。并没有根据需求进行动态生成主题和队列。本文档参考了纯粹的activemq java代码和百度上的demo,很简洁的实现了动态消息队列的生成和...
springboot整合ActiveMQ源码,适合范围消息队列入门小伙伴,对ActiveMQ消息队列不太了解,不知道如何发送消息,接收消息可以围观。
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。 下载本压缩包后解压运行里面的activemq.bat即可。 http://192.168.0.61:8161/ 是管理ActiveMQ的后台管理系统入口, 如需在JAVA中使用,请下载本人的 ...
简单的activemq点对点的同步消息模型
SpringBoot整合ActiveMQ(消息中间件)实现邮件发送功能,里面含有详细业务逻辑代码,配置文件等