Apache Beam:窗口关闭后等待 N 秒以执行 DoFn

问题描述 投票:0回答:1

我有实时数据被发布并以同步方式读入数据流管道。

我收集数据,对其进行窗口化(固定为 1 秒)并进行累积,然后将其写入 Firestore DB。 该数据库正在由前端监视,当新数据到达时,前端会自动提取快照。

我看到的行为是前端数据到达不同步,数据到达之间的延迟不一致。

我一直在研究有状态和及时的处理,我认为它可以解决我的问题,但我不知道如何实现它,因为我没有批量处理任何数据,据我所知,这意味着我应该使用及时的处理

DoFn

class WriteToFirestore(beam.DoFn):
    EXPIRY_TIMER = userstate.TimerSpec('expiry', userstate.TimeDomain.WATERMARK)

    def setup(self):
        from google.cloud import firestore
        self.firestore_client = firestore.Client(project='<project id>')

    def process(self, element,
        w=beam.DoFn.WindowParam,
        expiry_timer=beam.DoFn.TimerParam(EXPIRY_TIMER)):

        expiry_timer.set(w.end + 0.5) # wait 0.5 seconds before executing

    @userstate.on_timer(EXPIRY_TIMER)
    def expiry(self, element):
        collection_ref = self.firestore_client.collection('<collection id>')
        collection_ref.add(element)

这是当前代码,它会抛出编码器数量不匹配的问题。

我做错了什么?

python google-cloud-platform google-cloud-firestore google-cloud-dataflow apache-beam
1个回答
0
投票

对数据进行窗口化并将其分组后,您应该使用窗口时间作为“时间戳”。将

beam.DoFn.TimerParam
作为参数传递也是个好主意。构造函数采用计时器名称标签和设置的持续时间(您可以使用
datetime.timedelta
来设置)。此代码未经测试,但应该可以工作:

class WriteToFirebase(beam.DoFn):
    def setup(self):
        from google.cloud import firestore
        self.firestore_client = firestore.Client(project='<project id>')
        
    def process(self, element, timer=beam.DoFn.TimerParam, window=beam.DoFn.WindowParam):
        # Get the end timestamp of the current window.
        window_end = window.end

        # Calculate the time 1 second after the window end.
        event_time = window_end + timedelta(seconds=1)

        # Set a timer to trigger at the calculated event time.
        timer.set("my_timer", event_time)

    def on_timer(self, element, timer=beam.DoFn.TimerParam):
        collection_ref = self.firestore_client.collection('fused_detections')
        collection_ref.add(element)```
© www.soinside.com 2019 - 2024. All rights reserved.