如何在 Apache Beam 管道中记录传入消息

问题描述 投票:0回答:2

我正在编写一个简单的 apache beam 流管道,从 pubsub 主题获取输入并将其存储到 bigquery 中。几个小时以来,我以为我什至无法阅读消息,因为我只是试图将输入记录到控制台:

events = p | 'Read PubSub' >> ReadFromPubSub(subscription=SUBSCRIPTION)
logging.info(events)

当我将其写入文本时,效果很好!然而我对

logger
的呼叫从未发生过。

人们如何开发/调试这些流管道?

我尝试添加以下行:

events | 'Log' >> logging.info(events)

使用

print()
也不会在控制台中产生任何结果。

google-cloud-dataflow apache-beam google-cloud-pubsub apache-beam-io
2个回答
4
投票

这是因为

events
PCollection
,所以您需要对其应用
PTransform

最简单的方法是将

ParDo
应用于
events
:

events | 'Log results' >> beam.ParDo(LogResults())

定义为:

class LogResults(beam.DoFn):
  """Just log the results"""
  def process(self, element):
    logging.info("Pub/Sub event: %s", element)
    yield element

请注意,如果您想在下游应用进一步的步骤,例如在记录元素后写入接收器,我也会生成该元素。例如,请参阅此处的问题。


0
投票

不使用 Map 的简单方法是通过 beam.LogElements() 内置日志转换。

所以你的代码可以变成这样,

 p | 'Read PubSub' >> ReadFromPubSub(subscription=SUBSCRIPTION) | beam.LogElements()
© www.soinside.com 2019 - 2024. All rights reserved.