我正在使用 Esper 8.9 在 Spring Boot 应用程序中进行复杂的事件处理。尽管正确配置了我的事件和模式,但我的 UpdateListener 实现中的 getUnderlying() 方法对于成功匹配我的模式的事件返回 null。这是我的设置的简化版本:
@Bean
public IntegrationFlow mqttEventInboundFlow(@Qualifier("mqttClientFactoryAasEvents") MqttPahoClientFactory factory) {
ObjectMapper objectMapper = new ObjectMapper();
return IntegrationFlow.from(mqttEventInboundAdapter(factory))
.channel(mqttInputChannelEventInbound())
.handle(message -> {
String jsonPayload = (String) message.getPayload();
EventWrapperDto payload;
try {
payload = objectMapper.readValue(jsonPayload, EventWrapperDto.class);
log.info("Sending event to Esper: {}", objectMapper.writeValueAsString(payload));
log.info("The pojo of the payload {}", payload);
epService.getEventService().sendEventBean(payload, "EventWrapperDto");
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
log.info("Received message");
})
.get();
}
statement.addListener((newData, oldData, stmt, rt) -> {
log.info("Received new data: {}", newData.toString());
log.info("Received old data: {}", oldData);
if (newData != null) {
try {
List<EventWrapperDto> matchedEvents = Arrays.stream(newData)
.map(eventBean -> {
log.info("Received eventBean: {}", eventBean + " toString: " + eventBean.toString());
log.info("Underlying instance type: {}", eventBean.getUnderlying().getClass().getName());
log.info("Underlying map contents: {}", eventBean.getUnderlying());
EventWrapperDto eventWrapperDto = (EventWrapperDto) eventBean.getUnderlying();
log.info("Converted to EventWrapperDto: {}", eventWrapperDto);
return eventWrapperDto;
})
.toList();
log.info("Pattern found, combining events");
} catch (Exception e) {
log.error("Error processing events", e);
}
Esper的配置是这样的
@Configuration
public class EsperConfiguration {
@Bean
public EPRuntime epServiceProvider() {
com.espertech.esper.common.client.configuration.Configuration config = new com.espertech.esper.common.client.configuration.Configuration();
config.getCommon().addEventType("EventWrapperDto", EventWrapperDto.class.getName());
return EPRuntimeProvider.getDefaultRuntime(config);
}
}
日志是:
2024-02-22T17:59:56.080+02:00 INFO 34396 --- [05-8be8cf13b987] c.e.i.a.port.in.mqtt.MqttEventInbound : Sending event to Esper: {"event":{"source":{"timestamp":1708617595993,"value":"0","assetName":"IF","name":"OperationalData:temperature_of_the_bar","valueType":{"name":"STRING"},"unit":null,"@type":"Attribute"},"assetName":"IF","name":"OperationalData:temperature_of_the_bar","eventData":[],"timestamp":null,"@type":"ValueSetEventDTO"}}
2024-02-22T17:59:56.080+02:00 INFO 34396 --- [05-8be8cf13b987] c.e.i.a.port.in.mqtt.MqttEventInbound : The pojo of the payload EventWrapperDto(event=EventDto(type=ValueSetEventDTO, source=SourceDto(type=Attribute, timestamp=Thu Feb 22 17:59:55 EET 2024, value=0, assetName=IF, name=OperationalData:temperature_of_the_bar, valueType=ValueTypeDto(name=STRING), unit=null), assetName=IF, name=OperationalData:temperature_of_the_bar, eventData=[], timestamp=null))
2024-02-22T17:59:56.085+02:00 INFO 34396 --- [05-8be8cf13b987] c.e.i.a.port.in.mqtt.MqttEventInbound : Received message
2024-02-22T18:00:10.152+02:00 INFO 34396 --- [05-8be8cf13b987] c.e.i.a.port.in.mqtt.MqttEventInbound : Sending event to Esper: {"event":{"source":{"timestamp":1708617610147,"value":"bar","assetName":"IF","name":"OperationalData:product_id","valueType":{"name":"STRING"},"unit":null,"@type":"Attribute"},"assetName":"IF","name":"OperationalData:product_id","eventData":[],"timestamp":null,"@type":"ValueSetEventDTO"}}
2024-02-22T18:00:10.152+02:00 INFO 34396 --- [05-8be8cf13b987] c.e.i.a.port.in.mqtt.MqttEventInbound : The pojo of the payload EventWrapperDto(event=EventDto(type=ValueSetEventDTO, source=SourceDto(type=Attribute, timestamp=Thu Feb 22 18:00:10 EET 2024, value=bar, assetName=IF, name=OperationalData:product_id, valueType=ValueTypeDto(name=STRING), unit=null), assetName=IF, name=OperationalData:product_id, eventData=[], timestamp=null))
2024-02-22T18:00:10.157+02:00 INFO 34396 --- [05-8be8cf13b987] c.e.i.a.p.in.mqtt.service.EsperService : Received new data: [Lcom.espertech.esper.common.client.EventBean;@60c3cec2
2024-02-22T18:00:10.158+02:00 INFO 34396 --- [05-8be8cf13b987] c.e.i.a.p.in.mqtt.service.EsperService : Received old data: {}
2024-02-22T18:00:10.158+02:00 INFO 34396 --- [05-8be8cf13b987] c.e.i.a.p.in.mqtt.service.EsperService : Received eventBean: MapEventBean eventType=com.espertech.esper.common.internal.event.map.MapEventType@66d8c66b toString: MapEventBean eventType=com.espertech.esper.common.internal.event.map.MapEventType@66d8c66b
2024-02-22T18:00:10.158+02:00 INFO 34396 --- [05-8be8cf13b987] c.e.i.a.p.in.mqtt.service.EsperService : Underlying instance type: java.util.HashMap
2024-02-22T18:00:10.158+02:00 INFO 34396 --- [05-8be8cf13b987] c.e.i.a.p.in.mqtt.service.EsperService : Underlying map contents: {}
正如您所见,尽管我发送事件时 POJO 是正确的,但在侦听器中 getUnderlying 为 null。
您需要在帖子中提供 EPL,以便人们可以看到所选内容。我建议您在没有 MQTT 和 JSON 的情况下尝试一下,看看您会得到什么。输出是 Map 类型对象并且不包含某些映射键的值是正常的。