RuntimeError: NotImplementedError [while running 'BatchElements(messages)'] when running a streaming Apache Beam pipeline with to_dataframe

Problem description

Hello, I am still in the learning phase with Apache Beam, and below is a script I wrote. It reads from Pub/Sub and writes to BigQuery. However, I get RuntimeError: NotImplementedError [while running 'BatchElements(messages)'] when converting the PCollection to a DataFrame for further data transformations. What am I missing?

Thanks

My code:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.dataframe.convert import to_dataframe, to_pcollection
import os
import typing

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "my_json_file.json"


# Define a NamedTuple schema for the parsed messages
class BmsSchema(typing.NamedTuple):
    ident: str


# DoFn that parses a raw Pub/Sub message into a dict matching the schema
class ParsePubSubMessage(beam.DoFn):
    def process(self, message):
        import json  # imported inside process() so it is available on remote workers
        # Create a dict with all expected columns defaulting to None
        all_columns = ['ident']
        main_dict = dict.fromkeys(all_columns)
        # Parse the JSON message
        record = json.loads(message.decode('utf-8'))
        main_dict.update(record)

        yield {
            'ident': main_dict["ident"]
        }


def run():
    # Define pipeline options
    options = PipelineOptions(
        project='dwingestion',
        runner='DirectRunner',
        streaming=True,  # Enable streaming mode
        temp_location='gs://........./temp',
        staging_location='gs://....../staging',
        region='europe-west1',
        job_name='flesp-streaming-pipeline-dataflow-test'
    )

    # Set streaming mode
    options.view_as(StandardOptions).streaming = True

    # Pub/Sub subscription
    input_subscription = 'projects/...../subscriptions/flespi_data_streaming'

    table_schema = {
        "fields": [
            {"name": "ident", "type": "STRING", "mode": "NULLABLE"}
        ]
    }

    # Create the pipeline
    with beam.Pipeline(options=options) as p:
        # Read from Pub/Sub and parse the messages
        messages = (p
                    | 'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=input_subscription)
                    | 'Parse PubSub Message' >> beam.ParDo(ParsePubSubMessage())
                    | 'Attaching the schema' >> beam.Map(lambda x: BmsSchema(**x)).with_output_types(BmsSchema)
                    )

        # Convert the PCollection to a deferred Beam DataFrame, then back to a PCollection
        df = to_dataframe(messages)
        transformed_pcol = to_pcollection(df)
        # Write to BigQuery using the explicit table schema
        transformed_pcol | 'Write to BigQuery' >> WriteToBigQuery(
            table='.........flesp_table_test_4',
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            schema=table_schema,
            custom_gcs_temp_location='gs://........../temp'
        )


if __name__ == '__main__':
    run()
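
For context, ParsePubSubMessage expects each Pub/Sub payload to be a UTF-8 JSON object containing an ident key. A hypothetical test publisher for reproducing the issue (the project and topic names here are assumptions) could look like this:

import json
from google.cloud import pubsub_v1

# Publish one test message in the shape ParsePubSubMessage expects.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path('dwingestion', 'flespi_data_streaming')  # assumed topic name
payload = json.dumps({'ident': 'device-123'}).encode('utf-8')
publisher.publish(topic_path, data=payload).result()  # .result() blocks until the publish completes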
Tags: python, google-bigquery, google-cloud-dataflow, apache-beam
1 Answer

I solved the problem above by changing the runner from DirectRunner to DataflowRunner. The DirectRunner used for local testing does not appear to fully support all of the to_dataframe functionality when streaming is set to True (the pipeline above streams data from a Pub/Sub topic).
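
For completeness, the fix only touches the pipeline options; everything else stays the same. A minimal sketch of the changed options (the project, buckets, and region are placeholders carried over from the question and must match your own GCP setup):

from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

options = PipelineOptions(
    project='dwingestion',
    runner='DataflowRunner',  # changed from 'DirectRunner'
    streaming=True,
    temp_location='gs://your-bucket/temp',        # placeholder bucket
    staging_location='gs://your-bucket/staging',  # placeholder bucket
    region='europe-west1',
    job_name='flesp-streaming-pipeline-dataflow-test'
)
options.view_as(StandardOptions).streaming = True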
