RabbitMQ生产者和消费者代码(单一消费者,消费容器) & 消息确认机制
发布日期:2021-07-20 20:54:05
浏览次数:31
分类:技术文章
本文共 9537 字,大约阅读时间需要 31 分钟。
生产者简单代码:
package com.star;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Component;/** * 消息发送者 - Producer。 * Producer对象,必须交由Spring容器管理,所以必须加上@Component注解 * 使用SpringBoot提供的AMQP启动器,来访问rabbitmq的时候,都是通过AmqpTemplate来实现的。 */@Componentpublic class MessageProducer { // AMQP启动器会自动在spring容器启动的时候初始化一个AmqpTemplate对象,我们只需要注入进来就可以了 @Autowired private AmqpTemplate rabbitAmqpTemplate; // 将配置文件中的交换器名称注入进来 @Value("${mq.config.exchange}") private String exchange; // 将配置文件中的路由键名称注入进来 @Value("${mq.config.queue.info.routing.key}") private String routingkey; // 发送消息, 参数也可以是javaBean, 但bean类必须实现序列化接口 public void send(String msg) { /** * convertAndSend - 转换并发送消息的template方法。 * 是将传入的普通java对象,转换为rabbitmq中需要的message类型对象,并发送消息到rabbitmq中。 * 参数一:交换器名称。 类型是String * 参数二:路由键。 类型是String * 参数三:消息,是要发送的消息内容对象。类型是Object */ this.rabbitAmqpTemplate.convertAndSend(this.exchange, this.routingkey,msg); } }
生产者代码工具类:
package com.star.rabbitMq;import java.util.Properties;import javax.annotation.PostConstruct;import org.springframework.amqp.core.AmqpAdmin;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.Binding.DestinationType;import org.springframework.amqp.core.DirectExchange;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import org.springframework.amqp.core.Exchange;@Componentpublic class RabbitMQUtil { @Autowired private AmqpAdmin amqpAdmin1; @Autowired private AmqpTemplate rabbitAmqpTemplate1; private static AmqpAdmin amqpAdmin; private static AmqpTemplate rabbitAmqpTemplate; private static RabbitMQUtil rabbitMQUtil; @PostConstruct public void init() { rabbitMQUtil = this; amqpAdmin = this.amqpAdmin1; rabbitAmqpTemplate = this.rabbitAmqpTemplate1; //这行代码可以写在业务代码里面,虽然初始化队列的绑定只需要一次,但如果mq宕机重启,或者在管理页面误删队列和交换机,就要重新初始化 /** * 如: if (!RabbitMQUtil.checkQueueExists("queueName")) { // RabbitMQUtil.createQueueAndBinding("exchange.ac.alarm.log", "alarmLogAc", "routing.key.ac.alarm.log"); } */ createQueueAndBinding("exchangeName", "queueName", "routingKey"); System.out.println("rmq初始化完毕"); } /** * 创建一个持久化的、非排他的、非自动删除的队列 * @param name * @return */ public static String createQueue(String name) { // 创建一个持久化的、非排他的、非自动删除的队列 Queue queue = new Queue(name,true,false,false,null); return createQueue(queue); } /** * 创建队列 * @param queue * @return */ public static String createQueue(Queue queue) { return amqpAdmin.declareQueue(queue); } /** * 将队列与交换器绑定 * @param exchangeName 交换器名称 * @param queueName 队列名称 * @param routingKey 路由键 */ public static void binding(String exchangeName,String queueName,String routingKey) { Binding binding = new Binding(queueName,DestinationType.QUEUE,exchangeName,routingKey,null); binding(binding); } /** * 将队列与交换器绑定 * @param binding */ public static void binding(Binding binding) { amqpAdmin.declareBinding(binding); } /** * * @param exchangeName 交换器名称 * @param queueName 队列名称 * @param routingKey 路由键 */ public static void createQueueAndBinding(String exchangeName,String queueName,String routingKey) { RabbitMQUtil.createDirectExchange(exchangeName); // 创建一个持久化的、非排他的、非自动删除的队列 Queue queue = new Queue(queueName,true,false,false,null); createQueue(queue); Binding binding = new Binding(queueName,DestinationType.QUEUE,exchangeName,routingKey,null); binding(binding); } /** * 检查队列是否存在 * @param queueName 队列名称 * @return 如果队列存在则放回true */ public static boolean checkQueueExists(String queueName) { Properties queueProperties = amqpAdmin.getQueueProperties(queueName); if(null == queueProperties) { return false; } return true; } /** * 创建direct类型的交换器 * @param exchangeName */ public static void createDirectExchange(String exchangeName) { Exchange exchange = new DirectExchange(exchangeName); amqpAdmin.declareExchange(exchange); } /** * 创建交换器 * @param exchange */ public static void createExchange(Exchange exchange) { amqpAdmin.declareExchange(exchange); } public static void sendMessage(String exchangeName,String routingkey,String message) { rabbitAmqpTemplate.convertAndSend(exchangeName,routingkey,message); } }
单一消费者代码:
在默认direct模式下, 多个消费者订阅一个queue,消息会轮流发送至每个消费者, 如"msg1"只被Consumer1接收, "msg2"只被Consumer2接收, 单一消费者模式下,消息确认模式为自动, 如果process方法抛异常则消息确认失败, 会先给其他消费者消费,否则再次由此消费者进行消费, 如果异常一直无法处理, 则会无限消费= =
package com.star.ac.rabbitMq;import com.alibaba.fastjson.JSON;import org.apache.juli.logging.Log;import org.apache.juli.logging.LogFactory;import org.springframework.amqp.core.ExchangeTypes;import org.springframework.amqp.rabbit.annotation.*;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Component;/** * 消息接收者 - consumer * * @RabbitListener - 可以注解类和方法。 * 注解类,当表当前类的对象是一个rabbit listener。 * 监听逻辑明确,可以由更好的方法定义规范。 * 必须配合@RabbitHandler才能实现rabbit消息消费能力。 * 注解方法,代表当前方法是一个rabbit listener处理逻辑。 * 方便开发,一个类中可以定义若干个listener逻辑。 * 方法定义规范可能不合理。如:一个方法的处理逻辑太多,造成方法的bad smell。 * * @RabbitListener - 代表当前类型是一个rabbitmq的监听器。 * bindings:绑定队列 * @QueueBinding - @RabbitListener.bindings属性的类型。绑定一个队列。 * value:绑定队列, Queue类型。 * exchange:配置交换器, Exchange类型。 * key:路由键,字符串类型。 * * @Queue - 队列。 * value:队列名称 * autoDelete:是否是一个临时队列。 * true :当所有的consumer关闭后,自动删除queue。 * false:当任意一个consumer启动并创建queue后,如果queue中有消息未消费,无论是否有consumer继续执行,都保存queue。 * * @Exchange - 交换器 * value:为交换器起个名称 * type:指定具体的交换器类型 */@Component@RabbitListener( bindings=@QueueBinding( value=@Queue(value="alarmLogAc",autoDelete="false"), exchange=@Exchange(value="exchange.ac.alarm.log",type=ExchangeTypes.DIRECT), key="routing.key.ac.alarm.log" ) )public class RabbitMqListener { private Log log = LogFactory.getLog(RabbitMqListener.class); /** * 消费消息的方法。采用消息队列监听机制 * @RabbitHandler - 代表当前方法是监听队列状态的方法,就是队列状态发生变化后,执行的消费消息的方法。 * 方法参数。就是处理的消息的数据载体类型。 */ @RabbitHandler public void process(String msg) { /* * if (2 == 4-2) { throw new RuntimeException("普通监听抛异常"); } */ log.info("普通监听【从RabbitMQ接收消息成功】消息内容为:" + msg); String jsonStr = JSON.toJSONString(msg); }}
消费容器模式和手动确认消息模式:
config类:
package com.star.ac.rabbitMq;import org.springframework.amqp.core.AcknowledgeMode;import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import com.star.ac.rabbitMq.consumer.AlarmConsumer;@Configurationpublic class RabbitMqConfig { @Value("${spring.rabbitmq.host}") private String host; @Value("${spring.rabbitmq.port}") private String port; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; @Value("${spring.rabbitmq.virtual-host}") private String virtualHost; @Bean("rabbitMQConnectionFactory") public ConnectionFactory rabbitMQConnectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses(host); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(virtualHost); connectionFactory.setPublisherConfirms(true); // 必须要设置 return connectionFactory; } @Bean("messageListenerContainer") public SimpleMessageListenerContainer messageContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(rabbitMQConnectionFactory()); container.setExposeListenerChannel(true); container.setMaxConcurrentConsumers(1); container.setConcurrentConsumers(1); container.addQueueNames("alarmLogAc"); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认 container.setMessageListener(new AlarmConsumer());// 设置消息处理器 return container; }}
消费者处理类:
package com.star.ac.rabbitMq.consumer;import org.apache.log4j.Logger;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;import org.springframework.stereotype.Component;import com.rabbitmq.client.Channel;@Componentpublic class AlarmConsumer implements ChannelAwareMessageListener{ private static final Logger logger = Logger.getLogger(AlarmConsumer.class); @Override public void onMessage(Message message, Channel channel) throws Exception { String msg = new String(message.getBody(),"UTF-8"); String queueName = message.getMessageProperties().getConsumerQueue(); logger.info("容器监听队列名:" + queueName); logger.info("队列名:"+queueName+",容器监听【从RabbitMQ接收消息成功】消息内容为:" + msg); try { long start1 = System.currentTimeMillis(); /* * if (1==1) { throw new RuntimeException("消息处理失败测试"); } */ channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); logger.info("推送消息耗时:" + (System.currentTimeMillis() - start1)); } catch (Exception e) { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true); logger.error("消息推送失败"); } } }
消息处理成功, 要调用channel.basicAck(...)方法,否则此条消息视为未确认, 但不会像单一消费者一样(此时这个消费者可以继续消费下一条消息),立马重试,而是被保存在队列中, 等待其他消费者注册, 或是此消费者应用重启时也会再次消费, 如果又失败, 同样下次继续重试.
如果异常,调用channel.basicNack(), 消息处理失败, 则和单一消费者中的异常一样, 先给其他消费者否则无限重试.
转载地址:https://blog.csdn.net/liao0801_123/article/details/90039877 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!
发表评论
最新留言
感谢大佬
[***.8.128.20]2024年03月27日 17时56分21秒
关于作者
喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
matlab 50hzquchu,新手求消除50HZ工频干扰陷波滤波器源程序
2019-04-21
php不需要也能输出,php 如何只输出最后生成的那个值??
2019-04-21
php正则过滤sql关键字,使用正则表达式屏蔽关键字的方法
2019-04-21
php取整v,php取整方式分享
2019-04-21
php写模糊搜索api接口,php通过sphinxapi接口实现全文搜索
2019-04-21
java图片加气泡文字_图片加气泡文字
2019-04-21
java总结i o流_14.java总结I/O流
2019-04-21
java和历转为西历_日期转西暦,和暦
2019-04-21
java 远程 yarn jar_再论Yarn Client和Yarn cluster
2019-04-21
java单元测试断言_单元测试+断言
2019-04-21
java 创建压缩包_用Java创建ZIP压缩文件
2019-04-21
java typedarray_TintTypedArray.java
2019-04-21
java字符字面量_java – 字符串字面量的行为是令人困惑的
2019-04-21
php判断数组的值是否为空,PHP判断数组是否为空的常用方法(五种方法)
2019-04-21
php 读数据库,PHP数据库
2019-04-21