出于开发目的,我想缓存由
beam.io.ReadFromBigQuery
连接器在 BigQuery 中进行的查询结果 - 这样我就能够在下次运行完全相同的查询时从本地文件系统快速加载它们。
问题是我无法在
beam.io.ReadFromBigQuery
之前运行任何 PTransform 来验证缓存的存在并因此跳过从 BigQuery 的读取。
目前我想出了两种可能的解决方案:
beam.DoFn
以从 BigQuery 读取。它将包括缓存机制,但与现有连接器相比可能表现不佳。一种变化可能是继承现有的连接器 - 但它需要“幕后”的 Beam 知识 - 这可能会让人不知所措。apache_beam.io.textio.ReadAllFromText
或beam.io.ReadFromBigQuery
)。我发现任何其他 PTransform 之前的
beam.io.ReadFromBigQuery
在设计上都是不可能的。不幸的是,它目前在 Python SDK 文档中没有得到很好的体现 - 但根据 Apache Beam 的官方 Java 文档:“根 PTransform 概念上没有输入” - 并且等效的 java PTransform BigQueryIO.Read() 继承自 PBegin
,它在其前面加上了其他东西的密封。
但是,我找到了一种类似于我在问题中建议的第二种方法的解决方法 - 实现一个
beam.PTransform
(不是 beam.DoFn
),在构建管道时根据缓存是否存在动态返回适当的 PTransform。看起来像这样:
class ReadFromBigQueryWithCache(beam.PTransform):
def __init__(self, query):
super().__init__()
self.query = query
def expand(self, input_or_inputs: InputT) -> OutputT:
## Implement here logic for caching
## if cached:
## return input_or_inputs | beam.io.ReadFromAvro(file_pattern="path/to/cached_results")
## else:
return input_or_inputs | beam.io.ReadFromBigQuery(
query=self.query
)