我正在尝试使用Kafka Streams实现窗口,其中我们从主题获得了一堆JSON格式的不同事务。对于每个事务ID,可以有多个提交(根据SQL提交考虑它,因此针对不同的事务ID进行多个事务)。我需要设置一个微批处理窗口,其中我将每5秒消耗一次数据,并根据事务ID将数据存储在目录中。
我想知道这里的方法以及应该使用哪个库/函数/类?
您可以通过各种方式在kafka流中使用Windowing。翻滚窗口的一个例子(如你的情况)
builder.stream("events")
.groupByKey()
.windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(10)))
.count()
.suppress(untilWindowCloses(BufferConfig.unbounded()))
.toStream()
.process(FileSink::new);
您可以在此处找到更多详细信息: https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html#tumbling-time-windows