如何在下一步中访问先前 Ptransform 的输出

问题描述 投票:0回答:1

我必须在 Python 中创建一个 Apache 数据束,它必须执行以下功能 -

  1. 从数据库表中读取符合特定条件的条目
  2. 为此记录调用第一个 REST API
  3. 在上述API调用的响应中,将得到一个数组。为该数组中的每个元素调用两个 API
  4. 更新数据库中所有 3 个 API 调用获取的数据

我在网络上找不到的一件事是如何将任何步骤(Ptransform)的输出传递到下一个步骤。有人可以指出我正确的方向吗?

python apache-spark google-cloud-dataflow apache-beam
1个回答
0
投票

下面的示例演示了将一个处理步骤(PTransform)的输出传递到下一处理步骤的过程。

import apache_beam as beam

# Define a PTransform function to read data from the database
class ReadFromDatabase(beam.PTransform):
    def expand(self, pcoll):
        # Simulating reading data from the database
        return pcoll | 'ReadFromDatabase' >> beam.Create([{'id': 1, 'name': 'example'}])

# Define a PTransform function to call the first REST API
class CallFirstAPI(beam.PTransform):
    def expand(self, pcoll):
        # Simulating calling the first API and transforming the data
        return pcoll | 'CallFirstAPI' >> beam.Map(lambda x: {'id': x['id'], 'api_data': 'api_response'})

# Define a PTransform function to call the second REST API
class CallSecondAPI(beam.PTransform):
    def expand(self, pcoll):
        # Simulating calling the second API and transforming the data
        return pcoll | 'CallSecondAPI' >> beam.Map(lambda x: {'id': x['id'], 'api_data': x['api_data'], 'additional_data': 'more_info'})

# Define a PTransform function to update data in the database
class UpdateDatabase(beam.PTransform):
    def expand(self, pcoll):
        # Simulating updating the database with the combined data
        return pcoll | 'UpdateDatabase' >> beam.Map(print)

# Build the pipeline
pipeline = beam.Pipeline()

# Use the defined PTransforms above
read_from_db = pipeline | 'ReadFromDatabase' >> ReadFromDatabase()
call_first_api = read_from_db | 'CallFirstAPI' >> CallFirstAPI()
call_second_api = call_first_api | 'CallSecondAPI' >> CallSecondAPI()
update_db = call_second_api | 'UpdateDatabase' >> UpdateDatabase()

# Run the pipeline
result = pipeline.run()
result.wait_until_finish()

补充要点:

  1. Apache Beam 编程指南的考虑:
  • Apache Beam 编程指南可能缺乏足够的 Python 示例。您可以探索更多特定于 Python 的示例来增强您的理解。
  1. 侧输入建议: 如果 API 调用的数据不经常更改,请考虑使用侧面输入来优化管道。检查有关侧面输入的文档以了解更多详细信息:https://beam.apache.org/documentation/patterns/side-inputs/

  2. 处理动态外部数据更改: 如果外部 API 数据频繁变化,另一种推荐的方法是有效地对外部服务的调用进行分组。请参阅有关对元素进行分组的文档以实现高效的外部服务调用: https://beam.apache.org/documentation/patterns/grouping-elements-for-efficient-external-service-calls/

© www.soinside.com 2019 - 2024. All rights reserved.