Python Dataflow: DoFn finish_bundle runs multiple times and produces empty outputs


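I am running a Dataflow pipeline in which I have to collate the data into a single Python (pandas) DataFrame for use in the next step. To do this, I use a DoFn class and define the __init__, process and finish_bundle methods as shown below. I expect a single output that collates all entries into one DataFrame, and I feed this output into the next step of the pipeline as a singleton side input.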

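import logging

import apache_beam as beam
import pandas as pd
from apache_beam.utils.windowed_value import WindowedValue

class collate_ga_data(beam.DoFn):
    def __init__(self):
        # State created here lives on the DoFn instance and is never reset per bundle
        self._ga_data = pd.DataFrame()
        self.window = beam.window.GlobalWindow()
        logging.info("In INITIALIZATION :   {0}".format(self.window))

    def process(self, element, window=beam.DoFn.WindowParam):
        self.window = window
        logging.info("In PROCESS :   {0}".format(self.window))
        # DataFrame.append was removed in pandas 2.0; pd.concat is the replacement
        row = pd.DataFrame({k: [v] for k, v in element.items()})
        self._ga_data = pd.concat([self._ga_data, row])

    def finish_bundle(self):
        # Runs once per bundle, so one DataFrame (possibly empty) is emitted per bundle
        logging.info(" The shape of ga_dataset imported  :  {0}".format(self._ga_data.shape))
        logging.info("In FINISH BUNDLE :   {0}".format(self.window))
        yield WindowedValue(self._ga_data, 0, windows=[self.window])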

This code works perfectly with the DirectRunner and gives the expected result, but with the Dataflow runner it raises an error:

File "/usr/local/lib/python3.6/site-packages/dataflow_worker/batchworker.py", line 649, in do_work
    work_executor.execute()
  File "/usr/local/lib/python3.6/site-packages/dataflow_worker/executor.py", line 178, in execute
    op.finish()
  File "apache_beam/runners/worker/operations.py", line 611, in apache_beam.runners.worker.operations.DoOperation.finish
  File "apache_beam/runners/worker/operations.py", line 612, in apache_beam.runners.worker.operations.DoOperation.finish
  File "apache_beam/runners/worker/operations.py", line 613, in apache_beam.runners.worker.operations.DoOperation.finish
  File "apache_beam/runners/common.py", line 824, in apache_beam.runners.common.DoFnRunner.finish
  File "apache_beam/runners/common.py", line 808, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 834, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 806, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 398, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "apache_beam/runners/common.py", line 401, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "apache_beam/runners/common.py", line 959, in apache_beam.runners.common._OutputProcessor.finish_bundle_outputs
  File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 593, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 594, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 776, in apache_beam.runners.common.DoFnRunner.receive
  File "apache_beam/runners/common.py", line 782, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 849, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/usr/local/lib/python3.6/site-packages/future/utils/__init__.py", line 421, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 610, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/usr/local/lib/python3.6/site-packages/apache_beam/transforms/sideinputs.py", line 65, in __getitem__
    _FilteringIterable(self._iterable, target_window), self._view_options)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/pvalue.py", line 443, in _from_runtime_iterable
    len(head), str(head[0]), str(head[1])))
ValueError: PCollection of size 2 with more than one element accessed as a singleton view. First two elements encountered are "Empty DataFrame

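I did some digging and found that the DoFn produces 3 outputs: one is the required DataFrame and the other two are empty DataFrames. In other words, finish_bundle yields 3 results, and I don't know why. I don't want to use any windowing here, but according to the documentation the output of finish_bundle must be a windowed value, so I emit it into a global window.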

The logging output of the code above is as follows:

2020-02-27T16:32:45.331291913Z  The shape of ga_dataset imported  :  (0, 0) I 
2020-02-27T16:32:45.331489801Z In FINISH BUNDLE :   GlobalWindow I 
2020-02-27T16:32:45.390583276Z  The shape of ga_dataset imported  :  (0, 0) I 
2020-02-27T16:32:45.390754222Z In FINISH BUNDLE :   GlobalWindow I 
2020-02-27T16:32:48.639126300Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.641757011Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.644909381Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.647359848Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.649686336Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.651899814Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.654145240Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.656555175Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.658823966Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.660887002Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.663397789Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.665476560Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.667604684Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.669671535Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.672025680Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.674037218Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.676348209Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.678587436Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.680708885Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.682787656Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.685523986Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.687734365Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.689816713Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.691826343Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.693920373Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.696102380Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.698341846Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.700649023Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.703155755Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.705482244Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.707590818Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.709594726Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.711608886Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.713906288Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.716273546Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.718636035Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.720866918Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.723044872Z  The shape of ga_dataset imported  :  (37, 8) I 
2020-02-27T16:32:48.723157405Z In FINISH BUNDLE :   GlobalWindow I 

Dataflow uses only one worker for this pipeline. Does anyone know why this happens?

Answer

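According to the Beam execution model, "the division of the collection into bundles is arbitrary and selected by the runner." This is why finish_bundle can be called multiple times: it runs once per bundle. Your logs show finish_bundle running for bundles that never reached process, so the DoFn emitted empty DataFrames in addition to the one holding the 37 rows, and a singleton side input that receives more than one element raises this ValueError.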
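To see why per-bundle emission yields several outputs, here is a toy pure-Python model (not Beam code; outputs_per_bundle is a hypothetical helper) of a DoFn that accumulates in process and emits its accumulator in finish_bundle. The bundle split below is an assumption chosen to mirror the logs: two empty bundles, then one with 37 rows.

```python
from typing import Dict, Iterable, List


def outputs_per_bundle(bundles: Iterable[List[Dict]]) -> List[List[Dict]]:
    """Toy model: one accumulate-then-emit cycle per bundle."""
    outputs = []
    for bundle in bundles:
        accumulator = []  # fresh state for this bundle (e.g. set in start_bundle)
        for element in bundle:  # process() runs once per element
            accumulator.append(element)
        outputs.append(accumulator)  # finish_bundle() emits once per bundle
    return outputs


# Hypothetical split mirroring the logs above.
bundles = [[], [], [{"row": i} for i in range(37)]]
print([len(out) for out in outputs_per_bundle(bundles)])  # [0, 0, 37]
```

Three bundles give three outputs, two of them empty, even though only one worker was used; the number of bundles is up to the runner.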

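It looks like your problem is better solved with a CombineFn that uses a DataFrame as its accumulator, applied with CombineGlobally. See the Beam programming guide (section 4.2.4, Combine) for instructions on how to implement it.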
