我读到的所有内容都说这应该可以工作。我需要我的监听器每10秒触发一次事件。我现在得到的是每一个事件,它都是一个监听器触发器。我错过了什么?基本的要求是每10秒创建一次汇总统计。理想情况下,我只是想把数据泵入运行时。所以,在这个例子中,我希望每10秒有一次10条记录的转储。
class StreamTest {
private final Configuration configuration = new Configuration();
private final EPRuntime runtime;
private final CompilerArguments args = new CompilerArguments();
private final EPCompiler compiler;
public DatadogApplicationTests() {
configuration.getCommon().addEventType(CommonLogEntry.class);
runtime = EPRuntimeProvider.getRuntime(this.getClass().getSimpleName(), configuration);
args.getPath().add(runtime.getRuntimePath());
compiler = EPCompilerProvider.getCompiler();
}
@Test
void testDisplayStatsEvery10S() throws Exception{
// Display stats every 10s about the traffic during those 10s:
EPCompiled compiled = compiler.compile("select * from CommonLogEntry.win:time(10)", args);
runtime.getDeploymentService().deploy(compiled).getStatements()[0].addListener(
(old, newEvents, epStatement, epRuntime) ->
Arrays.stream(old).forEach(e -> System.out.format("%s: received %n", LocalTime.now()))
);
new BufferedReader(new InputStreamReader(this.getClass().getResourceAsStream("/access.log"))).lines().map(CommonLogEntry::new).forEachOrdered(e -> {
runtime.getEventService().sendEventBean(e, e.getClass().getSimpleName());
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(1));
} catch (InterruptedException ex) {
System.err.println(ex);
}
});
}
}
目前每秒钟输出一次,对应我的流中的睡眠。
11:00:54.676: received
11:00:55.684: received
11:00:56.689: received
11:00:57.694: received
11:00:58.698: received
11:00:59.700: received
时间窗口是一个滑动窗口。有一章是关于基本概念的,解释了它们是如何工作的。这里是基本概念章节的链接。
虽然不清楚需求是什么,但我认为你想实现的是收集一段时间的事件,然后发布它们。你可以从 解题模式.
这将收集事件10秒。
create schema StockTick(symbol string, price double);
create context CtxBatch start @now end after 10 seconds;
context CtxBatch select * from StockTick#keepall output snapshot when terminated;