Processing files with GCP Dataflow and GCP Cloud Functions


I am currently using the following code to process CSV files placed in my input bucket. I process them with Dataflow, but I am getting an error in the Dataflow job.

I have listed the code and the error below.

main.py

import functions_framework
import re
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def process_csv_file(element):
    # Cleans up CSV fields by removing leading/trailing whitespace and filtering out 'NaN'
    cleaned_data = [field.strip() for field in element.split(',') if field.strip() != 'NaN']
    return ','.join(cleaned_data)

def generate_valid_job_name(file_name):
    # Generates a valid job name by replacing non-alphanumeric characters with underscores
    cleaned_name = re.sub(r'[^a-z0-9]', '_', file_name.lower())
    # Check for an empty name before indexing into it
    if not cleaned_name:
        cleaned_name = 'default_job_name'
    if not cleaned_name[0].isalpha():
        cleaned_name = 'a' + cleaned_name[1:]
    if not cleaned_name[-1].isalnum():
        cleaned_name = cleaned_name[:-1] + 'z'
    # Dataflow job names may not contain underscores
    cleaned_name = cleaned_name.replace('_', '')
    return cleaned_name
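
# For illustration (hypothetical input): generate_valid_job_name('Input_File.csv')
# lowercases and substitutes to 'input_file_csv', then strips the underscores and
# returns 'inputfilecsv', which fits Dataflow's job-name pattern
# [a-z]([-a-z0-9]*[a-z0-9])?.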

@functions_framework.cloud_event
def start_dataflow_process(cloud_event):
    # Imports necessary modules
    import base64
    import json

    # Define pipeline options and parameters
    project_id = 'your_project_id'
    input_bucket = 'your_input_bucket'
    output_bucket = 'your_output_bucket'
    output_prefix = 'Out_'
    
    # Extracts relevant information from the Cloud Event data
    data = cloud_event.data
    file_name = data['name']
    bucket_name = data['bucket']
    
    # Checks if the keyword 'input' is present in the file name
    if 'input' not in file_name.lower():
        print(f"File {file_name} does not contain the keyword 'input'. Exiting.")
        return
    
    print(f"File: {file_name}, Bucket: {bucket_name}")
    
    # Generates a valid job name based on the input file name
    job_name = generate_valid_job_name(file_name)
    
    # Configures Dataflow pipeline options
    pipeline_options = {
        'project': project_id,
        'runner': 'DataflowRunner',
        'staging_location': f'gs://{output_bucket}/{job_name}/staging',
        'temp_location': f'gs://{output_bucket}/{job_name}/temp',
        'job_name': job_name,
        'region': 'your_region',
        'save_main_session': True
    }
    
    # Creates a Dataflow pipeline
    pipeline = beam.Pipeline(options=PipelineOptions.from_dictionary(pipeline_options))
    
    # Reads CSV data from the input file
    csv_data = pipeline | 'ReadFromText' >> beam.io.ReadFromText(f'gs://{input_bucket}/{file_name}')
    
    # Processes CSV data by cleaning up fields
    cleaned_data = csv_data | 'ProcessCSV' >> beam.Map(process_csv_file)
    
    # Defines the output path for the cleaned data (use the configured output
    # bucket; writing back to the input bucket would re-trigger this function)
    output_path = f'gs://{output_bucket}/{output_prefix}{file_name}'
    
    # Writes the cleaned data to a text file
    cleaned_data | 'WriteToText' >> beam.io.WriteToText(output_path)
    
    print("Pipeline starting...")
    
    # Runs the Dataflow pipeline
    result = pipeline.run()
    
    print("Pipeline started")
    
    # Waits for the pipeline to finish
    result.wait_until_finish()
    
    print("Pipeline ended")

Error:

File "/usr/local/lib/python3.8/site-packages/dill/_dill.py", line 827, in _import_module
    return getattr(__import__(module, None, None, [obj]), obj)
ModuleNotFoundError: No module named 'functions_framework'

python google-cloud-functions google-cloud-dataflow apache-beam
1 Answer

It looks like you need to install the functions framework locally.

pip install functions-framework
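
For context, this traceback comes from dill unpickling the pipeline on the Dataflow workers: with save_main_session set to True, Beam pickles the main session, including the functions_framework import, so the package must be importable on the workers as well as on the machine that launches the job. A minimal sketch of one way to ship it to the workers, assuming a requirements.txt next to main.py whose only line is functions-framework (the file name and option values here are illustrative):

from apache_beam.options.pipeline_options import PipelineOptions

# Same options as in main.py, plus Beam's standard requirements_file option,
# so the Dataflow workers pip-install functions-framework before unpickling
# the saved main session. Bucket, region, and job-name values are placeholders.
pipeline_options = PipelineOptions.from_dictionary({
    'project': 'your_project_id',
    'runner': 'DataflowRunner',
    'region': 'your_region',
    'staging_location': 'gs://your_output_bucket/staging',
    'temp_location': 'gs://your_output_bucket/temp',
    'job_name': 'your-job-name',
    'save_main_session': True,
    'requirements_file': 'requirements.txt',  # contains: functions-framework
})

Alternatively, setting save_main_session to False (or keeping the Beam transforms in a module that never imports functions_framework) may avoid pickling the import in the first place.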
