springboot+mqtt初体验
发布日期:2021-05-27 02:54:18 浏览次数:41 分类:精选文章

本文共 3407 字,大约阅读时间需要 11 分钟。

Spring Boot集成MQTT简要教程

系统依赖项说明

本文基于以下依赖库进行实现:

``` org.springframework.integration ```

注意:版本号的选择需根据项目具体需求进行确定,确保各依赖间不存在版本冲突。

核心配置说明

1 消耗 Hagun

spring.mqtt.username= admin 
spring.mqtt.password= public
spring.mqtt.url= tcp://127.0.0.1:1883
spring.mqtt.client.id= clientId-001
spring.mqtt.consumer.clientId= consumerClientId-001
spring.mqtt.consumer.defaultTopic= topic1

2 消息协调器定义

本教程中主要使用了以下两种核心组件:

  • 发布/订阅通道

    • 通道名称mqttOutboundChannel(发布通道)和 mqttInboundChannel(订阅通道)
    • 类型:采用直接通道类型 DirectChannel
  • 消息处理逻辑

  • 3 消息生产相关

    @Configuration
    @IntegrationComponentScan
    public class MqttConfig {
    private static final Logger logger = LoggerFactory.getLogger(MqttConfig.class);
    // ... 其他属性及常量定义(稍后详细说明)
    }

    消息发布

    1.1. 发送消息

    public interface MqttMessageProducer {
    void sendToMqtt(String data);
    void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
    }

    1.2. 实现类

    @Component
    public class MqttMessageConsumer implements MessageHandler {
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    @Override
    public void handleMessage(Message message) throws MessagingException {
    String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
    String payload = String.valueOf(message.getPayload());
    logger.info("接收到 mqtt 消息,主题: {}, 消息: {}", topic, payload);
    }
    }

    消息订阅

    1.1 订阅配置

    @Bean
    public MessageProducer inbound() {
    // 非异步方式实现
    MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
    consumerClientId,
    mqttClientFactory(),
    consumerDefaultTopic
    );
    // 设置监听完成超时
    adapter.setCompletionTimeout(5000);
    // 配置消息转换器
    adapter.setConverter(new DefaultPahoMessageConverter());
    // 设置 QoS 水平
    adapter.setQos(1);
    // 将接收的消息发送到入站通道
    adapter.setOutputChannel(mqttInboundChannel());
    return adapter;
    }

    1.2 消息处理

    @ServiceActivator(inputChannel = CHANNEL_NAME_IN)
    public MessageHandler handler() {
    return new MessageHandler() {
    @Override
    public void handleMessage(Message message) throws MessagingException {
    String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();
    String msg = message.getPayload().toString();
    logger.info("接收到订阅消息:\ntopic: %s\nmessage: %s", topic, msg);
    }
    };
    }

    使用示例

    1. 发布消息

    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    @Component
    public interface MqttMessageProducer {
    void sendToMqtt(String data);
    void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
    }

    2. 消息接收验证

    可以通过启动应用程序并在终端发布消息来验证订阅是否正常工作:

    curl -X POST http://localhost:8080/mqtt/send -H "Content-Type: application/json" -d '{"topic":"topic1","payload":"111"}'

    执行上述命令后,查看日志输出,应该可以看到类似以下内容:

    接收到订阅消息:
    主题: topic1
    消息: 111

    附录

  • 常见问题解答

    • Q: 为什么不能直接在代码中使用 @ServiceActivator?

      A: @ServiceActivator 用于在 MessageHandler 中标注消息处理器,确保其作为 Spring Integration 的消息监听器被正确识别和处理。

  • 扩展注意事项

    • Q: 如何处理消息传输中的重试机制?

      A: 在 MQTT 协议中,消息重试机制主要依赖于 QoS 等级的设置。建议在生产环境中按照实际需求配置 QoS 水平,例如 QoS=1 可以实现消息重试但减少重复率。

    • Q: 如何处理消息丢失问题?

      A: 对于 QoS=0 的消息,可以采用 lost+ 模式(客户端有无getter),而对于 QoS=1 的消息,可以设置合理的重试策略,以减少消息丢失情况。

    本教程为初步理解和实现 Spring Boot 集成 MQTT 提供了一个简洁的参考示例,实际应用中建议根据具体场景进行适当的配置和优化。

    上一篇:springboot+netty初体验
    下一篇:win10环境安装EMQX过程

    发表评论

    最新留言

    初次前来,多多关照!
    [***.217.46.12]2025年04月30日 05时00分36秒

    关于作者

        喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
    -- 愿君每日到此一游!

    推荐文章