我们使用 Apache Beam 向 API 发送一些数据。对于单个 API 调用,API 仅接受一定数量的元素。
为了将元素分组为微批次,我们使用了混合触发器,该触发器在以下条件之一发生时触发:1) 30 秒后 2) 当至少收集 10 个元素时。
现在看来,在某些情况下,触发器会为单个窗格传递超过 10 个元素。有什么建议可以强制记录的确切数量吗?我想,正如方法名称所示,可以传递 '.elementsCountAtLeast(int x)
more then
x` 元素。
// definition
public class WindowHelper {
public static Window<KV<String,String>> generateWIndow() {
Window <KV<String,String>> window = Window.<KV<String,String>>into(
new GlobalWindows()).triggering(
Repeatedly.forever(AfterFirst.of(
AfterPane.elementCountAtLeast(10),
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(30))))
).discardingFiredPanes();
}
}
// invocation
data.apply(WindowHelper.generateWindow())
...
我会回想一下你为什么想要这个,在两个选项之间做出决定:
GroupIntoBatches
来批量获取小于或等于 10个元素。最后一批可能很小,冲完不到10个。
重要:
GroupIntoBatches
,批次的大小是输入/输出规范的一部分。在流式处理和批处理场景中,您都将获得 10 个或更少元素的批次。