我正在使用有状态且及时的 DoFn 在我正在实现的固定窗口结束后 2 秒处理数据。
我已经在 Apache Beam 游乐场内测试了我的代码的可重现示例。我的数据采用
KV[str, str]
格式作为 DoFn 的输入。我能想到的 Playground 和我的代码之间的唯一区别是 Playground 使用 DirectRunner,而我使用 DataflowRunner。
在我的 DoFn 之前,我有另一个 DoFn,它将输入 PCollection 转变为有状态 DoFn 期望的格式:
class AddKeys(beam.DoFn):
def __init__(self, settings):
self.settings = settings
def process(self, element):
data = element["data"]
for setting in self.settings:
if setting["data"] == data:
yield [
("tuple key 1",
setting["tuple key 1"]),
("tuple key 2", setting["tuple key 2"]),
("tuple key 3", setting["tuple key 3"]),
("element", str(element))
]
然后我的有状态 DoFn 应该获取输出并处理它:
class ProcessCollection(beam.DoFn):
EXPIRY_TIMER = TimerSpec('expiry', TimeDomain.WATERMARK)
BUFFER_STATE = BagStateSpec(
'buffer', ListCoder(StrUtf8Coder()))
def process(self, element,
timer=beam.DoFn.TimerParam(EXPIRY_TIMER),
window=beam.DoFn.WindowParam,
buffer=beam.DoFn.StateParam(BUFFER_STATE)):
timer.set(window.end + Duration(seconds=2))
buffer.add(str(element))
@on_timer(EXPIRY_TIMER)
def expiry(self, buffer=beam.DoFn.StateParam(BUFFER_STATE)):
events = buffer.read()
for event in events:
yield ''.join(event)
buffer.clear()
我从 Google Cloud 收到的错误是:
File "/usr/local/lib/python3.10/site-packages/apache_beam/coders/coders.py", line 429, in encode
return value.encode('utf-8')
AttributeError: 'tuple' object has no attribute 'encode' [while running 'Add Keys-ptransform-51']
任何人都可以确定为什么会发生这种情况吗?
代码中出现错误“AttributeError:‘tuple’对象没有属性‘encode’”。该问题与编码有关。
• 检查输入数据格式并确保它是字符串 • 检查“AddKeys”DoFn 并验证传递的数据类型 • 检查“ProcessCollection”DoFn 并检查输入元素是否为元组