I need to implement a Dataflow job with parallel pipelines (one for each entity found in the file configuration.json).
The first step is to read from Pub/Sub an event notifying that a file for an entity has arrived in a GCS bucket. Once the message is read and parsed, the file is fetched from GCS and its contents are pushed to the entity's topic.
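For reference, the notification messages on the entity subscriptions are assumed to carry the file name in a filePath field; this shape is inferred from the parsing and filter steps in the code below, and the entity name is a placeholder:

{"filePath": "customer.csv"}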
Here is my implementation:
import apache_beam as beam
import apache_beam.io.fileio as fileio
import csv
import io
import json
from apache_beam.options.pipeline_options import PipelineOptions


def read_csv_file(file_metadata):
    # FileSystems.open returns a binary stream, so wrap it in a TextIOWrapper
    # before handing it to csv.reader.
    with beam.io.filesystems.FileSystems.open(file_metadata.path) as gcs_file:
        for row in csv.reader(io.TextIOWrapper(gcs_file)):
            yield row


def run():
    options = PipelineOptions(
        project='XXX',
        runner='DataflowRunner',
        streaming=True,
        job_name='dataflow-job',
        staging_location='gs://dataflow-bucket-at/staging',
        temp_location='gs://dataflow-bucket-at/temp',
        region='europe-west8'
    )

    with open('./conf/configuration.json', 'r') as f:
        data = json.load(f)

    pipeline = beam.Pipeline(options=options)

    # One branch per entity key found in the configuration file.
    for entity in data['entities'][0].keys():
        (pipeline
         | "Read from pubsub_{}".format(entity) >> beam.io.ReadFromPubSub(
             subscription=f"projects/XXX/subscriptions/topic_events_{entity}_sub")
         | "Parse messages_{}".format(entity) >> beam.Map(json.loads)
         | "Extract file_path_{}".format(entity) >> beam.Map(
             lambda message: message.get('filePath'))  # value of 'filePath' in the message
         # Bind entity as a default argument: a plain closure would make every
         # branch filter on the last value of the loop variable.
         | "Filter message_{}".format(entity) >> beam.Filter(
             lambda path, entity=entity: path.split('.')[0] == entity)
         # MatchFiles is a root transform that ignores its input PCollection,
         # so build the pattern from the message and use MatchAll instead.
         | "Build pattern_{}".format(entity) >> beam.Map(
             lambda path: f"gs://input_files_at/{path}")
         | "Find CSV_{}".format(entity) >> fileio.MatchAll()  # emits FileMetadatas
         | "Read CSV_{}".format(entity) >> beam.FlatMap(read_csv_file)  # emits parsed rows
         # csv.reader already splits each row into a list, so serialize the
         # rows back to bytes, which is what WriteToPubSub expects.
         | "Serialize rows_{}".format(entity) >> beam.Map(
             lambda row: json.dumps(row).encode('utf-8'))
         | "Publish message_{}".format(entity) >> beam.io.WriteToPubSub(
             topic=f"projects/XXX/topics/topic_{entity}")
         )

    pipeline.run().wait_until_finish()


if __name__ == "__main__":
    run()
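For completeness, the loop over data['entities'][0].keys() assumes a configuration.json shaped roughly like the sketch below; the entity names are placeholders and any per-entity settings are elided:

{
    "entities": [
        {
            "customer": {},
            "order": {}
        }
    ]
}

The loop then builds one pipeline branch per key, i.e. one per entity.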
In configuration.json I have two entities, so I expect two parallel branches in the Dataflow job.
Can you help me?