RabbitMQ-Fanout案例
发布日期:2022-02-09 20:39:08
浏览次数:7
分类:技术文章
本文共 3219 字,大约阅读时间需要 10 分钟。
Consumer
package sc.app.stc.rmq.rabbitmq;import java.io.IOException;import java.util.List;import org.springframework.beans.BeanUtils;import com.alibaba.fastjson.JSONArray;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;import sc.app.stc.po.Risk;import sc.app.stc.rmq.po.ExecutionReport;import sc.app.stc.rmq.po.RiskResponseBody;import sc.app.stc.rmq.stream.RiskStream;public class RiskConsumer { public static void main(String[] args) { consumer(); } /** * topic消费者 * * @Description: consumer * @author lq. * @date 2020年5月18日 上午10:42:58 * @version V1.0 */ public static void consumer() { try { Channel channel = RiskRpcServer.getChannelInstance("riskConsumer"); if(channel==null) { System.out.println(" channel is null.."); return; } // 交换机声明 channel.exchangeDeclare(RiskRpcServer.queryRiskFanoutExchange, BuiltinExchangeType.FANOUT, RiskRpcServer.exchangeDurable, RiskRpcServer.exchangeAutoDelete, null); // 获取一个临时队列 String queueName = channel.queueDeclare().getQueue(); // 队列与交换机绑定(参数为:队列名称;交换机名称;routingKey忽略) channel.queueBind(queueName, RiskRpcServer.queryRiskFanoutExchange, ""); // 这里重写了DefaultConsumer的handleDelivery方法,因为发送的时候对消息进行了getByte(),在这里要重新组装成String Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String json = new String(body, "UTF-8"); System.out.println(String.format("RiskConsumer received:%s", json)); } }; // 声明队列中被消费掉的消息(参数为:队列名称;消息是否自动确认;consumer主体) channel.basicConsume(queueName, true, consumer); // 这里不能关闭连接,调用了消费方法后,消费者会一直连接着rabbitMQ等待消费 } catch (IOException e) { System.out.println("Fanout is error.."); } }}
Product
package sc.app.stc.rmq.rabbitmq;import java.io.IOException;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;public class RiskProduct { public static void main(String[] args) { String message = ""; sendMessage(message,null); } /** * fanout生产者 * * @Description: product * @author lq. * @date 2020年1月3日 下午3:39:46 * @version V1.0 */ public static Channel sendMessage(String message, Channel channel) { try { if (channel == null) { channel = RiskRpcServer.getChannelInstance("riskProduct"); } if(channel==null) { System.out.println(" channel is null.."); return null; } // 声明交换机(参数为: 交换机名称; 交换机类型,广播模式) // 交换机声明 channel.exchangeDeclare(RiskRpcServer.queryRiskFanoutExchange, BuiltinExchangeType.FANOUT, RiskRpcServer.exchangeDurable, RiskRpcServer.exchangeAutoDelete, null); // 消息发布(参数为:交换机名称; routingKey,忽略。在广播模式中,生产者声明交换机的名称和类型即可) channel.basicPublish(RiskRpcServer.queryRiskFanoutExchange, "", null, message.getBytes()); System.out.println("********Message********:发送成功"); } catch (IOException e) { System.out.println("FanoutProduct error"); } return channel; }}
转载地址:https://blog.csdn.net/java_lqjsw/article/details/107366780 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!
发表评论
最新留言
做的很好,不错不错
[***.243.131.199]2024年03月24日 12时31分22秒
关于作者
喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
【Leetcode刷题篇】leetcode204 计数质数
2019-04-26
【Leetcode刷题篇】leetcode70 爬楼梯
2019-04-26
【Leetcode刷题篇】leetcode739 每日温度
2019-04-26
【Leetcode刷题篇】leetcode121买卖股票的最佳时机
2019-04-26
【面试篇】Java多线程并发-Java关键字volatile详解
2019-04-26
【面试篇】Java的代理模式-静态代理和动态代理详解
2019-04-26
【面试篇】 Java对象拷贝(对象克隆 对象复制)
2019-04-26
【Leetcode刷题篇】leetcode64 最小路径和
2019-04-26
【Leetcode刷题篇】leetcode79 单词搜索
2019-04-26
【Leetcode刷题篇】leetcode300 最长上升子序列
2019-04-26
【Leetcode刷题篇】leetcode394 字符串解码
2019-04-26
【Leetcode刷题篇】leetcode152 乘积最大数组
2019-04-26
【Leetcode刷题篇】leetcode56 合并区间
2019-04-26
【Leetcode刷题篇】leetcode210 课程表II
2019-04-26
【Leetcode刷题篇】leetcode207 课程表
2019-04-26
【Leetcode刷题篇】leetcode322 零钱兑换
2019-04-26
【Leetcode刷题篇】leetcode437 路径总和III
2019-04-26
【Leetcode刷题篇】leetcode416 分割等和子集
2019-04-26
【Leetcode刷题篇】leetcode31 下一个排列
2019-04-26
【Leetcode刷题篇】leetcode621 任务调度器
2019-04-26