数据流并行管道

问题描述 投票:0回答:0

我需要实现一个具有并行管道的作业数据流(一个用于文件configuration.json中找到的每个实体)。
第一步是从 pub/sub 读取一个事件,该事件通知文件到达存储桶 GCS 上的实体。读取并解析消息后,从 GCS 中取出文件,并将内容推送到实体主题。
这是我的实现:

import apache_beam as beam
import json
import csv
import apache_beam.io.fileio as fileio
from apache_beam.options.pipeline_options import PipelineOptions


def read_csv_file(readable_file):
 with beam.io.filesystems.FileSystems.open(readable_file) as gcs_file:
   for row in csv.reader(gcs_file):
     yield row

def run():
   options = PipelineOptions(
       project='XXX',
       runner='DataflowRunner',
       streaming=True,
       job_name='dataflow-job',
       staging_location='gs://dataflow-bucket-at/staging',
       temp_location='gs://dataflow-bucket-at/temp',
       region='europe-west8'
   )

   with open('./conf/configuration.json', 'r') as f:
       data = json.load(f)
       pipeline = beam.Pipeline(options=options)
       
       for entity in data['entities'][0].keys():

         ( pipeline
            | "Read from pubsub_{}".format(entity) >> beam.io.ReadFromPubSub(subscription=f"projects/XXX/subscriptions/topic_events_{entity}_sub")
            | "Parse messages_{}".format(entity) >> beam.Map(lambda message: json.loads(message))
            | "Extract file_path_{}".format(entity) >> beam.Map(lambda message: message.get('filePath'))  # Extract the value of 'file_path' from the dictionary
            | "Filter message_{}".format(entity) >> beam.Filter(lambda e: e.split('.')[0] == entity)
            | "Find CSV_{}".format(entity) >>  fileio.MatchFiles(f"gs://input_files_at/{entity}.csv") # emits FileMetadatas
            | "Read CSV_{}".format(entity) >> beam.FlatMap(read_csv_file)                          # emits rows
            | "Parse CSV_{}".format(entity) >> beam.Map(lambda line: line.split(","))
            | "Publish message_{}".format(entity) >> beam.io.WriteToPubSub(f"projects/XXX/topics/topic_{entity}")
         )
       
       pipeline.run().wait_until_finish()


if __name__ == "__main__":
   run()

在configuration.json中,我有两个实体,因此我期望作业数据流中有两个并行分支。

结果是这样的: Dataflow screenshot

你能帮我吗?

pipeline google-cloud-dataflow apache-beam google-cloud-pubsub apache-beam-io
© www.soinside.com 2019 - 2024. All rights reserved.