在DataFlow上运行光束管道时关闭后文件被覆盖

问题描述 投票:0回答:1

我创建了一个梁管道p来运行数据流,并希望在运行我的管道之前写一些文件。我的代码是:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
import time

pipeline_options = PipelineOptions(runner='DirectRunner')
pipeline_options.view_as(SetupOptions).save_main_session = True
p = beam.Pipeline(options=pipeline_options)

myString = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum."

myFile3984573498534 = open('myfile2398457erity348t67349856734986739846.txt','w+')
myFile3984573498534.write(myString*100)
myFile3984573498534.close()

time.sleep(1)

r = p.run()

正确写入文件,但一旦调用p.run(),它就会被覆盖为空白。任何人都可以解释为什么会这样吗?

笔记:

  • 更改文件名和文件变量名称不会影响结果。
  • 我插入了time.sleep(1),以便在调用p.run()之前可以查看文件,并且文件被覆盖为空白。这不是必需的,可以更改/删除。
python pickle apache-beam dataflow dill
1个回答
0
投票

这个问题来自于pipeline_options.view_as(SetupOptions).save_main_session = True线。

当管道运行时,beam将使用dill.dump_session序列化主会话并将其保存到文件中。然后它将使用dill.load_session加载相同的文件并反序列化它以重新创建主会话。它将使用dill.dump_session再次对主要会话进行重新编译以发送给跑步者。序列化,反序列化,然后重新序列化主会话的原因是为了解决序列化中的不一致问题,如https://github.com/uqfoundation/dill/issues/195中提出的那样。这意味着所有参赛者都会遇到这个问题。

在这种情况下,主会话包含myFile3984573498534文件对象。当反序列化时,它将使用w+模式以与最初打开文件相同的方式重新打开文件。这将立即覆盖该文件。然后关闭此文件,管道以文件空白结束。

对此最好的解决方法是在r+模式下打开文件,以便在主会话反序列化期间以读取模式打开文件,从而导致文件未被修改。

如果你需要在w+模式下打开文件,你应该在关闭文件后删除存储文件的变量,即在del(myFile3984573498534)之后但在运行管道之前的myFile3984573498534.close() 。这可以防止变量序列化,因为它不再存在,从而导致文件未被修改。

© www.soinside.com 2019 - 2024. All rights reserved.