GCP Dataflow 批处理

问题描述 投票:0回答:1

我正在为 GCP Dataflow 批处理进行 POC。

我想传递 Pandas Dataframe 作为批量输入并执行列式转换并再次返回同一批次。

我参考下面链接中提供的 MultipyByTwo 示例 https://beam.apache.org/documentation/programming-guide/

当我输入 Pandas Dataframe process_batch 函数时未执行。

您能否告诉我原因,如果可能,请提供示例。

代码- pd = read_excel(路径)

result1 = ( pd | "测试批次" >> beam.ParDo(TestBatch(argv)))

DoFn 类 -

类(TestBatch(beam.DoFn): def init(self, args: 任意): self.args = args 打印(“(TestBatch.init”)

def setup(self) -> None:
    print("(TestBatch.setup")

def process_batch(self, batch: pd.DataFrame) -> pd.DataFrame:
    print("(TestBatch.process_batch")

    print(batch)

    yield batch
google-cloud-dataflow apache-beam
1个回答
0
投票

我建议你看一下Beam Dataframes,例如,https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.io.html#apache_beam.dataframe.io.read_excel。这允许你更有效地处理数据帧。

对于您的情况,此代码示例应该有效:

import apache_beam as beam
import pandas as pd

df = pd.read_csv("beers.csv")


class ProcessDf(beam.DoFn):
    def process(self, e: pd.DataFrame):
        print(e)
        return e


with beam.Pipeline() as p:
    p | beam.Create([df]) | beam.ParDo(ProcessDf())

Beam 管道需要从 Pipeline 开始。

https://beam.apache.org/get-started/ 有更多信息。 Tour of Beam 是学习一些基本概念的好地方。

© www.soinside.com 2019 - 2024. All rights reserved.