我有一段 apache 束管代码,它从 GCS 存储桶中的文件读取并打印它。它与 DirectRunner 完美配合并打印文件输出,但与 Dataflow 运行器一起它不会打印任何内容,也没有错误。
我们需要为数据流运行器做一些特殊/不同的事情吗?
代码看起来像这样
p = beam.Pipeline(options=pipeline_options)
read_file_pipe = (
p
| "Create {}".format(file_name) >> beam.Create(["Start"])
| "Read File {}".format(file_name)
>> ReadFromTextWithFilename(file_path, skip_header_lines=1)
| beam.Map(print)
)
p.run().wait_until_finish()
调用堆栈为 python3 Test_Pipe.py --region us-central1 --output_project= --runner=DataflowRunner --project= --temp_location= --service_account_email= --experiments=use_network_tags=default-uscentral1 --subnetwork --no_use_public_ips
您可以使用
logging
而不是 print
来解决您的问题,我用 logging
添加了您的代码片段:
import logging
p = beam.Pipeline(options=pipeline_options)
read_file_pipe = (
p
| "Create {}".format(file_name) >> beam.Create(["Start"])
| "Read File {}".format(file_name)
>> ReadFromTextWithFilename(file_path, skip_header_lines=1)
| beam.Map(self.log_element)
)
p.run().wait_until_finish()
def log_element(self, element):
logging.info(element)
return element
解决我的问题的是改变读取这些文件的方法。我基本上从
ReadFromTextWithFilename
切换到MatchFiles
(apache_beam.io.fileio
)。
出于某种原因,我仍然不知道
ReadFromTextWithFilename
没有达到预期的行为。文档也不清楚。
我的代码现在看起来像这样:
from apache_beam.io import fileio
import apache_beam as beam
def __expand_items(element):
content = element.read_utf8()
if content:
items = content.splitlines()
filename = element.metadata.path
for item in items:
yield (filename, item)
...
| "Match" >> fileio.MatchFiles(self.gcs_pattern, empty_match_treatment=fileio.EmptyMatchTreatment.ALLOW)
| "Read" >> fileio.ReadMatches()
| "Flatmap" >> beam.FlatMap(__expand_items)
| "Reshuffle -- avoid fusion" >> beam.Reshuffle()
我的参考是这个答案。