getUnderlying() is empty in Esper Listener despite the pattern matching


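I am using Esper 8.9 for complex event processing in a Spring Boot application. Although my events and pattern are configured correctly, getUnderlying() in my UpdateListener implementation comes back empty for events that successfully match my pattern. Here is a simplified version of my setup: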

    @Bean
    public IntegrationFlow mqttEventInboundFlow(@Qualifier("mqttClientFactoryAasEvents") MqttPahoClientFactory factory) {
        ObjectMapper objectMapper = new ObjectMapper();

        return IntegrationFlow.from(mqttEventInboundAdapter(factory))
                .channel(mqttInputChannelEventInbound())
                .handle(message -> {

                    String jsonPayload = (String) message.getPayload();
                    EventWrapperDto payload;

                    try {
                        payload = objectMapper.readValue(jsonPayload, EventWrapperDto.class);
                        log.info("Sending event to Esper: {}", objectMapper.writeValueAsString(payload));
                        log.info("The pojo of the payload {}", payload);
                        epService.getEventService().sendEventBean(payload, "EventWrapperDto");

                    } catch (JsonProcessingException e) {
                        throw new RuntimeException(e);
                    }
                    log.info("Received message");

                })
                .get();
    }

    statement.addListener((newData, oldData, stmt, rt) -> {
        log.info("Received new data: {}", newData.toString());
        log.info("Received old data: {}", oldData);
        if (newData != null) {
            try {
                List<EventWrapperDto> matchedEvents = Arrays.stream(newData)
                        .map(eventBean -> {

                            log.info("Received eventBean: {}", eventBean + " toString: " + eventBean.toString());
                            log.info("Underlying instance type: {}", eventBean.getUnderlying().getClass().getName());
                            log.info("Underlying map contents: {}", eventBean.getUnderlying());
                            EventWrapperDto eventWrapperDto = (EventWrapperDto) eventBean.getUnderlying();
                            log.info("Converted to EventWrapperDto: {}", eventWrapperDto);

                            return eventWrapperDto;
                        })
                        .toList();
                log.info("Pattern found, combining events");
            } catch (Exception e) {
                log.error("Error processing events", e);
            }
        }
    });

The Esper configuration looks like this:

    @Configuration
    public class EsperConfiguration {

        @Bean
        public EPRuntime epServiceProvider() {
            com.espertech.esper.common.client.configuration.Configuration config = new com.espertech.esper.common.client.configuration.Configuration();

            config.getCommon().addEventType("EventWrapperDto", EventWrapperDto.class.getName());
            return EPRuntimeProvider.getDefaultRuntime(config);
        }
    }

The logs are:

2024-02-22T17:59:56.080+02:00  INFO 34396 --- [05-8be8cf13b987] c.e.i.a.port.in.mqtt.MqttEventInbound    : Sending event to Esper: {"event":{"source":{"timestamp":1708617595993,"value":"0","assetName":"IF","name":"OperationalData:temperature_of_the_bar","valueType":{"name":"STRING"},"unit":null,"@type":"Attribute"},"assetName":"IF","name":"OperationalData:temperature_of_the_bar","eventData":[],"timestamp":null,"@type":"ValueSetEventDTO"}}
2024-02-22T17:59:56.080+02:00  INFO 34396 --- [05-8be8cf13b987] c.e.i.a.port.in.mqtt.MqttEventInbound    : The pojo of the payload EventWrapperDto(event=EventDto(type=ValueSetEventDTO, source=SourceDto(type=Attribute, timestamp=Thu Feb 22 17:59:55 EET 2024, value=0, assetName=IF, name=OperationalData:temperature_of_the_bar, valueType=ValueTypeDto(name=STRING), unit=null), assetName=IF, name=OperationalData:temperature_of_the_bar, eventData=[], timestamp=null))
2024-02-22T17:59:56.085+02:00  INFO 34396 --- [05-8be8cf13b987] c.e.i.a.port.in.mqtt.MqttEventInbound    : Received message
2024-02-22T18:00:10.152+02:00  INFO 34396 --- [05-8be8cf13b987] c.e.i.a.port.in.mqtt.MqttEventInbound    : Sending event to Esper: {"event":{"source":{"timestamp":1708617610147,"value":"bar","assetName":"IF","name":"OperationalData:product_id","valueType":{"name":"STRING"},"unit":null,"@type":"Attribute"},"assetName":"IF","name":"OperationalData:product_id","eventData":[],"timestamp":null,"@type":"ValueSetEventDTO"}}
2024-02-22T18:00:10.152+02:00  INFO 34396 --- [05-8be8cf13b987] c.e.i.a.port.in.mqtt.MqttEventInbound    : The pojo of the payload EventWrapperDto(event=EventDto(type=ValueSetEventDTO, source=SourceDto(type=Attribute, timestamp=Thu Feb 22 18:00:10 EET 2024, value=bar, assetName=IF, name=OperationalData:product_id, valueType=ValueTypeDto(name=STRING), unit=null), assetName=IF, name=OperationalData:product_id, eventData=[], timestamp=null))
2024-02-22T18:00:10.157+02:00  INFO 34396 --- [05-8be8cf13b987] c.e.i.a.p.in.mqtt.service.EsperService   : Received new data: [Lcom.espertech.esper.common.client.EventBean;@60c3cec2
2024-02-22T18:00:10.158+02:00  INFO 34396 --- [05-8be8cf13b987] c.e.i.a.p.in.mqtt.service.EsperService   : Received old data: {}
2024-02-22T18:00:10.158+02:00  INFO 34396 --- [05-8be8cf13b987] c.e.i.a.p.in.mqtt.service.EsperService   : Received eventBean: MapEventBean eventType=com.espertech.esper.common.internal.event.map.MapEventType@66d8c66b toString: MapEventBean eventType=com.espertech.esper.common.internal.event.map.MapEventType@66d8c66b
2024-02-22T18:00:10.158+02:00  INFO 34396 --- [05-8be8cf13b987] c.e.i.a.p.in.mqtt.service.EsperService   : Underlying instance type: java.util.HashMap
2024-02-22T18:00:10.158+02:00  INFO 34396 --- [05-8be8cf13b987] c.e.i.a.p.in.mqtt.service.EsperService   : Underlying map contents: {}

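As you can see, the POJO is correct when I send the event, but in the listener getUnderlying() gives me nothing usable: the log shows an empty java.util.HashMap rather than my EventWrapperDto.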

java spring spring-boot complex-event-processing esper
1 Answer

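You need to include the EPL in your post so that people can see what is being selected. I suggest trying it without MQTT and JSON to see what you get. It is normal for the output to be a Map-type object, and for it not to contain values for some map keys.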
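To illustrate the point about Map-type output, here is a minimal sketch of a defensive unwrapping helper, using only the JDK. It assumes the EPL (not shown in the question) is a pattern that tags the event, for example with a hypothetical tag named `a`; the class name `UnderlyingUnwrap` and the method `unwrap` are likewise made up for this example. The helper checks what the underlying object actually is instead of casting it blindly. Depending on how Esper populates the map, the value stored under a tag may itself be a wrapped event rather than the POJO, in which case calling `eventBean.get("a")` on the `EventBean` is probably the better route.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class UnderlyingUnwrap {

    /**
     * Tries to obtain an object of the requested type from an event's underlying object.
     * Handles two cases: the underlying object already is the requested type, or it is
     * a Map that holds the object under the given key (the hypothetical pattern tag).
     */
    static <T> Optional<T> unwrap(Object underlying, String tag, Class<T> type) {
        if (type.isInstance(underlying)) {
            return Optional.of(type.cast(underlying));
        }
        if (underlying instanceof Map) {
            Object value = ((Map<?, ?>) underlying).get(tag);
            if (type.isInstance(value)) {
                return Optional.of(type.cast(value));
            }
        }
        // Nothing of the requested type found, e.g. an empty map from an untagged pattern.
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, Object> tagged = new HashMap<>();
        tagged.put("a", "payload");

        System.out.println(unwrap(tagged, "a", String.class));          // Optional[payload]
        System.out.println(unwrap(new HashMap<>(), "a", String.class)); // Optional.empty
    }
}
```

In the listener from the question, this would replace the direct cast with something like `unwrap(eventBean.getUnderlying(), "a", EventWrapperDto.class)`, so an empty map shows up as an empty `Optional` instead of a `ClassCastException`.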
