RabbitMQ入门
发布日期:2021-05-09 09:33:55 浏览次数:20 分类:博客文章

本文共 14403 字,大约阅读时间需要 48 分钟。

 

安装与环境配置

本文为个人记录RabbitMq的学习笔记。

由于RabbitMQ是基于erlang的,所以,在正式安装RabbitMQ之前,需要先安装一下erlang。

下载安装RabbitMq

1、2两个步骤的安装包,我是在本机Windows10安装的。如果你看到这里可以百度找一下安装博客,其实就是傻瓜式下一步 

链接:https://pan.baidu.com/s/1mUyOdBKcvoW3Lv8f4y8NAQ

提取码:41fm

配置erlang和RabbitMq的环境变量

 

 

在cmd命令界面安装RabbitMq网页版控制台 rabbitmq-plugins enable rabbitmq_management

浏览器 http://localhost:15672/ 进入登录页面,账号密码都是 guest

SpringBoot整合使用

注意:当前我把生产者和消费者放到了两个项目中,是因为刚好搭建了springcloud项目,一步到位。你也可以当到同一个项目中。

需要先引入依赖,如果是多个项目则都需要引入

org.springframework.boot
spring-boot-starter-amqp
View Code

不使用交换机

package com.dang.springcloud.mq;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.amqp.core.Queue;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;@RestControllerpublic class Send1 {    //创建一个队列    @Bean(value = "mq1")    public Queue mq1() {        return new Queue("mq1");    }    @Autowired    private AmqpTemplate amqpTemplate;    @GetMapping("g1")    public String send() {        //发送消息,将消息放入mq1的队列中        amqpTemplate.convertAndSend("mq1", "我是一个字符串");        System.out.println("1已发送消息");        return "发送成功!";    }}
View Code
package com.datang.springcloud.mq;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Componentpublic class Receive1 {    //    监听 mq1消息队列    @RabbitListener(queues = "mq1")    public void receive(String msg) {        System.out.println("1我接受到的消息是:" + msg);    }}
View Code

这种写法,发送者直接将消息投递到队列中,消费者从队列获取。思考一个问题,如果我们需要将一条消息投递到多个队列是不是这么写?

package com.dang.springcloud.mq;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.amqp.core.Queue;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;@RestControllerpublic class Send2 {    //创建一个队列    @Bean(value = "mq2")    public Queue mq2() {        return new Queue("mq2");    }    //创建一个队列    @Bean(value = "mq3")    public Queue mq3() {        return new Queue("mq3");    }    @Autowired    private AmqpTemplate amqpTemplate;    @GetMapping("g2")    public String send() {        //发送消息,将消息放入mq1的队列中        amqpTemplate.convertAndSend("mq2", "我是一个字符串");        amqpTemplate.convertAndSend("mq3", "我是一个字符串");        System.out.println("23已发送消息");        return "发送成功!";    }}
View Code 
package com.datang.springcloud.mq;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Componentpublic class Receive2 {    //    监听 mq2消息队列    @RabbitListener(queues = "mq2")    public void receive(String msg) {        System.out.println("2我接受到的消息是:" + msg);    }    //    监听 mq3消息队列    @RabbitListener(queues = "mq3")    public void receive2(String msg) {        System.out.println("3我接受到的消息是:" + msg);    }}
View Code

RabbitMQ在消息发送者和队列之间在抽象一个概念,交换机。消息发送者,不关心消息到底发送给谁,而是将消息投递给交换机,并且指定路由地址,交换机通过路由标记决定消息投递到哪个队列中。

使用DirectExchange交换机

package com.dang.springcloud.mq;import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;@Configuration@RestControllerpublic class Send2 {    //创建一个队列    @Bean(value = "mq2")    public Queue mq2() {        return new Queue("mq2");    }    //创建一个队列    @Bean(value = "mq3")    public Queue mq3() {        return new Queue("mq3");    }    //Direct交换机    @Bean(value = "ex1")    DirectExchange ex1() {        return new DirectExchange("ex1");    }    //绑定 将队列和交换机绑定, 并设置用于匹配键    @Bean(value = "bi1")    Binding binding1() {        return BindingBuilder.bind(mq2()).to(ex1()).with("k1");    }    //绑定 将队列和交换机绑定, 并设置用于匹配键    @Bean(value = "bi2")    Binding binding2() {        return BindingBuilder.bind(mq3()).to(ex1()).with("k1");    }    @Autowired    private AmqpTemplate amqpTemplate;    @GetMapping("g2")    public String send() {        amqpTemplate.convertAndSend("ex1", "k1", "我是一条消息");        System.out.println("消息已投递给交换机ex1");        return "发送成功!";    }}
View Code
package com.datang.springcloud.mq;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Componentpublic class Receive2 {    //    监听 mq2消息队列    @RabbitListener(queues = "mq2")    public void receive(String msg) {        System.out.println("2我接受到的消息是:" + msg);    }    //    监听 mq3消息队列    @RabbitListener(queues = "mq3")    public void receive2(String msg) {        System.out.println("3我接受到的消息是:" + msg);    }}
View Code

这种写法,我们还是需要创建多个队列,然后创建交换机,将队列和交换机绑定,此时我们需要填入一个路由地址。消息发送者直接将消息投递给交换机,并且指定路由。由交换机根据路由查询绑定的队列。上边代码片段,只有唯一的一个交换机,但是从这个交换机中查到了两个不同的路由,将消息投递给绑定的队列中。消费者则不改变写法。

交换机和路由不会因为我们的代码删除而删除!

 

 

 

使用TopicExchange交换机

一下代码片段和上边的十分类似,只是使用了TopicExchange交换机。第二个绑定器使用的是 * 通配符。消息是可以被投递到mq5的。

package com.dang.springcloud.mq;import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;@Configuration@RestControllerpublic class Send3 {    //创建一个队列    @Bean(value = "mq4")    public Queue mq4() {        return new Queue("mq4");    }    //创建一个队列    @Bean(value = "mq5")    public Queue mq5() {        return new Queue("mq5");    }    //Topic交换机    @Bean(value = "ex2")    TopicExchange ex2() {        return new TopicExchange("ex2");    }    //绑定 将队列和交换机绑定, 并设置用于匹配键    @Bean(value = "bi3")    Binding binding1() {        return BindingBuilder.bind(mq4()).to(ex2()).with("student.age.12");    }    //绑定 将队列和交换机绑定, 并设置用于匹配键    @Bean(value = "bi4")    Binding binding2() {        return BindingBuilder.bind(mq5()).to(ex2()).with("student.age.*");    }    @Autowired    private AmqpTemplate amqpTemplate;    @GetMapping("g3")    public String send() {        amqpTemplate.convertAndSend("ex2", "student.age.12", "我是一条消息");        System.out.println("消息已投递给交换机ex2");        return "发送成功!";    }}
View Code
package com.datang.springcloud.mq;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Componentpublic class Receive3 {    //    监听 mq4消息队列    @RabbitListener(queues = "mq4")    public void receive(String msg) {        System.out.println("4我接受到的消息是:" + msg);    }    //    监听 mq5消息队列    @RabbitListener(queues = "mq5")    public void receive2(String msg) {        System.out.println("5我接受到的消息是:" + msg);    }}
View Code

 

接下来我们删除 student.age.* 的路由,换成 student.* 这样消息就不能转发到mq5了。可见 * 只能匹配一个词。

删除掉所有绑定到mq5队列的路由,重新绑定。下面代码片段使用的 # 通配符,# 匹配多个词。

package com.dang.springcloud.mq;import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;@Configuration@RestControllerpublic class Send3 {    //创建一个队列    @Bean(value = "mq4")    public Queue mq4() {        return new Queue("mq4");    }    //创建一个队列    @Bean(value = "mq5")    public Queue mq5() {        return new Queue("mq5");    }    //Topic交换机    @Bean(value = "ex2")    TopicExchange ex2() {        return new TopicExchange("ex2");    }    //绑定 将队列和交换机绑定, 并设置用于匹配键    @Bean(value = "bi3")    Binding binding1() {        return BindingBuilder.bind(mq4()).to(ex2()).with("student.age.12");    }    //绑定 将队列和交换机绑定, 并设置用于匹配键    @Bean(value = "bi4")    Binding binding2() {        return BindingBuilder.bind(mq5()).to(ex2()).with("student.#");    }    @Autowired    private AmqpTemplate amqpTemplate;    @GetMapping("g3")    public String send() {        amqpTemplate.convertAndSend("ex2", "student.age.12", "我是一条消息");        System.out.println("消息已投递给交换机ex2");        return "发送成功!";    }}
View Code

使用FanoutExchange交换机

FanoutExchange没有路由的概念,到像是点对点的直接发送。

package com.dang.springcloud.mq;import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;@Configuration@RestControllerpublic class Send4 {    //创建一个队列    @Bean(value = "mq6")    public Queue mq6() {        return new Queue("mq6");    }    //创建一个队列    @Bean(value = "mq7")    public Queue mq7() {        return new Queue("mq7");    }    //Fanout交换机    @Bean(value = "ex3")    FanoutExchange ex3() {        return new FanoutExchange("ex3");    }    //绑定 将队列和交换机绑定    @Bean(value = "bi5")    Binding binding1() {        return BindingBuilder.bind(mq6()).to(ex3());    }    //绑定 将队列和交换机绑定    @Bean(value = "bi6")    Binding binding2() {        return BindingBuilder.bind(mq7()).to(ex3());    }    @Autowired    private AmqpTemplate amqpTemplate;    @GetMapping("g4")    public String send() {        //此处第二个路由参数必须给 null,否则不能成功将消息投递到队列        amqpTemplate.convertAndSend("ex3", null,"我是一条消息");        System.out.println("消息已投递给交换机ex3");        return "发送成功!";    }}
View Code
package com.datang.springcloud.mq;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Componentpublic class Receive4 {    //    监听 mq6消息队列    @RabbitListener(queues = "mq6")    public void receive(String msg) {        System.out.println("6我接受到的消息是:" + msg);    }    //    监听 mq7消息队列    @RabbitListener(queues = "mq7")    public void receive2(String msg) {        System.out.println("7我接受到的消息是:" + msg);    }}
View Code

生产者回调

在Spring配置文件中配置如下

spring.rabbitmq.publisher-confirms=truespring.rabbitmq.publisher-returns=true
View Code

注意看回调函数的写法

package com.dang.springcloud.mq;import org.springframework.amqp.core.*;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.rabbit.support.CorrelationData;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;@RestController@Configurationpublic class Send5 {    @Bean    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {        RabbitTemplate rabbitTemplate = new RabbitTemplate();        rabbitTemplate.setConnectionFactory(connectionFactory);        //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数        rabbitTemplate.setMandatory(true);        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {            @Override            public void confirm(CorrelationData correlationData, boolean b, String s) {                System.out.println("confirm-----"+correlationData);                System.out.println("confirm-----"+b);                System.out.println("confirm-----"+s);            }        });        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {            @Override            public void returnedMessage(Message message, int i, String s, String s1, String s2) {                System.out.println("returnedMessage-----"+message);                System.out.println("returnedMessage-----"+s);                System.out.println("returnedMessage-----"+s1);                System.out.println("returnedMessage-----"+s2);            }        });        return rabbitTemplate;    }    //创建一个队列    @Bean(value = "mq8")    public Queue mq8() {        return new Queue("mq8");    }    //Topic交换机    @Bean(value = "ex4")    TopicExchange ex4() {        return new TopicExchange("ex4");    }    //绑定 将队列和交换机绑定, 并设置用于匹配键    @Bean(value = "bi7")    Binding binding1() {        return BindingBuilder.bind(mq8()).to(ex4()).with("HaHaHa");    }    @Autowired    private AmqpTemplate rabbitTemplate;    @GetMapping(value = "g5")    public String g5() {        rabbitTemplate.convertAndSend("ex4", "HaHaHa", "我是一条消息");        System.out.println("消息已投递给交换机ex4");        return "发送成功";    }}
View Code

回调结果

交换机不对confirm-----nullconfirm-----falseconfirm-----channel error; protocol method: #method
(reply-code=404, reply-text=NOT_FOUND - no exchange 'ex' in vhost '/', class-id=60, method-id=40)找不到路由returnedMessage-----(Body:'我是一条消息' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])returnedMessage-----NO_ROUTEreturnedMessage-----ex4returnedMessage-----HaHaHawwconfirm-----nullconfirm-----trueconfirm-----null成功confirm-----nullconfirm-----trueconfirm-----null
View Code

消费者重回队列

package com.datang.springcloud.mq;import com.rabbitmq.client.Channel;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.io.IOException;@Componentpublic class Receive5 {    //    监听 mq8消息队列    @RabbitListener(queues = "mq8")    public void receive(String msg, Message message, Channel channel) throws IOException {        try {            System.out.println("8我接受到的消息是:" + msg);            int a = 1 / 0;            // 第一个参数为队列的ID,第二个参数批处理,手动提交比当前ID小的。如果这个队列为正确的,那就可以把之前的提交            channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);            System.out.println("消费消息确认" + message.getMessageProperties().getConsumerQueue() + ",接收到了回调方法");        } catch (Exception e) {            //其实重发也没多大意义,一般都是做个日志,或者其他补偿。            System.out.println("尝试重发:" + message.getMessageProperties().getConsumerQueue());            //前两个参数和 basicAck 一样,最后一个为是否重回队列            channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true);        }    }}
View Code

 

 

 

 

 

 

 

上一篇:shardingJDBC分库分表
下一篇:spring-mvc

发表评论

最新留言

很好
[***.229.124.182]2025年04月12日 05时57分26秒