循序渐进ActiveMQ(4)----临时目标与【request/reply】模型
发布日期:2021-06-30 13:45:32
浏览次数:2
分类:技术文章
本文共 5990 字,大约阅读时间需要 19 分钟。
ActiveMq通过createTemporaryQueue,CreateTemporaryTopic创建临时目标,这些目标的生命周期是创建它的Connection的关闭;只有创建它的Connection所创建的session才能从临时目标中接收消息;不过任何的生产者都可以向临时目标中发送消息;如果关闭了创建此临时目标的Connection,那么临时目标被关闭,内容也将消失。
我们写一个demo去验证一件事:
1 同一个connection创建的不同session可以访问这些session中某一个session创建的临时目标。
2 不同connection创建的session,不能访问某个session创建的临时目标。
3 在满足1的情况下,某一个session创建的生产者发送消息到固定的一个队列中并且设置replyto临时队列,消费者可以从这个队列中收到消息并创建一个目标为replyto临时队列的生产者,发送一条replyMessage消息到replyto临时队列;由另外一个session(同一个connection所创建)创建消费者监听这个replyto临时队列,收到消息并打印。
package jeff.mq.tempDestination;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageListener;import javax.jms.MessageProducer;import javax.jms.Queue;import javax.jms.Session;import javax.jms.TemporaryQueue;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.activemq.command.ActiveMQQueue;/** * 【request/reply模式】 * * TemporaryQueue虽然是由Session会话创建,但是它的生命周期属于整个Connection连接 * 如果在一个Connection上创建两个Session会话,则一个Session1创建的TemporaryQueue或TemporaryTopic也可以被另一个Session2访问, * 也就是临时目标可以被同一个Conenction的不同Session会话访问。 * * 如果这两个Session会话是由不同的Connection连接创建,则一个Session1创建的TemporaryQueue不可以被另一个Session2访问。 * 也就是不同连接Connction创建的Session所创建的临时目标是相互隔离的 * * * 临时目标的主要作用就是为了指定回复目的地 * * @author jeffSheng * 2018年7月7日 */public class TemporaryQueueTest { private ConnectionFactory connectionFactory; private Connection connection; private Connection connection1; private Queue jeffTempQueue ; public TemporaryQueueTest(){ try { this.connectionFactory = new ActiveMQConnectionFactory( "jeff", "123456", "tcp://localhost:61616"); connection = connectionFactory.createConnection(); connection1 = connectionFactory.createConnection(); connection.start(); connection1.start(); //第一步:先创建一个queue备用,生产者会向这个队列中发送消息 jeffTempQueue = new ActiveMQQueue("jeffTempQueue"); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws Exception { TemporaryQueueTest tt = new TemporaryQueueTest(); tt.requestAndReply(tt.connection); } private void requestTempQueueByAnotherConnection(Connection connection,TemporaryQueue replyQueue ) throws JMSException{ Session session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE); //创建消费者接收replyQueue这个临时queue中的消息 MessageConsumer replyComsumer = session.createConsumer(replyQueue); replyComsumer.setMessageListener(new MessageListener() { public void onMessage(Message m) { try { System.out.println("【Connection-1】:Get reply: " + ((TextMessage) m).getText()); } catch (JMSException e) { } } }); } public void requestAndReply(Connection connection) throws JMSException{ //第二步:使用connection连接创建一个session final Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); //第三步: 使用这个session创建一个TemporaryQueue TemporaryQueue replyQueue = session.createTemporaryQueue(); //第七步:【创建消费者】 使用同一个session 创建一个消费者接收tempQueue(生产者发送消息的队列)队列的消息,并回复到生产者指定的replyQueue中 MessageConsumer comsumer = session.createConsumer(this.jeffTempQueue); //第八步:【注册监听器】消费者注册监听器监听消息的到来 comsumer.setMessageListener(new MessageListener() { public void onMessage(Message m) { try { System.out.println("【Connection】:Get Message: " + ((TextMessage) m).getText()); //第九部:【消费者创建生产者】消费者收到消息后,创建一个生产者往对面生产者指定的响应队列中发送响应消息 MessageProducer producer = session.createProducer(m.getJMSReplyTo()); //第十步:【消费者创建并发送消息】 TextMessage replyMessage = session.createTextMessage("ReplyMessage"); //一定是发送到了replyQueue producer.send(replyMessage); } catch (JMSException e) { } } }); //第十一步:【创建session2】 使用同一个Connection创建另一个Session,来读取replyQueue上的消息。 Session session2 = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE); //第十二步:【创建session2的消费者】创建一个消费者接收replyQueue的消息 MessageConsumer replyComsumer = session2.createConsumer(replyQueue); replyComsumer.setMessageListener(new MessageListener() { public void onMessage(Message m) { try { System.out.println("【Connection】:Get reply: " + ((TextMessage) m).getText()); } catch (JMSException e) { } } }); //使用另外一个connection创建的session去创建消费者去访问其他session创建的replyQueue //requestTempQueueByAnotherConnection(this.connection1,replyQueue); //第四步:【创建生产者】使用同一个session创建生产者,并往tempQueue发送消息 MessageProducer producer = session.createProducer(this.jeffTempQueue); TextMessage message = session.createTextMessage("SimpleMessage"); //第五步:【设置响应队列】生产者设置消息的replayQueue,最终消费者接收到消息后往这个队列中发送响应 message.setJMSReplyTo(replyQueue); //第六步:【发送消息】生产者发送消息到tempQueue producer.send(message); }}
我们先注释掉:
//使用另外一个connection创建的session去创建消费者去访问其他session创建的replyQueue// requestTempQueueByAnotherConnection(this.connection1,replyQueue);
启动打印:
说明我们上边说的1和2,然后我们放开注释:
启动打印:
log4j:WARN No appenders could be found for logger (org.apache.activemq.transport.WireFormatNegotiator).log4j:WARN Please initialize the log4j system properly.log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.Exception in thread "main" javax.jms.InvalidDestinationException: Cannot use a Temporary destination from another Connection at org.apache.activemq.ActiveMQMessageConsumer.(ActiveMQMessageConsumer.java:196) at org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:1239) at org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:1183) at org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:1095) at org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:1067) at jeff.mq.tempDestination.TemporaryQueueTest.requestTempQueueByAnotherConnection(TemporaryQueueTest.java:76) at jeff.mq.tempDestination.TemporaryQueueTest.requestAndReply(TemporaryQueueTest.java:131) at jeff.mq.tempDestination.TemporaryQueueTest.main(TemporaryQueueTest.java:68)
提示信息:不能使用另外一个connection来访问临时目标!
这就是简单的request/reply模型的使用!
转载地址:https://jeffsheng.blog.csdn.net/article/details/80951993 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!
发表评论
最新留言
路过按个爪印,很不错,赞一个!
[***.219.124.196]2024年05月04日 14时20分28秒
关于作者
喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
scrapy 排错记录
2021-07-03
ACM路上的一大失误
2021-07-03
HDOJ2049 不容易系列之(4)——考新郎
2021-07-03
Codeforces Round #369 (Div. 2)
2021-07-03
HDU-2838 Cow Sorting(树状数组)
2021-07-03
基于SSM的兼职论坛系统的设计与实现
2021-07-03
基于java的ssm框架就业信息管理系统的设计
2021-07-03
如何用同期群分析模型提升留存?(Tableau实战)
2021-07-03
Oracle字符串分隔符替换(替换奇数个或偶数个)
2021-07-03
Oracle 利用 UTL_SMTP 包发送邮件
2021-07-03
Oracle 的循环中的异常捕捉和处理
2021-07-03
Oracle的pfile和spfile的一点理解和笔记
2021-07-03
java实现稀疏数组及将稀疏数组存入硬盘中
2021-07-03
2021-05-18
2021-07-03
libuv实现ping包发送和接收
2021-07-03
基础架构系列篇-系统centos7安装docker+COMPOSE
2021-07-03
基础架构系列篇-NGINX部署VUE
2021-07-03
基础架构系列篇-系统centos7安装kafka
2021-07-03
软件质量的8个特性
2021-07-03