我正在尝试使用 Spring Integration 通过 MQTT 消息发送 JSON 作为有效负载。这是我的出站处理程序:
@Bean
@ServiceActivator(inputChannel = MQTT_OUTBOUND_CHANNEL)
public MessageHandler mqttOutbound(final ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager,
final MqttClientPersistence persistence,
final MqttHeaderMapper mqttHeaderMapper,
final JacksonJsonMessageConverter jacksonJsonMessageConverter) {
final var messageHandler = new Mqttv5PahoMessageHandler(clientManager);
messageHandler.setHeaderMapper(mqttHeaderMapper);
messageHandler.setPersistence(persistence);
messageHandler.setAsync(true);
messageHandler.setAsyncEvents(false);
final var defaultPahoMessageConverter = new DefaultPahoMessageConverter(MqttQoS.AT_LEAST_ONCE.value(), false, StandardCharsets.UTF_8.name());
final var bytesMessageMapper = new ConvertingBytesMessageMapper(jacksonJsonMessageConverter);
defaultPahoMessageConverter.setBytesMessageMapper(bytesMessageMapper);
messageHandler.setConverter(defaultPahoMessageConverter);
return messageHandler;
}
但是,这最终会在
org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler#onInit
on 上断言
else {
Assert.state(!(getConverter() instanceof MqttMessageConverter),
"MessageConverter must not be an MqttMessageConverter");
}
我还能怎样做?
我尝试研究 Spring Integration 示例源代码,但我没有从中收集到任何内容。
是的。我们只是没有足够的资源来涵盖我们通过示例为社区实现的所有功能。所以,那里确实没有 MQTT v5 示例。
如您所见,
Mqttv5PahoMessageHandler
不能与 MqttMessageConverter
一起提供。相反,必须使用 MessageConverter
的常规实例。在您的情况下,必须直接使用提供的 jacksonJsonMessageConverter
:
messageHandler.setConverter(jacksonJsonMessageConverter);
在文档中查看更多信息:https://docs.spring.io/spring-integration/reference/mqtt.html#mqtt-v5