我有一个用例,可以在以下条件下从 PubSub 在 GCS 存储桶中创建文件:
例如, i) 如果我们指定窗口大小为15分钟,并且在15分钟窗口大小之前发布到PubSub的消息超过500MB,那么我们可以将数据加载到GCS桶中 ii) 如果即使在窗口大小 15 分钟后发布到 PubSub 的消息也没有超过 500MB,那么在这种情况下,由于达到了窗口大小,它应该开始将数据加载到 GCS Bucket。
任何人都可以帮我提供上述用例的代码参考吗?
我尝试创建一个函数来获取发布到 PubSub 的消息的大小,但无法获取大小
将 @XQ Hu 推荐发布为 wiki,以便为社区提供更好的可见性,为此用例使用复合触发器:
您可能需要复合触发器:
https://beam.apache.org/documentation/programming-guide/#composite-triggers
触发器类型:
您可以通过
和AfterWatermark.pastEndOfWindow
向.withEarlyFirings
添加额外的提前触发或延迟触发。.withLateFirings
指定永远执行的触发器。每当满足触发器的条件时,它都会导致窗口发出结果,然后重置并重新开始。将Repeatedly.forever
与Repeatedly.forever
组合起来指定导致重复触发器停止的条件非常有用。.orFinally
组合多个触发器以按特定顺序触发。每当序列中的触发器发出窗口时,序列就会前进到下一个触发器。AfterEach.inOrder
接受多个触发器,并在第一次满足其任何参数触发器时发出。这相当于多个触发器的逻辑或运算。AfterFirst
接受多个触发器,并在满足其所有参数触发器时发出。这相当于多个触发器的逻辑与运算。AfterAll
可以作为最终条件,导致任何触发器最后一次触发并且不再触发。orFinally