from apache_beam.options.pipeline_options import PipelineOptions
import os
import logging
import apache_beam as beam
import gzip
import json, io
from google.cloud import storage
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "credentials.json"
def read_inputpath_date(input_path):
    """Download a .jsonl.gz file from GCS and parse it into a list of records.

    Args:
        input_path: Full GCS URI, e.g. 'gs://bucket-name/path/to/file.jsonl.gz'.

    Returns:
        list: One parsed JSON object per line of the decompressed file.
    """
    storage_client = storage.Client()
    # Strip the 'gs://' scheme (5 chars) and split into bucket / object name.
    bucket_name, file_name = input_path[5:].split("/", 1)
    # Get the GCS bucket and file object, then download the compressed
    # contents into memory as bytes.
    blob = storage_client.bucket(bucket_name).blob(file_name)
    file_content = blob.download_as_bytes()
    # Decompress in-memory and parse each line as one JSON record.
    with gzip.GzipFile(fileobj=io.BytesIO(file_content), mode='r') as f:
        data = [json.loads(line) for line in f]
    # Lazy %-style args avoid formatting when INFO is disabled; also fixes
    # the missing space in the original 'path{}' message.
    logging.info('Done reading data from path %s', input_path)
    return data
def process_messages(element):
    """Process one Pub/Sub message: fetch the referenced GCS file and log its rows.

    Args:
        element: Raw Pub/Sub payload (bytes) holding a UTF-8 JSON object with
            'file_path' and 'table_name' keys.
    """
    message = json.loads(element.decode('utf-8'))
    logging.info('Received message: %s', message)
    # Build the full GCS URI from the relative path carried in the message.
    input_path = 'gs://mf-staging-area/' + message["file_path"]
    table_name = message["table_name"]
    logging.info('Input path: %s', input_path)
    logging.info('Table name: %s', table_name)
    try:
        data = read_inputpath_date(input_path)
        for line in data:
            # Each record is expected to carry the table row under 'payload'
            # — TODO confirm against the producer's schema.
            table_row = line['payload']
            logging.info('Line: %s', line)
            logging.info('Table: %s', table_name)
            logging.info('Row: %s', table_row)
    except Exception:
        # Narrowed from a bare 'except:' so KeyboardInterrupt/SystemExit are
        # not swallowed; logging.exception records the full traceback.
        logging.exception('Error occurred while reading input path: %s', input_path)
def run():
    """Build and run the streaming pipeline that consumes Pub/Sub messages."""
    # Google Cloud project and the Pub/Sub subscription to consume from.
    project_id = 'moneyfellows-data'
    subscription = 'projects/moneyfellows-data/subscriptions/streaming-topic-sub'
    # Runner and other options come from the command line / defaults.
    options = PipelineOptions()
    with beam.Pipeline(options=options) as pipeline:
        # Read raw messages from the subscription and hand each one to
        # process_messages; no downstream transforms follow.
        (
            pipeline
            | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(subscription=subscription)
            | 'ProcessMessages' >> beam.ParDo(process_messages)
        )
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    run()
我尝试使用 Dataflow runner 从 Pub/Sub 流式传输消息。它在 Direct runner 上运行良好,但在 Dataflow runner 上读取 Google Cloud Storage 输入路径内容并将其解析为 data 变量时,报错“名称 'read_inputpath_date' 未定义”。我认为这与调用函数时的上下文或作用域有关,尽管该函数在调用之前就已经定义了。有谁知道该错误背后的原因是什么,以及为什么它只出现在 Dataflow runner 上而不是 Direct runner 上?
我认为,如果你将
read_inputpath_date
移到主文件之外的一个单独文件中,它应该可以工作:
file_reader.py
import gzip
import io
import json
import logging

from google.cloud import storage


def read_inputpath_date(input_path):
    """Download a .jsonl.gz file from GCS and parse it into a list of records.

    Args:
        input_path: Full GCS URI, e.g. 'gs://bucket-name/path/to/file.jsonl.gz'.

    Returns:
        list: One parsed JSON object per line of the decompressed file.
    """
    # NOTE: the imports above are required — as a standalone module,
    # file_reader.py would otherwise raise NameError on every name below.
    storage_client = storage.Client()
    # Strip the 'gs://' scheme (5 chars) and split into bucket / object name.
    bucket_name, file_name = input_path[5:].split("/", 1)
    # Get the GCS bucket and file object, then download the compressed
    # contents into memory as bytes.
    blob = storage_client.bucket(bucket_name).blob(file_name)
    file_content = blob.download_as_bytes()
    # Decompress in-memory and parse each line as one JSON record.
    with gzip.GzipFile(fileobj=io.BytesIO(file_content), mode='r') as f:
        data = [json.loads(line) for line in f]
    logging.info('Done reading data from path %s', input_path)
    return data
然后在
main.py
文件中导入该方法:
from apache_beam.options.pipeline_options import PipelineOptions
import os
import logging
import apache_beam as beam
import gzip
import json, io
from google.cloud import storage
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "credentials.json"
def process_messages(element):
    """Process one Pub/Sub message: fetch the referenced GCS file and log its rows.

    Args:
        element: Raw Pub/Sub payload (bytes) holding a UTF-8 JSON object with
            'file_path' and 'table_name' keys.
    """
    # Import inside the function so Dataflow workers resolve the name at call
    # time, after the staged package is available on the worker.
    from your_root_folder.file_reader import read_inputpath_date
    message = json.loads(element.decode('utf-8'))
    logging.info('Received message: %s', message)
    # Build the full GCS URI from the relative path carried in the message.
    input_path = 'gs://mf-staging-area/' + message["file_path"]
    table_name = message["table_name"]
    logging.info('Input path: %s', input_path)
    logging.info('Table name: %s', table_name)
    try:
        data = read_inputpath_date(input_path)
        for line in data:
            # Each record is expected to carry the table row under 'payload'
            # — TODO confirm against the producer's schema.
            table_row = line['payload']
            logging.info('Line: %s', line)
            logging.info('Table: %s', table_name)
            logging.info('Row: %s', table_row)
    except Exception:
        # Narrowed from a bare 'except:' so KeyboardInterrupt/SystemExit are
        # not swallowed; logging.exception records the full traceback.
        logging.exception('Error occurred while reading input path: %s', input_path)
def run():
    """Build and run the streaming pipeline that consumes Pub/Sub messages."""
    # Google Cloud project and the Pub/Sub subscription to consume from.
    project_id = 'moneyfellows-data'
    subscription = 'projects/moneyfellows-data/subscriptions/streaming-topic-sub'
    # Runner and other options come from the command line / defaults.
    options = PipelineOptions()
    with beam.Pipeline(options=options) as pipeline:
        # Read raw messages from the subscription and hand each one to
        # process_messages; no downstream transforms follow.
        (
            pipeline
            | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(subscription=subscription)
            | 'ProcessMessages' >> beam.ParDo(process_messages)
        )
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    run()
由于部分处理是在
DoFn
内部完成的,而 Dataflow 的 worker 看不到 main.py
文件中的模块级上下文,一种可能的解决方案是将该方法移到 main 文件之外,并在使用它的函数内部导入它。