ActiveMQ 发送和接收消息
发布日期:2021-10-24 12:41:37 浏览次数:1 分类:技术文章

本文共 5395 字,大约阅读时间需要 17 分钟。

一、添加 jar 包

org.apache.activemq
activemq-all
5.11.2

二、消息传递的两种形式

  1、点对点:发送的消息只能被一个消费者接收,第一个消费者接收后,消息没了

  2、发布/订阅:消息可以被多个消费者接收 。发完消息,如果没有消费者接收,这消息会自动消失。也就是说,消费者服务必须是启动的状态。( topic 消息在 ActiveMQ 服务端默认不是持久化的,可以通过配置文件配置持久化 )

三、点对点发送消息

/** * 点到点形式发送消息 * @throws Exception */@Testpublic void testQueueProducer() throws Exception{    //1、创建一个连接工厂,需要指定服务的 ip 和端口    String brokerURL = "tcp://192.168.25.129:61616";    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);    //2、使用工厂对象创建一个 Connection 对象    Connection connection = connectionFactory.createConnection();    //3、开启连接,调用 Connection 对象的 start 方法    connection.start();    //4、创建一个 Session 对象。        //第一个参数:是否开启事务(一般不开启,如果开启事务,第二个参数没意义);        //第二个参数:应答模式。自动应答或者手动应答,一般是自动应答    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);    //5、使用 Session 对象创建一个 Destination 对象。两种形式 queue、topic。    Queue queue = session.createQueue("test-queue");    //6、使用 Session 对象创建一个 Producer 对象    MessageProducer producer = session.createProducer(queue);    //7、创建一个 Message 对象,可以使用 TextMessage。下面两种方式都可以    /*TextMessage textMessage = new ActiveMQTextMessage();     textMessage.setText("hello ActiveMQ");*/    TextMessage textMessage = session.createTextMessage("hello ActiveMQ");    //8、发布消息    producer.send(textMessage);    //9、关闭资源    producer.close();    session.close();    connection.close();}

四、点对点接收消息

/** * 点对点接收消息 * @throws Exception */@Testpublic void testQueueConsumer() throws Exception{    //1、创建一个 ConnectionFactory 对象连接 MQ 服务器    String brokerURL = "tcp://192.168.25.129:61616";    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);    //2、创建一个连接对象    Connection connection = connectionFactory.createConnection();    //3、开启连接    connection.start();    //4、使用 Connection 对象 创建一个 Session 对象    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);    //5、创建一个 Destination 对象。queue 对象    Queue queue = session.createQueue("test-queue");    //6、使用 Session 对象创建一个消费者    MessageConsumer consumer = session.createConsumer(queue);    //7、接收消息    consumer.setMessageListener(new MessageListener() {                @Override        public void onMessage(Message message) {            //8、打印结果            TextMessage textMessage = (TextMessage) message;                        try {                String text = textMessage.getText();                System.out.println(text);            } catch (JMSException e) {                // TODO Auto-generated catch block                e.printStackTrace();            }                    }    });        //9、等待接收消息。( 接收到消息后才网下面执行。关闭资源 )    System.in.read();    //10、关闭资源    consumer.close();    session.close();    connection.close();    }

 五、广播发送消息

/** * 广播发送消息 * @throws Exception */@Testpublic void testTopicProducer() throws Exception{    //1、创建一个连接工厂,需要指定服务的 ip 和端口    String brokerURL = "tcp://192.168.25.129:61616";    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);    //2、使用工厂对象创建一个 Connection 对象    Connection connection = connectionFactory.createConnection();    //3、开启连接,调用 Connection 对象的 start 方法    connection.start();    //4、创建一个 Session 对象。        //第一个参数:是否开启事务(一般不开启,如果开启事务,第二个参数没意义);        //第二个参数:应答模式。自动应答或者手动应答,一般是自动应答    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);    //5、使用 Session 对象创建一个 Destination 对象。两种形式 queue、topic。    Topic topic = session.createTopic("test-topic");    //6、使用 Session 对象创建一个 Producer 对象    MessageProducer producer = session.createProducer(topic);    //7、创建一个 Message 对象,可以使用 TextMessage。下面两种方式都可以    /*TextMessage textMessage = new ActiveMQTextMessage();     textMessage.setText("hello ActiveMQ");*/    TextMessage textMessage = session.createTextMessage("hello ActiveMQ");    //8、发布消息    producer.send(textMessage);    //9、关闭资源    producer.close();    session.close();    connection.close();}

六、广播接收消息

/** * 广播接收消息 * @throws Exception */@Testpublic void testTopicConsumer() throws Exception{    //1、创建一个 ConnectionFactory 对象连接 MQ 服务器    String brokerURL = "tcp://192.168.25.129:61616";    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);    //2、创建一个连接对象    Connection connection = connectionFactory.createConnection();    //3、开启连接    connection.start();    //4、使用 Connection 对象 创建一个 Session 对象    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);    //5、创建一个 Destination 对象。Topic 对象    Topic topic = session.createTopic("test-topic");    //6、使用 Session 对象创建一个消费者    MessageConsumer consumer = session.createConsumer(topic);    //7、接收消息    consumer.setMessageListener(new MessageListener() {                @Override        public void onMessage(Message message) {            //8、打印结果            TextMessage textMessage = (TextMessage) message;                        try {                String text = textMessage.getText();                System.out.println(text);            } catch (JMSException e) {                // TODO Auto-generated catch block                e.printStackTrace();            }                    }    });    System.out.println("topic消费者");    //9、等待接收消息。( 接收到消息后才网下面执行。关闭资源 )    System.in.read();    //10、关闭资源    consumer.close();    session.close();    connection.close();  }

 

转载于:https://www.cnblogs.com/fangwu/p/8669036.html

转载地址:https://blog.csdn.net/weixin_30784141/article/details/98381475 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:【基于Python+Flask项目部署系列--03】开发测试环境配置-基于Ubuntu16.04
下一篇:redis 持久化

发表评论

最新留言

第一次来,支持一个
[***.219.124.196]2024年03月14日 11时21分47秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章

mysql 行转列 显示_mysql 行转列 (结果集以坐标显示) 2019-04-21
mysql 完全备份恢复吗_MySQL完全备份与恢复 2019-04-21
wpf 绘制矩形_WPF制作倒影效果 2019-04-21
mariadb mysql 5.7_MariaDB 10.1 和 MySQL 5.7 在普通商用硬件上的表现 2019-04-21
由于连接方在一段时间后没有正确答复或连接的主机_新风换气机使用效果不佳,为何?掌握正确使用方法就好了... 2019-04-21
mysql数据库断电恢复_MySQL数据库InnoDB引擎下服务器断电数据恢复方法 2019-04-21
python入门程序异常_Python 入门 之 异常处理 2019-04-21
python 键盘输入int_Python编程 Python如何获取数据 2019-04-21
h3c trunk口改access_H3CNE配置VLAN的Access链路端口和Trunk链路端口 2019-04-21
mysql 查询姓王_MySQL查询语句练习题,测试足够用了 2019-04-21
mysql多实例脚本_mysql多实例脚本 2019-04-21
python如何生成excel文件夹_用python脚本通过excel生成文件夹树结构 2019-04-21
python获取post请求中的所有参数_Django从POST reques获取请求参数 2019-04-21
mysql加密复制_MySQL主从复制使用SSL加密 2019-04-21
python启动远端 exe_python打包exe开机自动启动的实例(windows) 2019-04-21
java当前路径_java获取当前路径的几种方法 2019-04-21
java web传递参数_Javaweb的八种传值方式 2019-04-21
java gui支持的包有哪两个_Java GUI 2019-04-21
java list详解_java集合List解析 2019-04-21
java坐标代码_java实现计算地理坐标之间的距离 2019-04-21