数据流流管道中的全局窗口侧输入停止固定窗口阶段

问题描述 投票:0回答:1

我正在使用 Dataflow 流管道,并且遇到了一个问题,即在全局窗口中使用侧面输入会锁定使用 30 秒固定窗口的主分支。首先提供侧面输入的“区域处理”之前的所有阶段都运行顺利。一旦我们使用侧面输入进入舞台,它就会停止或急剧减慢。

预期行为: 我的区域处理阶段应在前一阶段完成后立即运行。

实际行为: 即使没有进行任何耗时的处理,该阶段也会在输出之前等待一段时间。 查看“区域处理”阶段的数据流输入/输出集合图,我们发现通常在提供输入后近 1 分钟处理输出。我也可以在阶段日志中看到这一点。

侧输入按预期每 30 秒获取一次,并且只有 1-2kb。

关于输入数据,我尝试过发送准时数据和延迟数据,两者都有相同的问题。这仍处于早期测试阶段,因此输入量非常低。我一次只通过 Postman 发送几条发布-订阅消息。

管道配置:

  • 运行器:Dataflow Runner v2
  • SDK 版本:Apache Beam Python 3.10 SDK 2.52.0
  • 实例:n1-标准-2
  • Dataflow Prime:已禁用

这是我的管道代码的简化版本。

class AddTimestampToMessage(DoFn):
    def process(self, element):
        timestamp_str = element['timestamp']
        dt = datetime.fromisoformat(timestamp_str.rstrip('Z')).replace(tzinfo=pytz.UTC)
        unix_timestamp = dt.timestamp()
        yield TimestampedValue(element, unix_timestamp)


class FetchSideInputDataFromFirebase(DoFn):
    def __init__(self, databaseUrl, eventId):

        self.databaseUrl = databaseUrl
        self.eventId = eventId

    def start_bundle(self):
        """
        Initialize Firebase Admin SDK
        """
        # Initialization code for Firebase
        if not firebase_admin._apps:
    
            firebase_admin.initialize_app(None, options={
                'databaseURL': self.databaseUrl
            })

    def process(self, element):
        if (not self.eventId):
            logging.error('No event id provided to fetch side input data. Skipping...')
            return
        ref = db.reference('/events/' + self.eventId + '/side_input_data')
        data = ref.get()
        for key, value in data.items():
            yield (key, value)

class ProcessMessage(DoFn):
    def process(self, messageGroup, side_input=None, window=DoFn.WindowParam, timestamp=DoFn.TimestampParam):

        # * Takes the message group and outputs 3 records, using side input data
        record1 = {}
        record2 = {}
        record3 = {}

        for record in [record1, record2, record3]:
            yield record


def run(input_subscription, output_path, window_size=30.0, pipeline_args=None, project=None):
    
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True
    )

    with Pipeline(options=pipeline_options) as pipeline:

        sideInputPColl = (
            pipeline
            | 'Periodic Impulse' >> PeriodicImpulse(fire_interval=window_size, apply_windowing=False)
            | 'Fetch Firebase Data' >> ParDo(FetchSideInputDataFromFirebase(...))
            | "Global Window" >> WindowInto(
                GlobalWindows(),
                trigger=Repeatedly(AfterProcessingTime(1)),
                accumulation_mode=AccumulationMode.DISCARDING,
            )
        )

        (
            pipeline
            | "Read from Pub/Sub" >> io.ReadFromPubSub(subscription=input_subscription)
            | 'Add timestamp based on message event time' >> ParDo(AddTimestampToMessage())
            | "Window into Fixed Windows" >> WindowInto(
                  FixedWindows(window_size),
                  accumulation_mode=AccumulationMode.ACCUMULATING,
                  allowed_lateness=432000 # allow 5 days of late data
            )
            | "Key by tag_id" >> WithKeys(lambda message: message['tag_id'])
            | "Group by tag_id" >> GroupByKey()
            # ------ here is where my issue is ------
            | "Zone Processing" >> ParDo(ProcessMessage(), pvalue.AsDict(sideInputPColl))
            | "Write to Bigquery" >> io.WriteToBigQuery(
                    'my_table',
                    create_disposition=io.BigQueryDisposition.CREATE_IF_NEEDED,
                    write_disposition=io.BigQueryDisposition.WRITE_APPEND
                    )
        )

我尝试过的事情:

  • 调整侧面输入设置
    • AsSingleton
      AsDict
      选项
side_input_update = pvalue.AsSingleton(sideInputPColl)
  • 将窗口应用于周期性脉冲而不是全局窗口
    • 我也看到了类似的问题。
            side_input_update = (
                pipeline
                | 'Periodic Impulse' >> PeriodicImpulse(fire_interval=window_size, apply_windowing=True)
                | 'Fetch Firebase Data' >> ParDo(FetchSideInputDataFromFirebase(...))
            )
  • 尝试了各种全局窗口触发延迟
    • 这没有明显的区别
"Global Window" >> WindowInto(
                GlobalWindows(),
                trigger=Repeatedly(AfterProcessingTime(5)), # 1, 5, 30
                accumulation_mode=AccumulationMode.DISCARDING,
            )
  • 使用 DirectRunner 进行调试
    • 代码在直接运行器中运行正常。但是,我正在使用PeriodicImpulse 的错误解决方法,因此它仅在首次运行时“脉冲”。
google-cloud-dataflow apache-beam
1个回答
0
投票

侧面输入全局窗口存在一个错误https://github.com/apache/beam/issues/28776,这可能与您的问题相关,也可能无关。但仔细一看,和上面提到的问题很相似。

我会尝试提供的解决方法是将

num_workers
管道选项设置为大于 1。

您的数据流管道可扩展到多少工作人员?

© www.soinside.com 2019 - 2024. All rights reserved.