This pipeline code works fine with the Direct runner, but fails on the Dataflow runner with the error 'name 'read_inputpath_date' is not defined'

from apache_beam.options.pipeline_options import PipelineOptions
import os
import logging
import apache_beam as beam
import gzip
import json, io

from google.cloud import storage

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "credentials.json"


def read_inputpath_date(input_path):
    # Read the contents of the .jsonl.gz file from GCS
    storage_client = storage.Client()
    bucket_name, file_name = input_path[5:].split("/", 1)
    # Get the GCS bucket and file object
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(file_name)
    # Download the .jsonl.gz file contents as bytes
    file_content = blob.download_as_bytes()
    # Read and parse the .jsonl.gz file
    data = []
    with gzip.GzipFile(fileobj=io.BytesIO(file_content), mode='r') as f:
        for line in f:
            record = json.loads(line)
            data.append(record)
    logging.info('Done reading data from path {}'.format(input_path))
    return data


def process_messages(element):
    # Process the received Pub/Sub message
    message = json.loads(element.decode('utf-8'))
    logging.info('Received message: {}'.format(message))
    # Add your custom logic
    input_path = message["file_path"]
    input_path = 'gs://mf-staging-area/' + input_path
    table_name = message["table_name"]
    logging.info('Input path: {}'.format(input_path))
    logging.info('Table name: {}'.format(table_name))
    try:
        data = read_inputpath_date(input_path)
        for line in data:
            # Process each line of the file
            table_row = line['payload']
            logging.info('Line: {}'.format(line))
            logging.info('Table: {}'.format(table_name))
            logging.info('Row: {}'.format(table_row))
    except Exception:
        logging.exception('Error occurred while reading input path: {}'.format(input_path))


def run():
    # Set the Google Cloud project and Pub/Sub subscription
    project_id = 'moneyfellows-data'
    subscription = 'projects/moneyfellows-data/subscriptions/streaming-topic-sub'

    # Create PipelineOptions with Dataflow runner
    pipeline_options = PipelineOptions()

    # Define the Dataflow pipeline
    with beam.Pipeline(options=pipeline_options) as p:
        # Read messages from Pub/Sub subscription
        messages = (
                p
                | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(subscription=subscription)
        )

        # Process the messages
        processed_messages = (
                messages
                | 'ProcessMessages' >> beam.ParDo(process_messages)
        )


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    run()

I'm trying to stream messages from Pub/Sub using the Dataflow runner. The pipeline works fine with the Direct runner, but on the Dataflow runner it fails with "name 'read_inputpath_date' is not defined" at the point where it reads the contents of the Google Cloud Storage input path and parses them into the data variable. I suspect it has something to do with the context or scope the function is called in, even though it is defined before the call. Does anyone know what causes this error, and why it only shows up on the Dataflow runner and not on the Direct runner?
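For context, the pipeline above builds PipelineOptions() with no arguments, so the choice of runner comes entirely from the options passed at launch time. A minimal sketch of the kind of options typically used to target the Dataflow runner (all values below are placeholders, not taken from the question):

from apache_beam.options.pipeline_options import PipelineOptions

# Hypothetical options for a streaming Dataflow job; replace the project,
# region and bucket with your own values.
dataflow_options = PipelineOptions(
    runner='DataflowRunner',
    project='your-project-id',
    region='us-central1',
    temp_location='gs://your-bucket/temp',
    streaming=True,  # streaming mode, since the source is Pub/Sub
)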

google-cloud-platform google-cloud-storage google-cloud-dataflow apache-beam google-cloud-pubsub
1 Answer

I think it should work if you move read_inputpath_date into a separate file, outside of the main file:

file_reader.py

import gzip
import io
import json
import logging

from google.cloud import storage


def read_inputpath_date(input_path):
    # Read the contents of the .jsonl.gz file from GCS
    storage_client = storage.Client()
    bucket_name, file_name = input_path[5:].split("/", 1)
    # Get the GCS bucket and file object
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(file_name)
    # Download the .jsonl.gz file contents as bytes
    file_content = blob.download_as_bytes()
    # Read and parse the .jsonl.gz file
    data = []
    with gzip.GzipFile(fileobj=io.BytesIO(file_content), mode='r') as f:
        for line in f:
            record = json.loads(line)
            data.append(record)
    logging.info('Done reading data from path {}'.format(input_path))
    return data
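For the import shown in main.py below to resolve, file_reader.py needs to live in an importable package; a hypothetical layout matching the your_root_folder name used in the import (the folder name is only a placeholder):

your_project/
├── main.py
└── your_root_folder/
    ├── __init__.py
    └── file_reader.py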

and then import the method in the main.py file:

from apache_beam.options.pipeline_options import PipelineOptions
import os
import logging
import apache_beam as beam
import gzip
import json, io

from google.cloud import storage

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "credentials.json"

def process_messages(element):
    from your_root_folder.file_reader import read_inputpath_date

    # Process the received Pub/Sub message
    message = json.loads(element.decode('utf-8'))
    logging.info('Received message: {}'.format(message))
    # Add your custom logic
    input_path = message["file_path"]
    input_path = 'gs://mf-staging-area/' + input_path
    table_name = message["table_name"]
    logging.info('Input path: {}'.format(input_path))
    logging.info('Table name: {}'.format(table_name))
    try:
        data = read_inputpath_date(input_path)
        for line in data:
            # Process each line of the file
            table_row = line['payload']
            logging.info('Line: {}'.format(line))
            logging.info('Table: {}'.format(table_name))
            logging.info('Row: {}'.format(table_row))
    except Exception:
        logging.exception('Error occurred while reading input path: {}'.format(input_path))


def run():
    # Set the Google Cloud project and Pub/Sub subscription
    project_id = 'moneyfellows-data'
    subscription = 'projects/moneyfellows-data/subscriptions/streaming-topic-sub'

    # Create PipelineOptions with Dataflow runner
    pipeline_options = PipelineOptions()

    # Define the Dataflow pipeline
    with beam.Pipeline(options=pipeline_options) as p:
        # Read messages from Pub/Sub subscription
        messages = (
                p
                | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(subscription=subscription)
        )

        # Process the messages
        processed_messages = (
                messages
                | 'ProcessMessages' >> beam.ParDo(process_messages)
        )


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    run()

Since some of the processing is done inside the DoFn and the workers cannot see code defined in the main.py file, one possible solution is to move the method outside of main and import it inside the function that will use it.
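One thing the answer does not mention but that usually matters in practice: once read_inputpath_date lives in its own module, that module also has to be shipped to the Dataflow workers, typically by packaging the project and passing --setup_file when launching the job. A rough sketch of such a setup.py, assuming the package layout above (names and dependencies are placeholders):

# setup.py -- hypothetical packaging sketch; launch the job with
# --setup_file=./setup.py so Dataflow installs this package on every worker.
import setuptools

setuptools.setup(
    name='pipeline-helpers',              # placeholder package name
    version='0.0.1',
    packages=setuptools.find_packages(),  # picks up your_root_folder/
    install_requires=[
        'apache-beam[gcp]',
        'google-cloud-storage',
    ],
)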
