RabbitMQ-Fanout案例
发布日期:2022-02-09 20:39:08 浏览次数:7 分类:技术文章

本文共 3219 字,大约阅读时间需要 10 分钟。

Consumer

package sc.app.stc.rmq.rabbitmq;import java.io.IOException;import java.util.List;import org.springframework.beans.BeanUtils;import com.alibaba.fastjson.JSONArray;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;import sc.app.stc.po.Risk;import sc.app.stc.rmq.po.ExecutionReport;import sc.app.stc.rmq.po.RiskResponseBody;import sc.app.stc.rmq.stream.RiskStream;public class RiskConsumer {
public static void main(String[] args) {
consumer(); } /** * topic消费者 * * @Description: consumer * @author lq. * @date 2020年5月18日 上午10:42:58 * @version V1.0 */ public static void consumer() {
try {
Channel channel = RiskRpcServer.getChannelInstance("riskConsumer"); if(channel==null) {
System.out.println(" channel is null.."); return; } // 交换机声明 channel.exchangeDeclare(RiskRpcServer.queryRiskFanoutExchange, BuiltinExchangeType.FANOUT, RiskRpcServer.exchangeDurable, RiskRpcServer.exchangeAutoDelete, null); // 获取一个临时队列 String queueName = channel.queueDeclare().getQueue(); // 队列与交换机绑定(参数为:队列名称;交换机名称;routingKey忽略) channel.queueBind(queueName, RiskRpcServer.queryRiskFanoutExchange, ""); // 这里重写了DefaultConsumer的handleDelivery方法,因为发送的时候对消息进行了getByte(),在这里要重新组装成String Consumer consumer = new DefaultConsumer(channel) {
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body); String json = new String(body, "UTF-8"); System.out.println(String.format("RiskConsumer received:%s", json)); } }; // 声明队列中被消费掉的消息(参数为:队列名称;消息是否自动确认;consumer主体) channel.basicConsume(queueName, true, consumer); // 这里不能关闭连接,调用了消费方法后,消费者会一直连接着rabbitMQ等待消费 } catch (IOException e) {
System.out.println("Fanout is error.."); } }}

Product

package sc.app.stc.rmq.rabbitmq;import java.io.IOException;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;public class RiskProduct {
public static void main(String[] args) {
String message = ""; sendMessage(message,null); } /** * fanout生产者 * * @Description: product * @author lq. * @date 2020年1月3日 下午3:39:46 * @version V1.0 */ public static Channel sendMessage(String message, Channel channel) {
try {
if (channel == null) {
channel = RiskRpcServer.getChannelInstance("riskProduct"); } if(channel==null) {
System.out.println(" channel is null.."); return null; } // 声明交换机(参数为: 交换机名称; 交换机类型,广播模式) // 交换机声明 channel.exchangeDeclare(RiskRpcServer.queryRiskFanoutExchange, BuiltinExchangeType.FANOUT, RiskRpcServer.exchangeDurable, RiskRpcServer.exchangeAutoDelete, null); // 消息发布(参数为:交换机名称; routingKey,忽略。在广播模式中,生产者声明交换机的名称和类型即可) channel.basicPublish(RiskRpcServer.queryRiskFanoutExchange, "", null, message.getBytes()); System.out.println("********Message********:发送成功"); } catch (IOException e) {
System.out.println("FanoutProduct error"); } return channel; }}

转载地址:https://blog.csdn.net/java_lqjsw/article/details/107366780 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:CentOS7安装Anaconda和可视化界面
下一篇:RabbitMQ-RPC案例

发表评论

最新留言

做的很好,不错不错
[***.243.131.199]2024年03月24日 12时31分22秒