RabbitMQ-RPC案例
发布日期:2022-02-09 20:39:08
浏览次数:6
分类:技术文章
本文共 6641 字,大约阅读时间需要 22 分钟。
RpcClient
package sc.app.stc.rmq.rabbitmq;import java.io.IOException;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.AMQP.BasicProperties;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;public class RiskRpcClient { public static void main(String[] argv) throws Exception { RiskRpcClient.client(null); } /** * rpc客户端 * * @Description: client * @author lq. * @date 2020年1月3日 下午3:40:04 * @version V1.0 */ public static Channel client(Channel channel) { try { // 此方法封装了如何连接RabbitMQ和创建connection,channel.源码见附录 if (channel == null) { channel = RiskRpcServer.getChannelInstance("riskClient"); } if (channel == null) { System.out.println(" channel is null.."); return null; } /** * 创建Exchange Exchange类型 direct , fanout, topic, topic */ channel.exchangeDeclare(RiskRpcServer.queryRiskExchange, BuiltinExchangeType.DIRECT, RiskRpcServer.exchangeDurable, RiskRpcServer.queueAutoDelete, null); // 此处注意:声明了要回复的队列。队列名称由RabbitMQ自动创建。 // 这样做的好处是:每个客户端有属于自己的唯一回复队列,生命周期同客户端 String replyQueue = channel.queueDeclare().getQueue(); System.out.println(String.format("replyQueue:%s", replyQueue)); // 绑定回复队列 channel.queueBind(replyQueue, RiskRpcServer.queryRiskExchange, replyQueue); AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); // 指定回复队列和回复correlateId builder.replyTo(replyQueue); AMQP.BasicProperties properties = builder.build(); channel.basicPublish(RiskRpcServer.queryRiskExchange, RiskRpcServer.queryRiskRoutingKey, properties, ("{}").getBytes()); DefaultConsumer consumer = new DefaultConsumer(channel) { // 这是一个回调函数,客户端获取消息,就调用此方法,处理消息 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String json = new String(body, "utf-8"); System.out.println(json); } }; channel.basicConsume(replyQueue, true, consumer); } catch (Exception e) { // TODO: handle exception } return channel; }}
RpcServer
package sc.app.stc.rmq.rabbitmq;import java.io.IOException;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.AMQP.BasicProperties;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;import com.rabbitmq.client.Recoverable;import com.rabbitmq.client.RecoveryListener;import com.rabbitmq.client.ShutdownListener;import com.rabbitmq.client.ShutdownSignalException;public class RiskRpcServer { public static String queryRiskExchange = "indicators.direct"; public static String queryRiskFanoutExchange = "indicator.fanout"; public static String queryRiskRoutingKey = "indicator.info"; // exchange/queue的属性 public static boolean exchangeDurable = false;// 持久化 // 当所有绑定队列都不在使用时,是否自动删除交换器 true:删除false:不删除 public static boolean exchangeAutoDelete = false; public static boolean queueDurable = true;// 持久化 // 当所有消费客户端连接断开后,是否自动删除队列 true:删除false:不删除 public static boolean queueAutoDelete = false; static String json = ""; public static void main(String[] argv) throws Exception { RiskRpcServer.server(); } /** * rpc服务端 * * @Description: server * @throws IOException * @author lq. * @date 2020年1月3日 下午3:40:18 * @version V1.0 */ public static void server() { try { final Channel channel = RiskRpcServer.getChannelInstance("riskServer"); if (channel == null) { System.out.println(" channel is null.."); return; } /** * 创建Exchange Exchange类型 direct , fanout, topic, topic */ channel.exchangeDeclare(queryRiskExchange, BuiltinExchangeType.DIRECT, exchangeDurable, queueAutoDelete, null); // 获取一个临时队列 String queueName = channel.queueDeclare().getQueue(); // 队列与交换机绑定(参数为:队列名称;交换机名称;routingKey忽略) channel.queueBind(queueName, queryRiskExchange, queryRiskRoutingKey); DefaultConsumer consumer = new DefaultConsumer(channel) { // 这是一个回到函数,服务器端获取到消息,就会调用此方法处理消息 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); // 我们在将要回复的消息属性中,放入从客户端传递过来的correlateId builder.correlationId(properties.getCorrelationId()); AMQP.BasicProperties prop = builder.build(); // 发送给回复队列的消息,exchange="",routingKey=回复队列名称 // 因为RabbitMQ对于队列,始终存在一个默认exchange="",routingKey=队列名称的绑定关系 channel.basicPublish(queryRiskExchange, properties.getReplyTo(), prop, (new String(json)).getBytes()); } }; // 回调 channel.basicConsume(queueName, true, consumer); } catch (IOException e) { System.out.println("RPCServer error"); } } /** * AMQP的连接其实是对Socket做的封装, 注意以下AMQP协议的版本号,不同版本的协议用法可能不同。 * * @param ConnectionDescription * @return * @Description: getChannelInstance * @author lq. * @date 2020年1月2日 下午4:27:15 * @version V1.0 */ public static Channel getChannelInstance(String ConnectionDescription) { Channel channel = null; try { System.out.println(String.format("create Channel :%s", ConnectionDescription)); ConnectionFactory connectionFactory = getConnectionFactory(); Connection connection = connectionFactory.newConnection(ConnectionDescription); channel = connection.createChannel(); ((Recoverable) connection).addRecoveryListener(new RecoveryListener() { @Override public void handleRecovery(Recoverable recoverable) { // 重连成功后执行 } @Override public void handleRecoveryStarted(Recoverable recoverable) { // 重连 } }); connection.addShutdownListener(new ShutdownListener() { // 断开连接 @Override public void shutdownCompleted(ShutdownSignalException cause) { } }); } catch (Exception e) { // 连接失败 System.out.println("获取Channel连接失败"); } return channel; } public static ConnectionFactory getConnectionFactory() { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("vhost"); connectionFactory.setHost("host"); connectionFactory.setPort("port"); connectionFactory.setUsername("username"); connectionFactory.setPassword("passwprd"); connectionFactory.setConnectionTimeout(5000); // 错误恢复机制 connectionFactory.setAutomaticRecoveryEnabled(true); // 建议5-20,客户端值应小于服务端,单位 秒 默认60 // connectionFactory.setRequestedHeartbeat(10); connectionFactory.setTopologyRecoveryEnabled(true); return connectionFactory; }}
转载地址:https://blog.csdn.net/java_lqjsw/article/details/107366647 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!
发表评论
最新留言
网站不错 人气很旺了 加油
[***.192.178.218]2024年04月18日 05时46分53秒
关于作者
喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
Python 爬虫面试题 170 道:2019 版
2021-06-29
歪门邪道
2021-06-29
【家务】盘点小孩玩具零件缺失情况
2021-06-29
开发中文 API 的一些策略
2021-06-29
从日本编程书籍《我的第一本编程书》中译版看中文例程如何扬长避短——标识符(一)
2021-06-29
中文命名标识符如何区分类型和变量
2021-06-29
编程术语成系统中文化的意义
2021-06-29
草蟒 Python 中文 API 与 IDE 支持尝鲜
2021-06-29
一种改进中文 API 可读性的方法:参数不限于在末尾
2021-06-29
中文代码之Django官方入门:建立模型
2021-06-29
Python实现推流直播
2021-06-29
你不得不了解的卷积神经网络发展史
2021-06-29
你不得不了解的机器学习知识
2021-06-29
你不得不了解的深度学习知识(一)
2021-06-29
你不得不了解的深度学习知识(二)
2021-06-29
AI算法之Encoder-Decoder 和 Seq2Seq
2021-06-29
AI算法之Attention机制
2021-06-29
人体口罩佩戴检测实战
2021-06-29
[实战]200类鸟类细粒度图像分类
2021-06-29
【实战】英文垃圾短信分类
2021-06-29