
本文共 6732 字,大约阅读时间需要 22 分钟。
RocketMQ入门
消息队列(Message Queue)是一种应用程序间通信的高效方法,是分布式系统的重要组成部分。RocketMQ作为一款开源的消息中间件,因其高性能、高可靠性和强扩展性,在大数据、金融、电商等多个领域得到了广泛应用。接下来,我将为你详细介绍 RocketMQ 的核心特性、使用场景以及搭建和开发实践。
什么是消息队列
消息队列是一种实现应用程序间异步通信的技术,它允许系统组件之间以不紧确的方式交换消息。这对于处理高并发场景尤为重要。当需要将任务分散到多个系统中,或者需要在系统间解耦时,消息队列是一个理想的选择。
RocketMQ 的特点
RocketMQ 不仅支持多种消息协议(如 JMS 和 MQTT),还提供了灵活的消息模型和高效的消息传递机制。其主要特点包括:
为什么使用消息队列
消息队列在现代应用中发挥着越来越重要的作用。它可以帮助解决以下问题:
在 Linux 下安装 RocketMQ
安装和运行 RocketMQ 需要经过几个基本步骤。以下是详细的安装指南:
1. 安装必要工具
sudo yum -y install java-1.8 forgetting (安装 Java 8)
或者根据你的需求选择 Java 11 或更高版本。
- 安装 Maven
- 配置 Maven 环境变量
- 设置 Git
- ** Broker 启动**
- ** Namesrv 启动**
- Clones the monitoring repository:
- 编译并运行:
- 访问监控平台:
sudo yum -y install maven-3.8# 解压并将 Maven 移到 `/usr/local` 目录tar xvf apache-maven-3.8.0-bin.tar.gzmv apache-maven-3.8 /usr/local/apache-maven-3.8
修改 ~/.bashrc
文件:
export MAVEN_HOME=/usr/local/apache-maven-3.8export PATH=${PATH}:/usr/local/apache-maven-3.8/bin
sudo yum -y install git
2.下载 RocketMQ 源码
Clone RocketMQ 仓库并解压:
git clone https://github.com/apache/incubator-rocketmq.gitcd incubator-rocketmq# 解压源码(如果没有 unzip 工具,可以安装)sudo yum -y install unzipunzip rocketmq-all-4.3.0-source-release.zipmv rocketmq-all-4.3.0 /usr/local/rocketmq
3.编译 RocketMQ
在 RocketMQ 根目录下执行以下命令:
mvn -Prelease-all -DskipTests clean install -U
4.启动 RocketMQ
cd /usr/local/rocketmq/distribution/target/apache-rocketmq# 修改 Broker 配置sed -i "s/^JAVA_OPT='-Xms...*/JAVA_OPT='-Xms256m -Xmx256m -Xmn125m/" bin/runbroker.sh# 修改 Namesrv 配置sed -i "s/^JAVA_OPT='-Xms...*/JAVA_OPT='-Xms512m -Xmx512m -Xmn256m -XX:PermSize=128m -XX:MaxPermSize=320m/' bin/runserver.sh# 后台启动service firewalld stopnohup sh bin/mqnamesrv & > ~/logs/rocketmqlogs/namesrv.log &
nohup sh bin/mqbroker -n 192.168.13.145:9876 autoCreateTopicEnable=true & > ~/logs/rocketmqlogs/broker.log &
5.测试 RocketMQ
发送消息:
export NAMESRV_ADDR=localhost:9876sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
接收消息:
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
RocketMQ 监控平台
为了更好地监控和管理 RocketMQ 集群,可以部署 RocketMQ 监控平台。
1.部署监控平台
git clone https://github.com/apache/rocketmq-externals.gitcd rocketmq-externals/rocketmq-console
mvn clean package -Dmaven.test.skip=truejava -jar target/rocketmq-console-1.0.0.jar
http://localhost:8080
Java 调用 RocketMQ
1. 配置防火墙
在虚拟机中启用防火墙可能会导致网络连接超时。建议关闭防火墙或添加 RocketMQ 的相关端口到防火墙允许列表。
2. 创建 Maven 项目
添加 RocketMQ 的依赖到项目 pom.xml:
org.apache.rocketmq rocketmq-client 4.3.0
生产者测试类
public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("producerGroup"); producer.setNamesrvAddr("NAMESRV_ADDR"); producer.start(); try { for (int i = 0; i < 10; i++) { Message msg = new Message("topic", "tag", "message" + i); SendResult sendResult = producer.send(msg); System.out.println(sendResult); Thread.sleep(1000); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); }}
消费者测试类
public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup"); consumer.setNamesrvAddr("NAMESRV_ADDR"); consumer.subscribe("topic", "*"); consumer.start(); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext context) { for (MessageExt message : msgs) { System.out.println(Thread.currentThread().getName() + " 接收消息: " + new String(message.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); }}
Spring Boot 整合 RocketMQ
1. 依赖管理
在 pom.xml 中添加:
org.apache.rocketmq rocketmq-spring-boot-starter 2.1.0
2. 配置信息
在你的应用程序配置文件中添加:
rocketmq: name-server: 192.168.13.120:9876 producer: group: your-producer-group
消息类型详解
1. 同步消息
@RestControllerpublic class ProducerController { @Autowired private RocketMQTemplate rocketMQTemplate; @GetMapping("/send") public String sendMessage() { rocketMQTemplate.syncSend("topic", "同步消息"); return "success"; }}
2. 异步消息
@RestControllerpublic class ProducerController { @Autowired private RocketMQTemplate rocketMQTemplate; @GetMapping("/send") public String sendMessage() { rocketMQTemplate.asyncSend("topic", "异步消息", new SendCallback() { @Override public void onSuccess(SendResult sendResult) {} @Override public void onException(Throwable throwable) {} }); return "success"; }}
事务消息
事务消息适用于需要保证所有操作原子性、-consistent、隔离的场景。
1. 生产者代码
@RestControllerpublic class ProducerController { @Autowired private RocketMQTemplate rocketMQTemplate; @GetMapping("/send-transaction") public String sendMessage() throws MQClientException, InterruptedException { String transactionId = UUID.randomUUID().toString(); Message message = MessageBuilder.withPayload("事务消息") .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId) .build(); TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("transaction-topic", message, null); System.out.println("生产者发送事务消息成功:" + result); return "success"; }}
2. 消费者监听器
@Component@RocketMQMessageListener(consumerGroup = "consumer-group", topic = "transaction-topic")public class ConsumerListener { @Override public void onMessage(String message) { System.out.println("消息接收成功:" + message); }}
消息过滤
消息过滤允许消费者只消费特定的消息,可以通过设置 Tag 或使用 SQL 过滤。
1. 设置 Tag
// 生产者producer.send(new Message("topic", "tag1", "过滤测试消息"));
2. 消费过滤
consumer.subscribe("topic", "tag1");
RocketMQ 集群
1. 主从复制
通过配置 Broker 的角色,实现消息的主从复制来提高可用性和容灾能力。
2. 读写分离
在集群环境下,读取消息和写入消息可以由不同的 Broker 处理,提高吞吐量和可靠性。
通过以上步骤,你已经掌握了 RocketMQ 的核心知识和操作流程。如果需要更深入的技术支持,可以参考 RocketMQ 的官方文档或社区。
发表评论
最新留言
关于作者
