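I have to create an Apache Beam pipeline in Python that has to do the following -
One thing I could not find anywhere on the web is how to pass the output of one step (PTransform) to the next step. Can someone point me in the right direction?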
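The example below shows how to pass the output of one processing step (PTransform) to the next. Every PTransform returns a PCollection, and applying the next PTransform to that PCollection with the `|` operator makes it the input of the next step.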
```python
import apache_beam as beam


# Composite PTransform that reads the source records from the database.
class ReadFromDatabase(beam.PTransform):
    def expand(self, pbegin):
        # Simulates reading data from the database.
        return pbegin | 'ReadFromDatabase' >> beam.Create([{'id': 1, 'name': 'example'}])


# Composite PTransform that calls the first REST API for every record.
class CallFirstAPI(beam.PTransform):
    def expand(self, pcoll):
        # Simulates calling the first API and adding its response to the record.
        return pcoll | 'CallFirstAPI' >> beam.Map(
            lambda x: {'id': x['id'], 'api_data': 'api_response'})


# Composite PTransform that calls the second REST API using the first API's output.
class CallSecondAPI(beam.PTransform):
    def expand(self, pcoll):
        # Simulates calling the second API and merging its response into the record.
        return pcoll | 'CallSecondAPI' >> beam.Map(
            lambda x: {'id': x['id'], 'api_data': x['api_data'], 'additional_data': 'more_info'})


# Composite PTransform that writes the combined records back to the database.
class UpdateDatabase(beam.PTransform):
    def expand(self, pcoll):
        # Simulates the database update by printing each combined record.
        return pcoll | 'UpdateDatabase' >> beam.Map(print)


# Build the pipeline.
pipeline = beam.Pipeline()

# Each step returns a PCollection; applying the next PTransform to it with `|`
# is what passes one step's output on to the next step.
read_from_db = pipeline | 'ReadFromDatabase' >> ReadFromDatabase()
call_first_api = read_from_db | 'CallFirstAPI' >> CallFirstAPI()
call_second_api = call_first_api | 'CallSecondAPI' >> CallSecondAPI()
update_db = call_second_api | 'UpdateDatabase' >> UpdateDatabase()

# Run the pipeline and block until it finishes.
result = pipeline.run()
result.wait_until_finish()
```
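Additional points:
Side inputs: If the data returned by the API calls does not change often, consider fetching it once and passing it to your transforms as a side input instead of calling the API for every element. See the side-input documentation for details: https://beam.apache.org/documentation/patterns/side-inputs/.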
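Handling frequently changing external data: If the external API data changes often, another recommended approach is to batch the calls to the external service so that one request covers several elements. See the documentation on grouping elements for efficient external service calls: https://beam.apache.org/documentation/patterns/grouping-elements-for-efficient-external-service-calls/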