RabbitMQ-RPC案例
发布日期:2022-02-09 20:39:08 浏览次数:6 分类:技术文章

本文共 6641 字,大约阅读时间需要 22 分钟。

RpcClient

package sc.app.stc.rmq.rabbitmq;import java.io.IOException;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.AMQP.BasicProperties;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;public class RiskRpcClient {
public static void main(String[] argv) throws Exception {
RiskRpcClient.client(null); } /** * rpc客户端 * * @Description: client * @author lq. * @date 2020年1月3日 下午3:40:04 * @version V1.0 */ public static Channel client(Channel channel) {
try {
// 此方法封装了如何连接RabbitMQ和创建connection,channel.源码见附录 if (channel == null) {
channel = RiskRpcServer.getChannelInstance("riskClient"); } if (channel == null) {
System.out.println(" channel is null.."); return null; } /** * 创建Exchange Exchange类型 direct , fanout, topic, topic */ channel.exchangeDeclare(RiskRpcServer.queryRiskExchange, BuiltinExchangeType.DIRECT, RiskRpcServer.exchangeDurable, RiskRpcServer.queueAutoDelete, null); // 此处注意:声明了要回复的队列。队列名称由RabbitMQ自动创建。 // 这样做的好处是:每个客户端有属于自己的唯一回复队列,生命周期同客户端 String replyQueue = channel.queueDeclare().getQueue(); System.out.println(String.format("replyQueue:%s", replyQueue)); // 绑定回复队列 channel.queueBind(replyQueue, RiskRpcServer.queryRiskExchange, replyQueue); AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); // 指定回复队列和回复correlateId builder.replyTo(replyQueue); AMQP.BasicProperties properties = builder.build(); channel.basicPublish(RiskRpcServer.queryRiskExchange, RiskRpcServer.queryRiskRoutingKey, properties, ("{}").getBytes()); DefaultConsumer consumer = new DefaultConsumer(channel) {
// 这是一个回调函数,客户端获取消息,就调用此方法,处理消息 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
String json = new String(body, "utf-8"); System.out.println(json); } }; channel.basicConsume(replyQueue, true, consumer); } catch (Exception e) {
// TODO: handle exception } return channel; }}

RpcServer

package sc.app.stc.rmq.rabbitmq;import java.io.IOException;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.AMQP.BasicProperties;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;import com.rabbitmq.client.Recoverable;import com.rabbitmq.client.RecoveryListener;import com.rabbitmq.client.ShutdownListener;import com.rabbitmq.client.ShutdownSignalException;public class RiskRpcServer {
public static String queryRiskExchange = "indicators.direct"; public static String queryRiskFanoutExchange = "indicator.fanout"; public static String queryRiskRoutingKey = "indicator.info"; // exchange/queue的属性 public static boolean exchangeDurable = false;// 持久化 // 当所有绑定队列都不在使用时,是否自动删除交换器 true:删除false:不删除 public static boolean exchangeAutoDelete = false; public static boolean queueDurable = true;// 持久化 // 当所有消费客户端连接断开后,是否自动删除队列 true:删除false:不删除 public static boolean queueAutoDelete = false; static String json = ""; public static void main(String[] argv) throws Exception {
RiskRpcServer.server(); } /** * rpc服务端 * * @Description: server * @throws IOException * @author lq. * @date 2020年1月3日 下午3:40:18 * @version V1.0 */ public static void server() {
try {
final Channel channel = RiskRpcServer.getChannelInstance("riskServer"); if (channel == null) {
System.out.println(" channel is null.."); return; } /** * 创建Exchange Exchange类型 direct , fanout, topic, topic */ channel.exchangeDeclare(queryRiskExchange, BuiltinExchangeType.DIRECT, exchangeDurable, queueAutoDelete, null); // 获取一个临时队列 String queueName = channel.queueDeclare().getQueue(); // 队列与交换机绑定(参数为:队列名称;交换机名称;routingKey忽略) channel.queueBind(queueName, queryRiskExchange, queryRiskRoutingKey); DefaultConsumer consumer = new DefaultConsumer(channel) {
// 这是一个回到函数,服务器端获取到消息,就会调用此方法处理消息 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); // 我们在将要回复的消息属性中,放入从客户端传递过来的correlateId builder.correlationId(properties.getCorrelationId()); AMQP.BasicProperties prop = builder.build(); // 发送给回复队列的消息,exchange="",routingKey=回复队列名称 // 因为RabbitMQ对于队列,始终存在一个默认exchange="",routingKey=队列名称的绑定关系 channel.basicPublish(queryRiskExchange, properties.getReplyTo(), prop, (new String(json)).getBytes()); } }; // 回调 channel.basicConsume(queueName, true, consumer); } catch (IOException e) {
System.out.println("RPCServer error"); } } /** * AMQP的连接其实是对Socket做的封装, 注意以下AMQP协议的版本号,不同版本的协议用法可能不同。 * * @param ConnectionDescription * @return * @Description: getChannelInstance * @author lq. * @date 2020年1月2日 下午4:27:15 * @version V1.0 */ public static Channel getChannelInstance(String ConnectionDescription) {
Channel channel = null; try {
System.out.println(String.format("create Channel :%s", ConnectionDescription)); ConnectionFactory connectionFactory = getConnectionFactory(); Connection connection = connectionFactory.newConnection(ConnectionDescription); channel = connection.createChannel(); ((Recoverable) connection).addRecoveryListener(new RecoveryListener() {
@Override public void handleRecovery(Recoverable recoverable) {
// 重连成功后执行 } @Override public void handleRecoveryStarted(Recoverable recoverable) {
// 重连 } }); connection.addShutdownListener(new ShutdownListener() {
// 断开连接 @Override public void shutdownCompleted(ShutdownSignalException cause) {
} }); } catch (Exception e) {
// 连接失败 System.out.println("获取Channel连接失败"); } return channel; } public static ConnectionFactory getConnectionFactory() {
ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("vhost"); connectionFactory.setHost("host"); connectionFactory.setPort("port"); connectionFactory.setUsername("username"); connectionFactory.setPassword("passwprd"); connectionFactory.setConnectionTimeout(5000); // 错误恢复机制 connectionFactory.setAutomaticRecoveryEnabled(true); // 建议5-20,客户端值应小于服务端,单位 秒 默认60 // connectionFactory.setRequestedHeartbeat(10); connectionFactory.setTopologyRecoveryEnabled(true); return connectionFactory; }}

转载地址:https://blog.csdn.net/java_lqjsw/article/details/107366647 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:RabbitMQ-Fanout案例
下一篇:分布式集群系统下的高可用session解决方案

发表评论

最新留言

网站不错 人气很旺了 加油
[***.192.178.218]2024年04月18日 05时46分53秒