根据beam的文档:
函数对象中的瞬态字段不会传输到工作线实例,因为它们不会自动序列化。
我的代码:
class myBeamFunction(beam.DoFn):
def __setstate__(self, state):
self.__dict__ = state
self.my_nonserialisable_object = new Nonserialisable_object()
def process(self, element):
return self.my_nonserialisable_object.do(element)
我的申请失败是因为:
RuntimeError: maximum recursion depth exceeded
从堆栈跟踪中,我看到它是由以下原因引起的:
Python/2.7/lib/python/site-packages/apache_beam/internal/pickler.py
我想知道是否有一种方法可以在工作器实例中初始化我需要的所有非可序列化变量?
谢谢。
这可以通过DoFn.StartBundle方法实现。 @Javadoc。
在DoFn.StartBundle方法中初始化每个DoFn实例中的状态。如果初始化不依赖于主程序只知道或由早期流水线操作计算的任何信息,但是对于所有程序执行的所有DoFn实例都是相同的,例如设置空缓存或初始化常量数据,这是很好的。
它在python中恰好相同。所以稍加修改:
class myBeamFunction(beam.DoFn):
def __init__(self):
self.my_nonserialisable_object = None
def start_bundle(self, context=None):
self.my_nonserialisable_object = new Nonserialisable_object()
def process(self, element):
return self.my_nonserialisable_object.do(element)