yield从自定义DoFn得到`finish_bundle`

问题描述 投票:0回答:1

我的管道的一个步骤涉及从外部数据源获取,我想以块的形式执行此操作(顺序无关紧要)。我找不到任何类似的类,所以我创建了以下内容:

class FixedSizeBatchSplitter(beam.DoFn):
  def __init__(self, size):
    self.size = size

  def start_bundle(self):
    self.current_batch = []

  def finish_bundle(self):
    if self.current_batch: 
      yield self.current_batch

  def process(self, element):
    self.current_batch.append(element)
    if len(self.current_batch) >= self.size:
      yield self.current_batch
      self.current_batch = []

但是,当我运行此管道时,我得到一个RuntimeError: Finish Bundle should only output WindowedValue type错误:

with beam.Pipeline() as p:
  res = (p
         | beam.Create(range(10))
         | beam.ParDo(FixedSizeBatchSplitter(3))
        )

这是为什么?为什么我可以在process中产出输出而不是在finish_bundle中输出?顺便说一句,如果我删除finish_bundle管道工作,但显然丢弃剩菜。

apache-beam
1个回答
5
投票

DoFn可能正在处理来自多个不同窗口的元素。当你在process()时,“当前窗口”是明确的 - 它是正在处理的元素的窗口。当你在finish_bundle时,它是不明确的,你需要明确指定窗口。你需要产生一些形式yield WindowedValue(something, timestamp, [window])

如果您的所有数据都在全局窗口中,那将更容易:window将只是GlobalWindow()。如果您使用多个窗口,则每个窗口需要1个缓冲区;捕获process()中的窗口,以便添加到适当的缓冲区;并且在finish_bundle中在相应的窗口中发出它们中的每一个。

© www.soinside.com 2019 - 2024. All rights reserved.