RabbitMQ生产者和消费者代码(单一消费者,消费容器) & 消息确认机制
发布日期:2021-07-20 20:54:05 浏览次数:31 分类:技术文章

本文共 9537 字,大约阅读时间需要 31 分钟。

生产者简单代码:

package com.star;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Component;/** * 消息发送者 - Producer。 * Producer对象,必须交由Spring容器管理,所以必须加上@Component注解 * 使用SpringBoot提供的AMQP启动器,来访问rabbitmq的时候,都是通过AmqpTemplate来实现的。 */@Componentpublic class MessageProducer {		// AMQP启动器会自动在spring容器启动的时候初始化一个AmqpTemplate对象,我们只需要注入进来就可以了	@Autowired	private AmqpTemplate rabbitAmqpTemplate;		// 将配置文件中的交换器名称注入进来	@Value("${mq.config.exchange}")	private String exchange;		// 将配置文件中的路由键名称注入进来	@Value("${mq.config.queue.info.routing.key}")	private String routingkey;		// 发送消息, 参数也可以是javaBean, 但bean类必须实现序列化接口	public void send(String msg) {		/**		 * convertAndSend - 转换并发送消息的template方法。		 * 是将传入的普通java对象,转换为rabbitmq中需要的message类型对象,并发送消息到rabbitmq中。		 * 参数一:交换器名称。 类型是String		 * 参数二:路由键。 类型是String		 * 参数三:消息,是要发送的消息内容对象。类型是Object		 */		this.rabbitAmqpTemplate.convertAndSend(this.exchange, this.routingkey,msg);	}	}

生产者代码工具类:

package com.star.rabbitMq;import java.util.Properties;import javax.annotation.PostConstruct;import org.springframework.amqp.core.AmqpAdmin;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.Binding.DestinationType;import org.springframework.amqp.core.DirectExchange;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import org.springframework.amqp.core.Exchange;@Componentpublic class RabbitMQUtil {		@Autowired	private AmqpAdmin amqpAdmin1;		@Autowired	private AmqpTemplate rabbitAmqpTemplate1;		private static AmqpAdmin amqpAdmin;		private static AmqpTemplate rabbitAmqpTemplate;	private static RabbitMQUtil rabbitMQUtil;		@PostConstruct	public void init() {		rabbitMQUtil = this;		amqpAdmin = this.amqpAdmin1;		rabbitAmqpTemplate = this.rabbitAmqpTemplate1;		//这行代码可以写在业务代码里面,虽然初始化队列的绑定只需要一次,但如果mq宕机重启,或者在管理页面误删队列和交换机,就要重新初始化		/**		 * 如: if (!RabbitMQUtil.checkQueueExists("queueName")) {			//			RabbitMQUtil.createQueueAndBinding("exchange.ac.alarm.log", "alarmLogAc", "routing.key.ac.alarm.log");		}		 */		createQueueAndBinding("exchangeName", "queueName", "routingKey");		System.out.println("rmq初始化完毕");	}			/**	 * 创建一个持久化的、非排他的、非自动删除的队列	 * @param name	 * @return	 */	public static String createQueue(String name) {		// 创建一个持久化的、非排他的、非自动删除的队列		Queue queue = new Queue(name,true,false,false,null);		return createQueue(queue);	}	/**	 * 创建队列	 * @param queue	 * @return	 */	public static String createQueue(Queue queue) {		return amqpAdmin.declareQueue(queue);	}			/**	 * 将队列与交换器绑定	 * @param exchangeName 交换器名称	 * @param queueName 队列名称	 * @param routingKey 路由键	 */	public static void binding(String exchangeName,String queueName,String routingKey) {		Binding binding = new Binding(queueName,DestinationType.QUEUE,exchangeName,routingKey,null);		binding(binding);	}		/**	 * 将队列与交换器绑定	 * @param binding	 */	public static void binding(Binding binding) {		amqpAdmin.declareBinding(binding);	}	/**	 * 	 * @param exchangeName 交换器名称	 * @param queueName 队列名称	 * @param routingKey 路由键	 */	public static void createQueueAndBinding(String exchangeName,String queueName,String routingKey) {		RabbitMQUtil.createDirectExchange(exchangeName);		// 创建一个持久化的、非排他的、非自动删除的队列		Queue queue = new Queue(queueName,true,false,false,null);		createQueue(queue);		Binding binding = new Binding(queueName,DestinationType.QUEUE,exchangeName,routingKey,null);		binding(binding);	}		/**	 * 检查队列是否存在	 * @param queueName 队列名称	 * @return 如果队列存在则放回true	 */	public static boolean checkQueueExists(String queueName) {		Properties queueProperties = amqpAdmin.getQueueProperties(queueName);		if(null == queueProperties) {			return false;		}		return true;	}		/**	 * 创建direct类型的交换器	 * @param exchangeName	 */	public static void createDirectExchange(String exchangeName) {		Exchange exchange = new DirectExchange(exchangeName);		amqpAdmin.declareExchange(exchange);	}		/**	 * 创建交换器	 * @param exchange	 */	public static void createExchange(Exchange exchange) {		amqpAdmin.declareExchange(exchange);	}			public static void sendMessage(String exchangeName,String routingkey,String message) {		rabbitAmqpTemplate.convertAndSend(exchangeName,routingkey,message);	}	}

单一消费者代码:

在默认direct模式下, 多个消费者订阅一个queue,消息会轮流发送至每个消费者, 如"msg1"只被Consumer1接收, "msg2"只被Consumer2接收, 单一消费者模式下,消息确认模式为自动, 如果process方法抛异常则消息确认失败, 会先给其他消费者消费,否则再次由此消费者进行消费, 如果异常一直无法处理, 则会无限消费= =  

package com.star.ac.rabbitMq;import com.alibaba.fastjson.JSON;import org.apache.juli.logging.Log;import org.apache.juli.logging.LogFactory;import org.springframework.amqp.core.ExchangeTypes;import org.springframework.amqp.rabbit.annotation.*;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Component;/** * 消息接收者 - consumer *  * @RabbitListener - 可以注解类和方法。 *  注解类,当表当前类的对象是一个rabbit listener。 *      监听逻辑明确,可以由更好的方法定义规范。 *      必须配合@RabbitHandler才能实现rabbit消息消费能力。 *  注解方法,代表当前方法是一个rabbit listener处理逻辑。 *      方便开发,一个类中可以定义若干个listener逻辑。 *      方法定义规范可能不合理。如:一个方法的处理逻辑太多,造成方法的bad smell。 *  * @RabbitListener -  代表当前类型是一个rabbitmq的监听器。 *      bindings:绑定队列 * @QueueBinding  - @RabbitListener.bindings属性的类型。绑定一个队列。 *      value:绑定队列, Queue类型。 *      exchange:配置交换器, Exchange类型。 *      key:路由键,字符串类型。 *  * @Queue - 队列。 *      value:队列名称 *      autoDelete:是否是一个临时队列。 *          true :当所有的consumer关闭后,自动删除queue。 *          false:当任意一个consumer启动并创建queue后,如果queue中有消息未消费,无论是否有consumer继续执行,都保存queue。 *  * @Exchange - 交换器 *      value:为交换器起个名称 *      type:指定具体的交换器类型 */@Component@RabbitListener(			bindings=@QueueBinding(					value=@Queue(value="alarmLogAc",autoDelete="false"),					exchange=@Exchange(value="exchange.ac.alarm.log",type=ExchangeTypes.DIRECT),					key="routing.key.ac.alarm.log"			)		)public class RabbitMqListener {	private Log log = LogFactory.getLog(RabbitMqListener.class);		/**	 * 消费消息的方法。采用消息队列监听机制	 * @RabbitHandler - 代表当前方法是监听队列状态的方法,就是队列状态发生变化后,执行的消费消息的方法。	 * 方法参数。就是处理的消息的数据载体类型。	 */	@RabbitHandler	public void process(String msg) {		/*		 * if (2 == 4-2) { throw new RuntimeException("普通监听抛异常"); }		 */		log.info("普通监听【从RabbitMQ接收消息成功】消息内容为:" + msg);		String jsonStr = JSON.toJSONString(msg);	}}

消费容器模式和手动确认消息模式:

config类:

package com.star.ac.rabbitMq;import org.springframework.amqp.core.AcknowledgeMode;import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import com.star.ac.rabbitMq.consumer.AlarmConsumer;@Configurationpublic class RabbitMqConfig {	@Value("${spring.rabbitmq.host}")	private String host;	@Value("${spring.rabbitmq.port}")	private String port;	@Value("${spring.rabbitmq.username}")	private String username;	@Value("${spring.rabbitmq.password}")	private String password;	@Value("${spring.rabbitmq.virtual-host}")	private String virtualHost;	@Bean("rabbitMQConnectionFactory")	public ConnectionFactory rabbitMQConnectionFactory() {		CachingConnectionFactory connectionFactory = new CachingConnectionFactory();		connectionFactory.setAddresses(host);		connectionFactory.setUsername(username);		connectionFactory.setPassword(password);		connectionFactory.setVirtualHost(virtualHost);		connectionFactory.setPublisherConfirms(true); // 必须要设置		return connectionFactory;	}	@Bean("messageListenerContainer")	public SimpleMessageListenerContainer messageContainer() {		SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(rabbitMQConnectionFactory());		container.setExposeListenerChannel(true);		container.setMaxConcurrentConsumers(1);        container.setConcurrentConsumers(1);        container.addQueueNames("alarmLogAc");        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认        container.setMessageListener(new AlarmConsumer());// 设置消息处理器		return container;	}}

消费者处理类:

package com.star.ac.rabbitMq.consumer;import org.apache.log4j.Logger;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;import org.springframework.stereotype.Component;import com.rabbitmq.client.Channel;@Componentpublic class AlarmConsumer implements ChannelAwareMessageListener{			private static final Logger logger = Logger.getLogger(AlarmConsumer.class);	@Override	public void onMessage(Message message, Channel channel) throws Exception {		String msg = new String(message.getBody(),"UTF-8");		String queueName = message.getMessageProperties().getConsumerQueue();		logger.info("容器监听队列名:" + queueName);		logger.info("队列名:"+queueName+",容器监听【从RabbitMQ接收消息成功】消息内容为:" + msg);		try {			long start1 = System.currentTimeMillis();			/*			 * if (1==1) { throw new RuntimeException("消息处理失败测试"); }			 */			channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);			logger.info("推送消息耗时:" + (System.currentTimeMillis() - start1));		} catch (Exception e) {			channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);			logger.error("消息推送失败");		}	}	}

消息处理成功, 要调用channel.basicAck(...)方法,否则此条消息视为未确认, 但不会像单一消费者一样(此时这个消费者可以继续消费下一条消息),立马重试,而是被保存在队列中, 等待其他消费者注册, 或是此消费者应用重启时也会再次消费, 如果又失败, 同样下次继续重试.

如果异常,调用channel.basicNack(), 消息处理失败, 则和单一消费者中的异常一样, 先给其他消费者否则无限重试.

转载地址:https://blog.csdn.net/liao0801_123/article/details/90039877 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:线程探讨
下一篇:springBoot打包- 部署- 发布到私服的问题

发表评论

最新留言

感谢大佬
[***.8.128.20]2024年03月27日 17时56分21秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章

matlab 50hzquchu,新手求消除50HZ工频干扰陷波滤波器源程序 2019-04-21
laravel没有route.php,Laravel中的RouteCollection.php中的NotFoundHttpException 2019-04-21
php服务端开启socket,php socket服务端能不能在网页端开启?而不是只能用CLI模式开启... 2019-04-21
php不需要也能输出,php 如何只输出最后生成的那个值?? 2019-04-21
php正则过滤sql关键字,使用正则表达式屏蔽关键字的方法 2019-04-21
php取整v,php取整方式分享 2019-04-21
php写模糊搜索api接口,php通过sphinxapi接口实现全文搜索 2019-04-21
oracle安装出现2932,【案例】Oracle报错ORA-19815 fast_recovery_area无剩余空间解决办法... 2019-04-21
rac数据库下oracle打小补丁,Oracle 11g RAC 环境打PSU补丁的详细步骤 2019-04-21
form表单属性名相同java_form表单提交时候有多个相同name 的input如何处理? 2019-04-21
java图片加气泡文字_图片加气泡文字 2019-04-21
java总结i o流_14.java总结I/O流 2019-04-21
java和历转为西历_日期转西暦,和暦 2019-04-21
java 远程 yarn jar_再论Yarn Client和Yarn cluster 2019-04-21
java单元测试断言_单元测试+断言 2019-04-21
java 创建压缩包_用Java创建ZIP压缩文件 2019-04-21
java typedarray_TintTypedArray.java 2019-04-21
java字符字面量_java – 字符串字面量的行为是令人困惑的 2019-04-21
php判断数组的值是否为空,PHP判断数组是否为空的常用方法(五种方法) 2019-04-21
php 读数据库,PHP数据库 2019-04-21