在python梁函数中使用非可序列化对象

问题描述 投票:0回答:1

根据beam的文档:

函数对象中的瞬态字段不会传输到工作线实例,因为它们不会自动序列化。

我的代码:

class myBeamFunction(beam.DoFn):

    def __setstate__(self, state):
        self.__dict__ = state
        self.my_nonserialisable_object = new Nonserialisable_object()

    def process(self, element):
        return self.my_nonserialisable_object.do(element)

我的申请失败是因为:

RuntimeError: maximum recursion depth exceeded

从堆栈跟踪中,我看到它是由以下原因引起的:

Python/2.7/lib/python/site-packages/apache_beam/internal/pickler.py

我想知道是否有一种方法可以在工作器实例中初始化我需要的所有非可序列化变量?

谢谢。

python python-2.7 apache-beam
1个回答
0
投票

这可以通过DoFn.StartBundle方法实现。 @Javadoc

在DoFn.StartBundle方法中初始化每个DoFn实例中的状态。如果初始化不依赖于主程序只知道或由早期流水线操作计算的任何信息,但是对于所有程序执行的所有DoFn实例都是相同的,例如设置空缓存或初始化常量数据,这是很好的。

它在python中恰好相同。所以稍加修改:

class myBeamFunction(beam.DoFn):

    def __init__(self):
        self.my_nonserialisable_object = None

    def start_bundle(self, context=None):
         self.my_nonserialisable_object = new Nonserialisable_object()

    def process(self, element):
        return self.my_nonserialisable_object.do(element)
© www.soinside.com 2019 - 2024. All rights reserved.