GCP PubSub 到 GCS 数据流

问题描述 投票:0回答:1

我有一个用例,可以在以下条件下从 PubSub 在 GCS 存储桶中创建文件:

  1. 窗口大小限制
  2. 发布到 PubSub 主题的消息大小超过 500MB

例如, i) 如果我们指定窗口大小为15分钟,并且在15分钟窗口大小之前发布到PubSub的消息超过500MB,那么我们可以将数据加载到GCS桶中 ii) 如果即使在窗口大小 15 分钟后发布到 PubSub 的消息也没有超过 500MB,那么在这种情况下,由于达到了窗口大小,它应该开始将数据加载到 GCS Bucket。

任何人都可以帮我提供上述用例的代码参考吗?

我尝试创建一个函数来获取发布到 PubSub 的消息的大小,但无法获取大小

google-cloud-platform google-cloud-dataflow apache-beam google-cloud-pubsub gcs
1个回答
0
投票

将 @XQ Hu 推荐发布为 wiki,以便为社区提供更好的可见性,为此用例使用复合触发器:

您可能需要复合触发器:

https://beam.apache.org/documentation/programming-guide/#composite-triggers

触发器类型:

  • 您可以通过

    AfterWatermark.pastEndOfWindow
    .withEarlyFirings
    .withLateFirings
    添加额外的提前触发或延迟触发。

  • Repeatedly.forever
    指定永远执行的触发器。每当满足触发器的条件时,它都会导致窗口发出结果,然后重置并重新开始。将
    Repeatedly.forever
    .orFinally
    组合起来指定导致重复触发器停止的条件非常有用。

  • AfterEach.inOrder
    组合多个触发器以按特定顺序触发。每当序列中的触发器发出窗口时,序列就会前进到下一个触发器。

  • AfterFirst
    接受多个触发器,并在第一次满足其任何参数触发器时发出。这相当于多个触发器的逻辑或运算。

  • AfterAll
    接受多个触发器,并在满足其所有参数触发器时发出。这相当于多个触发器的逻辑与运算。

  • orFinally
    可以作为最终条件,导致任何触发器最后一次触发并且不再触发。

© www.soinside.com 2019 - 2024. All rights reserved.