我目前正在使用以下代码来处理放置在我的输入存储桶中的 csv 文件。我使用数据流处理它们,但我在数据流作业中遇到错误。
我已在下面列出了代码和错误。
import functions_framework
import re
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
def process_csv_file(element):
\# Cleans up CSV fields by removing leading/trailing whitespaces and filtering out 'NaN'
cleaned_data = \[field.strip() for field in element.split(',') if field.strip() != 'NaN'\]
return ','.join(cleaned_data)
def generate_valid_job_name(file_name):
\# Generates a valid job name by replacing non-alphanumeric characters with underscores
cleaned_name = re.sub(r'\[^a-z0-9\]', '_', file_name.lower())
if not cleaned_name\[0\].isalpha():
cleaned_name = 'a' + cleaned_name\[1:\]
if not cleaned_name\[-1\].isalnum():
cleaned_name = cleaned_name\[:-1\] + 'z'
if not cleaned_name:
cleaned_name = 'default_job_name'
cleaned_name = cleaned_name.replace('_', '')
return cleaned_name
@functions_framework.cloud_event
def start_dataflow_process(cloud_event):
\# Imports necessary modules
import base64
import json
# Define pipeline options and parameters
project_id = 'your_project_id'
input_bucket = 'your_input_bucket'
output_bucket = 'your_output_bucket'
output_prefix = 'Out_'
# Extracts relevant information from the Cloud Event data
data = cloud_event.data
file_name = data['name']
bucket_name = data['bucket']
# Checks if the keyword 'input' is present in the file name
if 'input' not in file_name.lower():
print(f"File {file_name} does not contain the keyword 'input'. Exiting.")
return
print(f"File: {file_name}, Bucket: {bucket_name}")
# Generates a valid job name based on the input file name
job_name = generate_valid_job_name(file_name)
# Configures Dataflow pipeline options
pipeline_options = {
'project': project_id,
'runner': 'DataflowRunner',
'staging_location': f'gs://{output_bucket}/{job_name}/staging',
'temp_location': f'gs://{output_bucket}/{job_name}/temp',
'job_name': job_name,
'region': 'your_region',
'save_main_session': True
}
# Creates a Dataflow pipeline
pipeline = beam.Pipeline(options=PipelineOptions.from_dictionary(pipeline_options))
# Reads CSV data from the input file
csv_data = pipeline | 'ReadFromText' >> beam.io.ReadFromText(f'gs://{input_bucket}/{file_name}')
# Processes CSV data by cleaning up fields
cleaned_data = csv_data | 'ProcessCSV' >> beam.Map(process_csv_file)
# Defines the output path for the cleaned data
output_path = f'gs://{bucket_name}/{output_prefix}{file_name}'
# Writes the cleaned data to a text file
cleaned_data | 'WriteToText' >> beam.io.WriteToText(output_path)
print("Pipeline starting...")
# Runs the Dataflow pipeline
result = pipeline.run()
print("Pipeline started")
# Waits for the pipeline to finish
result.wait_until_finish()
print("Pipeline ended")
错误:
文件“/usr/local/lib/python3.8/site-packages/dill/_dill.py”,第 827 行,在 _import_module 中 返回 getattr(import(模块, None, None, [obj]), obj) ModuleNotFoundError:没有名为“functions_framework”的模块
看来你需要在本地安装函数框架。
pip 安装功能-框架