Apache Beam - 无界 PCollection 中的计数消息(每个窗口)

问题描述 投票:0回答:0

我需要一个简单的任务来计算来自无限数据源的固定窗口中的消息数。

步骤是:

  1. 从 Pub/sub 读取数据
  2. 定义窗口固定时间
  3. 创建一个(键,值),其中键是窗口时间戳
  4. 计算每个键的消息

作为数据源,我使用公共发布/订阅主题

projects/pubsub-public-data/topics/taxirides-realtime

topic_name = "projects/pubsub-public-data/topics/taxirides-realtime"

options = pipeline_options.PipelineOptions()
options.view_as(pipeline_options.StandardOptions).streaming = True
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

ib.options.recording_duration = '18s'

class dataAsKey(beam.DoFn):
    def process(self, element, window=beam.DoFn.WindowParam):
         yield (format(window.start.to_utc_datetime()) , 1)

p = beam.Pipeline(interactive_runner.InteractiveRunner(), options=options)

data = (p 
    | "Read" >> beam.io.ReadFromPubSub(topic=topic_name) 
    | 'Window' >> beam.WindowInto(beam.window.FixedWindows(6))
)
ib.show(data)

如上图所示,Interactive Beams 收到了 110 条消息。 但是,当我需要使用 Window Timestamp 作为键转换 PCollection 然后计算键的消息数时,消息数与聚合后的总数不匹配。在下面的示例中,每个键的消息总数为 50.

count = data(
    | 'Data as key' >> beam.ParDo(dataAsKey())
    | 'Count per Window' >> Count.PerKey()

ib.show(count)

奇怪的是,当我将相同的代码与有界数据源一起使用时,值匹配。

python apache-beam
© www.soinside.com 2019 - 2024. All rights reserved.