循序渐进ActiveMQ(4)----临时目标与【request/reply】模型
发布日期:2021-06-30 13:45:32 浏览次数:2 分类:技术文章

本文共 5990 字,大约阅读时间需要 19 分钟。

ActiveMq通过createTemporaryQueue,CreateTemporaryTopic创建临时目标,这些目标的生命周期是创建它的Connection的关闭;只有创建它的Connection所创建的session才能从临时目标中接收消息;不过任何的生产者都可以向临时目标中发送消息;如果关闭了创建此临时目标的Connection,那么临时目标被关闭,内容也将消失。

我们写一个demo去验证一件事:

1 同一个connection创建的不同session可以访问这些session中某一个session创建的临时目标。

2 不同connection创建的session,不能访问某个session创建的临时目标。

3 在满足1的情况下,某一个session创建的生产者发送消息到固定的一个队列中并且设置replyto临时队列,消费者可以从这个队列中收到消息并创建一个目标为replyto临时队列的生产者,发送一条replyMessage消息到replyto临时队列;由另外一个session(同一个connection所创建)创建消费者监听这个replyto临时队列,收到消息并打印。

package jeff.mq.tempDestination;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageListener;import javax.jms.MessageProducer;import javax.jms.Queue;import javax.jms.Session;import javax.jms.TemporaryQueue;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.activemq.command.ActiveMQQueue;/** * 【request/reply模式】 *  * TemporaryQueue虽然是由Session会话创建,但是它的生命周期属于整个Connection连接 * 如果在一个Connection上创建两个Session会话,则一个Session1创建的TemporaryQueue或TemporaryTopic也可以被另一个Session2访问, * 也就是临时目标可以被同一个Conenction的不同Session会话访问。 *  * 如果这两个Session会话是由不同的Connection连接创建,则一个Session1创建的TemporaryQueue不可以被另一个Session2访问。 * 也就是不同连接Connction创建的Session所创建的临时目标是相互隔离的 *  *  * 临时目标的主要作用就是为了指定回复目的地 *  * @author jeffSheng * 2018年7月7日 */public class TemporaryQueueTest {		private ConnectionFactory connectionFactory;		private Connection connection;		private Connection connection1;		private Queue jeffTempQueue ;		public TemporaryQueueTest(){		try {			this.connectionFactory = 					new ActiveMQConnectionFactory(							"jeff", 							"123456", 							"tcp://localhost:61616");						connection = connectionFactory.createConnection();						connection1 = connectionFactory.createConnection();						connection.start();			connection1.start();			//第一步:先创建一个queue备用,生产者会向这个队列中发送消息			jeffTempQueue =  new ActiveMQQueue("jeffTempQueue");		} catch (Exception e) {			e.printStackTrace();		}	}			public static void main(String[] args) throws Exception {		TemporaryQueueTest tt = new TemporaryQueueTest();		tt.requestAndReply(tt.connection);	}		private void requestTempQueueByAnotherConnection(Connection connection,TemporaryQueue replyQueue ) throws JMSException{		Session session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);				//创建消费者接收replyQueue这个临时queue中的消息		MessageConsumer replyComsumer = session.createConsumer(replyQueue);				replyComsumer.setMessageListener(new MessageListener() {			public void onMessage(Message m) {				try {					System.out.println("【Connection-1】:Get reply: " + ((TextMessage) m).getText());				} catch (JMSException e) {				}			}		});			}	public void requestAndReply(Connection connection) throws JMSException{				//第二步:使用connection连接创建一个session		final Session session = connection.createSession(Boolean.FALSE,	Session.AUTO_ACKNOWLEDGE);				//第三步: 使用这个session创建一个TemporaryQueue		TemporaryQueue replyQueue = session.createTemporaryQueue();						//第七步:【创建消费者】 使用同一个session 创建一个消费者接收tempQueue(生产者发送消息的队列)队列的消息,并回复到生产者指定的replyQueue中		MessageConsumer comsumer = session.createConsumer(this.jeffTempQueue);		//第八步:【注册监听器】消费者注册监听器监听消息的到来		comsumer.setMessageListener(new MessageListener() {			public void onMessage(Message m) {				try {					System.out.println("【Connection】:Get Message: " + ((TextMessage) m).getText());					//第九部:【消费者创建生产者】消费者收到消息后,创建一个生产者往对面生产者指定的响应队列中发送响应消息					MessageProducer producer = session.createProducer(m.getJMSReplyTo());					//第十步:【消费者创建并发送消息】					TextMessage replyMessage =  session.createTextMessage("ReplyMessage");					//一定是发送到了replyQueue					producer.send(replyMessage);				} catch (JMSException e) {				}			}		});				//第十一步:【创建session2】 使用同一个Connection创建另一个Session,来读取replyQueue上的消息。		Session session2 = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);		//第十二步:【创建session2的消费者】创建一个消费者接收replyQueue的消息		MessageConsumer replyComsumer = session2.createConsumer(replyQueue);		replyComsumer.setMessageListener(new MessageListener() {			public void onMessage(Message m) {				try {					System.out.println("【Connection】:Get reply: " + ((TextMessage) m).getText());				} catch (JMSException e) {				}			}		});				//使用另外一个connection创建的session去创建消费者去访问其他session创建的replyQueue		//requestTempQueueByAnotherConnection(this.connection1,replyQueue);				//第四步:【创建生产者】使用同一个session创建生产者,并往tempQueue发送消息		MessageProducer producer = session.createProducer(this.jeffTempQueue);		TextMessage message = session.createTextMessage("SimpleMessage");		//第五步:【设置响应队列】生产者设置消息的replayQueue,最终消费者接收到消息后往这个队列中发送响应		message.setJMSReplyTo(replyQueue);		//第六步:【发送消息】生产者发送消息到tempQueue			producer.send(message);	}}

我们先注释掉:

//使用另外一个connection创建的session去创建消费者去访问其他session创建的replyQueue//		requestTempQueueByAnotherConnection(this.connection1,replyQueue);

启动打印:

说明我们上边说的1和2,然后我们放开注释:

启动打印:

log4j:WARN No appenders could be found for logger (org.apache.activemq.transport.WireFormatNegotiator).log4j:WARN Please initialize the log4j system properly.log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.Exception in thread "main" javax.jms.InvalidDestinationException: Cannot use a Temporary destination from another Connection	at org.apache.activemq.ActiveMQMessageConsumer.
(ActiveMQMessageConsumer.java:196) at org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:1239) at org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:1183) at org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:1095) at org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:1067) at jeff.mq.tempDestination.TemporaryQueueTest.requestTempQueueByAnotherConnection(TemporaryQueueTest.java:76) at jeff.mq.tempDestination.TemporaryQueueTest.requestAndReply(TemporaryQueueTest.java:131) at jeff.mq.tempDestination.TemporaryQueueTest.main(TemporaryQueueTest.java:68)

提示信息:不能使用另外一个connection来访问临时目标!

这就是简单的request/reply模型的使用!

转载地址:https://jeffsheng.blog.csdn.net/article/details/80951993 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:循序渐进ActiveMQ(5)----【p2p】模型和【pub/sub】模型
下一篇:循序渐进ActiveMQ(3)----MessageConsumer的消息选择器及mysql消息持久化

发表评论

最新留言

路过按个爪印,很不错,赞一个!
[***.219.124.196]2024年05月04日 14时20分28秒