等待两个PubSub数据流后再加入数据

问题描述 投票:0回答:1

我有两个独立的

PubSub
数据流。他们每个人都会收到一条通知。我想在出现通知时检索文件并使用
CoGroupByKey
加入这些文件。这两个文件都是 JSON 内容并且有一个共同的密钥。

在开始加入流程之前,我如何确定我已收到这两个文件?

with beam.Pipeline(options=pipeline_options) as p:
    file_1 = (
            p
            | "file_1: read file notifications" >> beam.io.ReadFromPubSub(subscription="sub1").with_output_types(bytes)
            | "file_1: WindowInto" >> beam.WindowInto(window.FixedWindows(60))
            | "file_1: read file" >> beam.io.ReadAllFromText()
        )
        
    file_2 = (
            p
            | "file_2: read file notifications" >> beam.io.ReadFromPubSub(subscription="sub2").with_output_types(bytes)
            | "file_2: WindowInto" >> beam.WindowInto(window.FixedWindows(60))
            | "file_2: read file" >> beam.io.ReadAllFromText()
        )
        
        # Flatten the collected PCollections
        joined_data = ({'file_1': file_1_collected, 'file_2': file_2_collected}
                       | "Flatten collected data" >> beam.Flatten())

        # Perform the join operation
        result = (joined_data
                  | "CoGroupByKey Join" >> beam.CoGroupByKey())

streaming apache-beam google-cloud-pubsub
1个回答
0
投票

对于该用例,我将使用状态和计时器。如果它们具有相同的键,则从每个流中生成元组(键,值),并在查看每个文件时填充一些状态变量。一旦您看到这两个文件,就可以执行其他操作并清除状态。您可以尝试使用

BagState
CombiningState
ValueState
计数,并使用计时器作为“后备”,以防您看不到这两个文件中的任何一个,这样您就不会永远保持状态。

在有状态 DoFn 中,您已经拥有这两个文件,因此无需使用

CoGroupByKey

在上面的代码片段中,如果生成的文件具有相似的时间戳,它们将落在同一个窗口中。但如果它们不是用相似的时间戳生成的,那么窗口将不起作用。这就是为什么我更喜欢基于状态和计时器的解决方案来解决该特定问题。

© www.soinside.com 2019 - 2024. All rights reserved.