相同的 Apache Beam 代码适用于 Direct Runner,但不适用于 Dataflow 运行器

问题描述 投票:0回答:2

我有一段 apache 束管代码,它从 GCS 存储桶中的文件读取并打印它。它与 DirectRunner 完美配合并打印文件输出,但与 Dataflow 运行器一起它不会打印任何内容,也没有错误。

我们需要为数据流运行器做一些特殊/不同的事情吗?

代码看起来像这样

  p = beam.Pipeline(options=pipeline_options)
  read_file_pipe = (
      p
      | "Create {}".format(file_name) >> beam.Create(["Start"])
      | "Read File {}".format(file_name)
      >> ReadFromTextWithFilename(file_path, skip_header_lines=1)
      | beam.Map(print)
  )

  p.run().wait_until_finish()

调用堆栈为 python3 Test_Pipe.py --region us-central1 --output_project= --runner=DataflowRunner --project= --temp_location= --service_account_email= --experiments=use_network_tags=default-uscentral1 --subnetwork --no_use_public_ips

python google-cloud-dataflow apache-beam apache-beam-io
2个回答
2
投票

您可以使用

logging
而不是
print
来解决您的问题,我用
logging
添加了您的代码片段:

import logging

p = beam.Pipeline(options=pipeline_options)
  read_file_pipe = (
      p
      | "Create {}".format(file_name) >> beam.Create(["Start"])
      | "Read File {}".format(file_name)
      >> ReadFromTextWithFilename(file_path, skip_header_lines=1)
      | beam.Map(self.log_element)
  )

  p.run().wait_until_finish()

 def log_element(self, element):
     logging.info(element)

     return element

0
投票

解决我的问题的是改变读取这些文件的方法。我基本上从

ReadFromTextWithFilename 
切换到
MatchFiles
apache_beam.io.fileio
)。

出于某种原因,我仍然不知道

ReadFromTextWithFilename
没有达到预期的行为。文档也不清楚。

我的代码现在看起来像这样:

from apache_beam.io import fileio
import apache_beam as beam

def __expand_items(element):
    content = element.read_utf8()
    if content:
        items = content.splitlines()
        filename = element.metadata.path
        for item in items:
            yield (filename, item)

...
| "Match" >> fileio.MatchFiles(self.gcs_pattern, empty_match_treatment=fileio.EmptyMatchTreatment.ALLOW)
| "Read" >> fileio.ReadMatches()
| "Flatmap" >> beam.FlatMap(__expand_items)
| "Reshuffle -- avoid fusion" >> beam.Reshuffle()

我的参考是这个答案

© www.soinside.com 2019 - 2024. All rights reserved.