RocketMQ入门
发布日期:2021-05-26 05:13:29 浏览次数:20 分类:精选文章

本文共 6732 字,大约阅读时间需要 22 分钟。

RocketMQ入门

消息队列(Message Queue)是一种应用程序间通信的高效方法,是分布式系统的重要组成部分。RocketMQ作为一款开源的消息中间件,因其高性能、高可靠性和强扩展性,在大数据、金融、电商等多个领域得到了广泛应用。接下来,我将为你详细介绍 RocketMQ 的核心特性、使用场景以及搭建和开发实践。


什么是消息队列

消息队列是一种实现应用程序间异步通信的技术,它允许系统组件之间以不紧确的方式交换消息。这对于处理高并发场景尤为重要。当需要将任务分散到多个系统中,或者需要在系统间解耦时,消息队列是一个理想的选择。


RocketMQ 的特点

RocketMQ 不仅支持多种消息协议(如 JMS 和 MQTT),还提供了灵活的消息模型和高效的消息传递机制。其主要特点包括:

  • 可靠性:RocketMQ 围绕至少一次消息传递(至少一次 delivery guarantee)的设计,确保消息不会丢失或重复消费。
  • 高性能:支持大规模的消息堆积和快速的消息生产消费。
  • 灵活性:支持多种消息模式,包括发布/订阅(Pub/Sub)和点对点(P2P)模式。
  • 分布式架构:支持集群部署,具备高可用性和高可扩展性。
  • 开源与兼容性:RocketMQ 提供丰富的文档和社区支持,兼容多种开发框架如 Java、Python、Go 等。

  • 为什么使用消息队列

    消息队列在现代应用中发挥着越来越重要的作用。它可以帮助解决以下问题:

  • 解除耦合:通过消息队列,系统间可以解除了直接调用接口的依赖,避免因接口失败导致整个流程崩溃。
  • 异步处理:消息队列允许多个系统异步处理消息,减少处理延迟。
  • 削峰填谷:在流量高峰期,消息队列可以缓解系统压力,防止系统过载。
  • 消息驱动开发:系统可以通过消息队列暴露接口,实现松耦合的消息生产者和消费者架构。

  • 在 Linux 下安装 RocketMQ

    安装和运行 RocketMQ 需要经过几个基本步骤。以下是详细的安装指南:

    1. 安装必要工具

  • 安装 Java 开发工具
  • sudo yum -y install java-1.8 forgetting (安装 Java 8)

    或者根据你的需求选择 Java 11 或更高版本。

    1. 安装 Maven
    2. sudo yum -y install maven-3.8# 解压并将 Maven 移到 `/usr/local` 目录tar xvf apache-maven-3.8.0-bin.tar.gzmv apache-maven-3.8 /usr/local/apache-maven-3.8
      1. 配置 Maven 环境变量
      2. 修改 ~/.bashrc 文件:

        export MAVEN_HOME=/usr/local/apache-maven-3.8export PATH=${PATH}:/usr/local/apache-maven-3.8/bin
        1. 设置 Git
        2. sudo yum -y install git

          2.下载 RocketMQ 源码

          Clone RocketMQ 仓库并解压:

          git clone https://github.com/apache/incubator-rocketmq.gitcd incubator-rocketmq# 解压源码(如果没有 unzip 工具,可以安装)sudo yum -y install unzipunzip rocketmq-all-4.3.0-source-release.zipmv rocketmq-all-4.3.0 /usr/local/rocketmq

          3.编译 RocketMQ

          在 RocketMQ 根目录下执行以下命令:

          mvn -Prelease-all -DskipTests clean install -U

          4.启动 RocketMQ

        3. ** Broker 启动**
        4. cd /usr/local/rocketmq/distribution/target/apache-rocketmq# 修改 Broker 配置sed -i "s/^JAVA_OPT='-Xms...*/JAVA_OPT='-Xms256m -Xmx256m -Xmn125m/" bin/runbroker.sh# 修改 Namesrv 配置sed -i "s/^JAVA_OPT='-Xms...*/JAVA_OPT='-Xms512m -Xmx512m -Xmn256m -XX:PermSize=128m -XX:MaxPermSize=320m/' bin/runserver.sh# 后台启动service firewalld stopnohup sh bin/mqnamesrv & > ~/logs/rocketmqlogs/namesrv.log &
          1. ** Namesrv 启动**
          2. nohup sh bin/mqbroker -n 192.168.13.145:9876 autoCreateTopicEnable=true & > ~/logs/rocketmqlogs/broker.log &

            5.测试 RocketMQ

            发送消息:

            export NAMESRV_ADDR=localhost:9876sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

            接收消息:

            sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

            RocketMQ 监控平台

            为了更好地监控和管理 RocketMQ 集群,可以部署 RocketMQ 监控平台。

            1.部署监控平台

          3. Clones the monitoring repository:
          4. git clone https://github.com/apache/rocketmq-externals.gitcd rocketmq-externals/rocketmq-console
            1. 编译并运行:
            2. mvn clean package -Dmaven.test.skip=truejava -jar target/rocketmq-console-1.0.0.jar
              1. 访问监控平台:
              2. http://localhost:8080

                Java 调用 RocketMQ

                1. 配置防火墙

                在虚拟机中启用防火墙可能会导致网络连接超时。建议关闭防火墙或添加 RocketMQ 的相关端口到防火墙允许列表。

                2. 创建 Maven 项目

                添加 RocketMQ 的依赖到项目 pom.xml:

                org.apache.rocketmq
                rocketmq-client
                4.3.0

                生产者测试类

                public class Producer {    public static void main(String[] args) throws MQClientException, InterruptedException {        DefaultMQProducer producer = new DefaultMQProducer("producerGroup");        producer.setNamesrvAddr("NAMESRV_ADDR");        producer.start();        try {            for (int i = 0; i < 10; i++) {                Message msg = new Message("topic", "tag", "message" + i);                SendResult sendResult = producer.send(msg);                System.out.println(sendResult);                Thread.sleep(1000);            }        } catch (Exception e) {            e.printStackTrace();        }        producer.shutdown();    }}

                消费者测试类

                public class Consumer {    public static void main(String[] args) throws InterruptedException, MQClientException {        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");        consumer.setNamesrvAddr("NAMESRV_ADDR");        consumer.subscribe("topic", "*");        consumer.start();        consumer.registerMessageListener(new MessageListenerConcurrently() {            @Override            public ConsumeConcurrentlyStatus consumeMessage(List
                msgs, ConsumeConcurrentlyContext context) { for (MessageExt message : msgs) { System.out.println(Thread.currentThread().getName() + " 接收消息: " + new String(message.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); }}

                Spring Boot 整合 RocketMQ

                1. 依赖管理

                在 pom.xml 中添加:

                org.apache.rocketmq
                rocketmq-spring-boot-starter
                2.1.0

                2. 配置信息

                在你的应用程序配置文件中添加:

                rocketmq:  name-server: 192.168.13.120:9876  producer:    group: your-producer-group

                消息类型详解

                1. 同步消息

                @RestControllerpublic class ProducerController {    @Autowired    private RocketMQTemplate rocketMQTemplate;    @GetMapping("/send")    public String sendMessage() {        rocketMQTemplate.syncSend("topic", "同步消息");        return "success";    }}

                2. 异步消息

                @RestControllerpublic class ProducerController {    @Autowired    private RocketMQTemplate rocketMQTemplate;    @GetMapping("/send")    public String sendMessage() {        rocketMQTemplate.asyncSend("topic", "异步消息", new SendCallback() {            @Override            public void onSuccess(SendResult sendResult) {}            @Override            public void onException(Throwable throwable) {}        });        return "success";    }}

                事务消息

                事务消息适用于需要保证所有操作原子性、-consistent、隔离的场景。

                1. 生产者代码

                @RestControllerpublic class ProducerController {    @Autowired    private RocketMQTemplate rocketMQTemplate;    @GetMapping("/send-transaction")    public String sendMessage() throws MQClientException, InterruptedException {        String transactionId = UUID.randomUUID().toString();        Message message = MessageBuilder.withPayload("事务消息")                .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)                .build();        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("transaction-topic", message, null);        System.out.println("生产者发送事务消息成功:" + result);        return "success";    }}

                2. 消费者监听器

                @Component@RocketMQMessageListener(consumerGroup = "consumer-group", topic = "transaction-topic")public class ConsumerListener {    @Override    public void onMessage(String message) {        System.out.println("消息接收成功:" + message);    }}

                消息过滤

                消息过滤允许消费者只消费特定的消息,可以通过设置 Tag 或使用 SQL 过滤。

                1. 设置 Tag

                // 生产者producer.send(new Message("topic", "tag1", "过滤测试消息"));

                2. 消费过滤

                consumer.subscribe("topic", "tag1");

                RocketMQ 集群

                1. 主从复制

                通过配置 Broker 的角色,实现消息的主从复制来提高可用性和容灾能力。

                2. 读写分离

                在集群环境下,读取消息和写入消息可以由不同的 Broker 处理,提高吞吐量和可靠性。


                通过以上步骤,你已经掌握了 RocketMQ 的核心知识和操作流程。如果需要更深入的技术支持,可以参考 RocketMQ 的官方文档或社区。

    上一篇:springcloud
    下一篇:Java与C++的区别

    发表评论

    最新留言

    做的很好,不错不错
    [***.243.131.199]2025年04月26日 18时48分48秒

    关于作者

        喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
    -- 愿君每日到此一游!

    推荐文章

    CentOS 系列:CentOS 7 使用 virt-install + vnc 图形界面/非图形界面 创建虚拟机 2023-01-23
    CentOS 系列:CentOS 7文件系统的组成 2023-01-23
    CentOS系列:【Linux】CentOS7操作系统安装nginx实战(多种方法,超详细) 2023-01-23
    CSDN----Markdown编辑器 2023-01-23
    Docker容器进入的4种方式(推荐最后一种) 2023-01-23
    Docker部署postgresql-11以及主从配置 2023-01-23
    EnvironmentNotWritableError: The current user does not have write permissions to the target environm 2023-01-23
    Golang起步篇(Windows、Linux、mac三种系统安装配置go环境以及IDE推荐以及入门语法详细释义) 2023-01-23
    Hyper-V系列:windows11开启系统自带安卓虚拟机并安装apk包 2023-01-23
    Hyper-V系列:微软官方文章 2023-01-23
    idea打war包的两种方式 2023-01-23
    Java系列:【注释模板】IDEA中JAVA类、方法注释模板教程 2023-01-23
    JS系列(仅供参考):【浏览器编程】浏览器F12调试工具面板详解和JavaScript添加断点 2023-01-23
    Kali 更换源(超详细,附国内优质镜像源地址) 2023-01-23
    kali安装docker(亲测有效) 2023-01-23
    Linux系列:Linux目录分析:[/] + [/usr] + [/usr/local] + [/usr/local/app-name]、Linux最全环境配置 + 动态库/静态库配置 2023-01-23
    Linux系列:ubuntu各版本之间的区别以及Ubuntu、kubuntu、xUbuntu、lubuntu等版本区别及界面样式 2023-01-23
    mysql系列:远程连接MySQL错误“plugin caching_sha2_password could not be loaded”的解决办法 2023-01-23
    Nessus扫描结果出现在TE.IO或者ES容器结果查看问题解决方案 2023-01-23
    Nmap渗透测试指南之探索网络 2023-01-23