Apache Beam 优化 Firestore 读取 python

问题描述 投票:0回答:1

我有传感器数据到达 pub/sub (protobuf),它作为 python 字典插入到“pipeline_fstore”中。数据一次到达。 在管道中的“添加元数据...”步骤中,我正在读取 Firestore 数据以增强接收到的数据。 如何优化 Firestore 上的“读取”?有什么建议吗? 我稍后在“添加元数据”上添加的字段上“GroupByKey”,所以我无法提前完成。

问:当我们使用“start_bundle(self):”时,它仅适用于之前对数据进行分组的情况,对吗?在这种情况下,“捆绑”将是单独的行,对吧?

谢谢

fs_results = (
            pipeline_fstore
            | 'Add Timestamp' >> beam.Map(lambda elem: beam.window.TimestampedValue(elem, int(elem['measurement_time'])))
            | 'Window into window_size Seconds' >> beam.WindowInto(beam.window.FixedWindows(window_size), allowed_lateness=beam.window.Duration(seconds=60))
            | 'Add Metadata to the original data' >> beam.ParDo(add_metadata())
            | 'Add Node Position' >> beam.ParDo(add_position())
            | 'Extract siteId as Key' >> beam.Map(lambda elem: (elem.get("siteId", ""), elem))
            | 'Group by siteId' >> beam.GroupByKey()
            | 'Write to Firestore' >> beam.ParDo(FirestoreUpdateDoFn())
        )
python google-cloud-firestore google-cloud-dataflow apache-beam
1个回答
0
投票

我能够使用“GroupByKey”并应用“start_bundle”和“finish_bundle”来优化对 firestore 的写入,以启动捆绑批次并写入最终批次。 现在,在 15 秒的窗口中,我看到批量大小约为 10-20 行,这与数据接收率一致。

关于我关于“start_bundle”的问题,根据我的测试,我只有在应用“GroupByKey”时才能有效地让窗口工作。仅具有“窗口”机制不足以看到当前的行为。

很高兴收到任何可以改进我的代码的反馈,并很高兴帮助其他人实现同样的目标。

class FirestoreUpdateDoFn(beam.DoFn):

    def setup(self):
        from firebase_admin import firestore
        self.db = firestore.Client()

    def teardown(self):
        self.db.close()

    def start_bundle(self):
        date = datetime.now()
        logging.info(f"Starting bundle - Firestore ({date})")
        self.batch = self.db.batch()
        self.entries = 0

    def finish_bundle(self):
        date = datetime.now()
        logging.info(f"Finish bundle ({date}). batch size: {self.entries}\n")
        try:
            self.batch.commit()
        except Exception as e:
            logging.error(e)
© www.soinside.com 2019 - 2024. All rights reserved.