AttributeError:“tuple”对象在有状态 DoFn 期间没有属性“encode”

问题描述 投票:0回答:1

我正在使用有状态且及时的 DoFn 在我正在实现的固定窗口结束后 2 秒处理数据。

我已经在 Apache Beam 游乐场内测试了我的代码的可重现示例。我的数据采用

KV[str, str]
格式作为 DoFn 的输入。我能想到的 Playground 和我的代码之间的唯一区别是 Playground 使用 DirectRunner,而我使用 DataflowRunner。

在我的 DoFn 之前,我有另一个 DoFn,它将输入 PCollection 转变为有状态 DoFn 期望的格式:

class AddKeys(beam.DoFn):
    def __init__(self, settings):
        self.settings = settings

    def process(self, element):
        data = element["data"]

        for setting in self.settings:
            if setting["data"] == data:
                yield [
                    ("tuple key 1",
                     setting["tuple key 1"]),
                    ("tuple key 2", setting["tuple key 2"]),
                    ("tuple key 3", setting["tuple key 3"]),
                    ("element", str(element))
                ]

然后我的有状态 DoFn 应该获取输出并处理它:

class ProcessCollection(beam.DoFn):
    EXPIRY_TIMER = TimerSpec('expiry', TimeDomain.WATERMARK)
    BUFFER_STATE = BagStateSpec(
        'buffer', ListCoder(StrUtf8Coder()))

    def process(self, element,
                timer=beam.DoFn.TimerParam(EXPIRY_TIMER),
                window=beam.DoFn.WindowParam,
                buffer=beam.DoFn.StateParam(BUFFER_STATE)):

        timer.set(window.end + Duration(seconds=2))

        buffer.add(str(element))

    @on_timer(EXPIRY_TIMER)
    def expiry(self, buffer=beam.DoFn.StateParam(BUFFER_STATE)):
        events = buffer.read()

        for event in events:
            yield ''.join(event)

        buffer.clear()

我从 Google Cloud 收到的错误是:

File "/usr/local/lib/python3.10/site-packages/apache_beam/coders/coders.py", line 429, in encode
    return value.encode('utf-8')
AttributeError: 'tuple' object has no attribute 'encode' [while running 'Add Keys-ptransform-51']

pastebin 中的完整堆栈跟踪

任何人都可以确定为什么会发生这种情况吗?

python google-cloud-platform encoding google-cloud-dataflow apache-beam
1个回答
-1
投票

代码中出现错误“AttributeError:‘tuple’对象没有属性‘encode’”。该问题与编码有关。

• 检查输入数据格式并确保它是字符串 • 检查“AddKeys”DoFn 并验证传递的数据类型 • 检查“ProcessCollection”DoFn 并检查输入元素是否为元组

© www.soinside.com 2019 - 2024. All rights reserved.