Esper EPL 窗口选择不能用于基本的例子。

问题描述 投票:0回答:1

我读到的所有内容都说这应该可以工作。我需要我的监听器每10秒触发一次事件。我现在得到的是每一个事件,它都是一个监听器触发器。我错过了什么?基本的要求是每10秒创建一次汇总统计。理想情况下,我只是想把数据泵入运行时。所以,在这个例子中,我希望每10秒有一次10条记录的转储。

class StreamTest {

    private final Configuration     configuration = new Configuration();
    private final EPRuntime         runtime;
    private final CompilerArguments args          = new CompilerArguments();
    private final EPCompiler        compiler;

    public DatadogApplicationTests() {
        configuration.getCommon().addEventType(CommonLogEntry.class);
        runtime = EPRuntimeProvider.getRuntime(this.getClass().getSimpleName(), configuration);
        args.getPath().add(runtime.getRuntimePath());
        compiler = EPCompilerProvider.getCompiler();
    }
    @Test
    void testDisplayStatsEvery10S() throws Exception{
        //        Display stats every 10s about the traffic during those 10s:

        EPCompiled       compiled         = compiler.compile("select * from CommonLogEntry.win:time(10)", args);

        runtime.getDeploymentService().deploy(compiled).getStatements()[0].addListener(
                (old, newEvents, epStatement, epRuntime) -> 
                 Arrays.stream(old).forEach(e -> System.out.format("%s: received %n", LocalTime.now()))

        );

        new BufferedReader(new InputStreamReader(this.getClass().getResourceAsStream("/access.log"))).lines().map(CommonLogEntry::new).forEachOrdered(e -> {
            runtime.getEventService().sendEventBean(e, e.getClass().getSimpleName());
            try {
                Thread.sleep(TimeUnit.SECONDS.toMillis(1));
            } catch (InterruptedException ex) {
                System.err.println(ex);
            }
        });

    }
}

目前每秒钟输出一次,对应我的流中的睡眠。

11:00:54.676: received 
11:00:55.684: received 
11:00:56.689: received 
11:00:57.694: received 
11:00:58.698: received 
11:00:59.700: received 
complex-event-processing esper
1个回答
0
投票

时间窗口是一个滑动窗口。有一章是关于基本概念的,解释了它们是如何工作的。这里是基本概念章节的链接。

虽然不清楚需求是什么,但我认为你想实现的是收集一段时间的事件,然后发布它们。你可以从 解题模式.

这将收集事件10秒。

create schema StockTick(symbol string, price double);
create context CtxBatch start @now end after 10 seconds;
context CtxBatch select * from StockTick#keepall output snapshot when terminated;
© www.soinside.com 2019 - 2024. All rights reserved.