我正在尝试思考如何构建一些数据管道需求,我只是想知道以下是否可能:
我知道我可以通过其他方式做到这一点。例如,我可以创建 2 个侦听同一订阅的数据流作业/管道。 (或者让 2 个单独的订阅监听同一主题会更好吗?)我还可以为数据流作业创建一个订阅,然后创建另一个订阅(同一主题),立即推送到 BigQuery。
但是,如果我可以拥有一组代码(因此需要监控一个 CI/CD 作业)来完成这两项任务,那么它会简化我们需要维护的内容,并且这将是更可取的。
这可能吗?
是的,这是可能的。要在不使用窗口的情况下在 BigQuery 中写入,您需要 使用
method
等于 STREAMING_INSERTS 或 STORAGE_WRITE_API(可能将 use_at_least_once
设置为 True
以获得更低的延迟,但存在重复的风险)。
对于管道的其余部分,您只需要有两个分支。像这样的东西:
msgs = p | "Read from P/S" >> beam.io.ReadFromPubsub(...)
dictionaries = msgs | "Transform to dict with some schema" >> ...
dictionaries | beam.io.gcp.bigquery.WriteToBigQuery(...,method=STORAGE_WRITE_API, use_at_least_once=True,...)
dictionaries | beam.WindowInto(FixedWindows(size=300)) | ... # (aggregate, write to another BigQuery table, etc)