
springboot+mqtt初体验
发布日期:2021-05-27 02:54:18
浏览次数:41
分类:精选文章
本文共 3407 字,大约阅读时间需要 11 分钟。
Spring Boot集成MQTT简要教程
系统依赖项说明
本文基于以下依赖库进行实现:
``` org.springframework.integration ```
注意:版本号的选择需根据项目具体需求进行确定,确保各依赖间不存在版本冲突。
核心配置说明
1 消耗 Hagun
spring.mqtt.username= admin spring.mqtt.password= public spring.mqtt.url= tcp://127.0.0.1:1883 spring.mqtt.client.id= clientId-001 spring.mqtt.consumer.clientId= consumerClientId-001 spring.mqtt.consumer.defaultTopic= topic1
2 消息协调器定义
本教程中主要使用了以下两种核心组件:
发布/订阅通道
- 通道名称:
mqttOutboundChannel
(发布通道)和mqttInboundChannel
(订阅通道) - 类型:采用直接通道类型
DirectChannel
消息处理逻辑
3 消息生产相关
@Configuration@IntegrationComponentScanpublic class MqttConfig { private static final Logger logger = LoggerFactory.getLogger(MqttConfig.class); // ... 其他属性及常量定义(稍后详细说明)}
消息发布
1.1. 发送消息
public interface MqttMessageProducer { void sendToMqtt(String data); void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic); void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);}
1.2. 实现类
@Componentpublic class MqttMessageConsumer implements MessageHandler { private Logger logger = LoggerFactory.getLogger(this.getClass()); @Override public void handleMessage(Message message) throws MessagingException { String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC)); String payload = String.valueOf(message.getPayload()); logger.info("接收到 mqtt 消息,主题: {}, 消息: {}", topic, payload); }}
消息订阅
1.1 订阅配置
@Beanpublic MessageProducer inbound() { // 非异步方式实现 MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( consumerClientId, mqttClientFactory(), consumerDefaultTopic ); // 设置监听完成超时 adapter.setCompletionTimeout(5000); // 配置消息转换器 adapter.setConverter(new DefaultPahoMessageConverter()); // 设置 QoS 水平 adapter.setQos(1); // 将接收的消息发送到入站通道 adapter.setOutputChannel(mqttInboundChannel()); return adapter;}
1.2 消息处理
@ServiceActivator(inputChannel = CHANNEL_NAME_IN)public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message message) throws MessagingException { String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString(); String msg = message.getPayload().toString(); logger.info("接收到订阅消息:\ntopic: %s\nmessage: %s", topic, msg); } };}
使用示例
1. 发布消息
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")@Componentpublic interface MqttMessageProducer { void sendToMqtt(String data); void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic); void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);}
2. 消息接收验证
可以通过启动应用程序并在终端发布消息来验证订阅是否正常工作:
curl -X POST http://localhost:8080/mqtt/send -H "Content-Type: application/json" -d '{"topic":"topic1","payload":"111"}'
执行上述命令后,查看日志输出,应该可以看到类似以下内容:
接收到订阅消息:主题: topic1消息: 111
附录
常见问题解答
-
Q: 为什么不能直接在代码中使用
@ServiceActivator
?A:
@ServiceActivator
用于在MessageHandler
中标注消息处理器,确保其作为 Spring Integration 的消息监听器被正确识别和处理。
扩展注意事项
-
Q: 如何处理消息传输中的重试机制?
A: 在 MQTT 协议中,消息重试机制主要依赖于
QoS
等级的设置。建议在生产环境中按照实际需求配置QoS
水平,例如QoS=1
可以实现消息重试但减少重复率。
-
Q: 如何处理消息丢失问题?
A: 对于
QoS=0
的消息,可以采用lost+
模式(客户端有无getter),而对于QoS=1
的消息,可以设置合理的重试策略,以减少消息丢失情况。
本教程为初步理解和实现 Spring Boot 集成 MQTT 提供了一个简洁的参考示例,实际应用中建议根据具体场景进行适当的配置和优化。
发表评论
最新留言
初次前来,多多关照!
[***.217.46.12]2025年04月30日 05时00分36秒
关于作者

喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
ECSHOP实现收货国家省市由选择下拉菜单改为手动
2025-03-29
Educational Codeforces Round 28
2025-03-29
ed编辑器--适用于shell脚本内编辑文件的最最简单编辑器
2025-03-29
EF 资料
2025-03-29
Effective Modern C++:02auto
2025-03-29
efficientnet最合适的尺寸和最后一层的层数
2025-03-29
Ehcache Java开源缓存框架
2025-03-29
el-select下拉框修改背景色
2025-03-29
Elasticsearch & Kibana & Filebeat开启SSL通信
2025-03-29
ElasticSearch - DSL查询文档语法,以及深度分页问题、解决方案
2025-03-29
ElasticSearch - 基于 JavaRestClient 操作索引库和文档
2025-03-29
ElasticSearch - 索引库和文档相关命令操作
2025-03-29
elasticsearch 7.7.0 单节点配置x-pack
2025-03-29
ElasticSearch 中 REST API 详解
2025-03-29