Bug or feature? Unable to run two consecutive multiprocessing steps in a Python script

Question (votes: 2, answers: 2)

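I have two consecutive functions that process large lists.

I call them one after another using joblib's Parallel and delayed, trying to speed up each function individually.

However, when Parallel is called on function_2, I see the output of function_1, and I don't understand why. In short, this results in function_2 never being called.

Main code: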

from mycode import function_2
from joblib import Parallel, delayed
import gc

if __name__ == '__main__':  
   list = list_1
   print ">>> First call"
   Parallel(n_jobs = -1)(delayed(function_1) 
                                         (item) for item in list)
   gc.collect()
   do_other_stuff()
   list = list_2
   print ">>> Second call"
   Parallel(n_jobs=-1, backend='threading')(delayed(function_2)
                                         (item) for item in list)

Threaded functions:

def function_1(): # Gets called first
    print "this comes from function 1"
    pass

def function_2(): # Gets called second
    print "this comes from function 2"
    pass

Output:

>>> First call
this comes from function 1
this comes from function 1
this comes from function 1
this comes from function 1
>>> Second call
this comes from function 1
this comes from function 1
this comes from function 1
this comes from function 1

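My assumption is that some part of function_1 is kept in memory that persists after the call (maybe due to joblib's memory-mapping/sharing features?).

That is why I call gc.collect() between the two calls. Since that did not help, I am considering reloading the modules (joblib, Parallel, delayed) between the calls, which seems ugly.

Has anyone experienced similar behavior (on Windows)?

Is there a fix?

Do I need to unload/reload the joblib or mycode modules between the Parallel steps, and if so, why?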

python parallel-processing python-multiprocessing python-multithreading joblib
2 Answers
1
vote

I ran into the same problem.

My code looked like this:

A = Parallel(n_jobs=1)(delayed(self.function_1)( df_1, item ) for item in list_of_items)

B = Parallel(n_jobs=1)(delayed(self.function_2)( df_2, item ) for item in list_of_items)

where the list_of_items variable holds 2 items.

But the output was...

[Parallel(n_jobs=1)]: Done   2 out of   2 | elapsed:   32.2s finished
[Parallel(n_jobs=1)]: Done   0 out of   0 | elapsed:    0.0s finished

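The reason the second Parallel step did not run (at least in my case) was that my list_of_items was a generator, not a list! The first Parallel call consumed it entirely, so the second call had nothing left to iterate over.

I hope this solves your problem too. :)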
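The generator pitfall described above can be reproduced without joblib at all. The following is a minimal sketch in Python 3; run_all is a hypothetical stand-in for Parallel(n_jobs=1)(delayed(func)(x) for x in items), and the two functions are illustrative placeholders rather than the code from this answer.

```python
def run_all(func, items):
    """Hypothetical stand-in for Parallel(n_jobs=1)(delayed(func)(x) for x in items)."""
    return [func(x) for x in items]


def function_1(item):
    # Illustrative placeholder: tag the result with the function name.
    return ("function_1", item)


def function_2(item):
    return ("function_2", item)


# A generator can be iterated only once.
items_gen = (n for n in [1, 2])
first_from_gen = run_all(function_1, items_gen)    # consumes the generator
second_from_gen = run_all(function_2, items_gen)   # nothing left to iterate

# Materializing the items into a list lets both steps see every item.
items_list = list(n for n in [1, 2])
first_from_list = run_all(function_1, items_list)
second_from_list = run_all(function_2, items_list)

print(len(first_from_gen), len(second_from_gen))    # 2 0
print(len(first_from_list), len(second_from_list))  # 2 2
```

If the input may arrive as a generator, wrapping it once in list(...) before the first step is usually the simplest safeguard, at the cost of holding all items in memory.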


2
votes

Short version:

Q1: Has anyone experienced similar behavior (on Windows)? A1: No.

Q2: Is there a fix? A2: No, ref. A1.

Q3: Do I need to unload/reload the joblib or mycode modules here...? A3: No, ref. A1.

Q4: If so (Q3), why? A4: N/A, ref. A3.


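Let's ground the discussion in a common MCVE (Minimal, Complete, and Verifiable Example) formulation:

A slightly modified experiment structure might look like this: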

#ass;                         import function_2
from sklearn.externals.joblib import Parallel, delayed
#ass;                         import gc
pass;                         import itertools

def function_1( aParam = "" ):                                          # Gets called first
    try:
         print "this comes from a call: function_1( aParam == {0:})".format( aParam )

    except:
         pass # die in silence

    finally:
         return aParam

def function_2( aParam = "" ):                                          # Gets called second
    try:
         print "this comes from a call: function_2( aParam == {0:})".format( aParam )

    except:
         pass # die in silence

    finally:
         return aParam

if __name__ == '__main__':
   print "-------------------------------------------------------------- vvv main.START()"
   #ist = list_1
   aList = [ 11, 12, 13, 14, 15, ]
   print "-------------------------------------------------------------- >>> First call"
   A = Parallel(                                               n_jobs = -1
                 )( delayed( function_1 ) ( item ) for item in aList
                    )
   print "-------------------------------------------------------------- vvv main was ret'd: {0:}".format( repr( A ) )
   #c.collect()
   #o_other_stuff()
   #ist = list_2
   aList = [ 21, 22, 23, 24, 25, ]
   print "-------------------------------------------------------------- >>> Second call"
   B = Parallel(                                               n_jobs  = -1,
                                                               backend = 'threading'
                 )( delayed( function_2 ) ( item ) for item in aList
                    )
   print "-------------------------------------------------------------- vvv main was ret'd: {0:}".format( repr( B ) )

Result:

C:\Python27.anaconda>python TEST_SO_Parallel.py
-------------------------------------------------------------- vvv main.START()
-------------------------------------------------------------- >>> First call
this comes from a call: function_1( aParam == 11)
this comes from a call: function_1( aParam == 12)
this comes from a call: function_1( aParam == 13)
this comes from a call: function_1( aParam == 14)
this comes from a call: function_1( aParam == 15)
-------------------------------------------------------------- vvv main was ret'd: [11, 12, 13, 14, 15]
-------------------------------------------------------------- >>> Second call
this comes from a call: function_2( aParam == 21)
this comes from a call: function_2( aParam == 22)
this comes from a call: function_2( aParam == 23)
 this comes from a call: function_2( aParam == 25)this comes from a call: function_2( aParam == 24)

-------------------------------------------------------------- vvv main was ret'd: [21, 22, 23, 24, 25]

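Assessment:

As observed, [win] py2.7 processed the code without any of the obstacles reported above.

The processing is correct and matches what joblib documents.

The reported behavior was not reproduced, so it can hardly be attributed to joblib anywhere in the causal chain.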
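Because printed output from concurrent workers can interleave (as the two function_2 lines sharing one row in the result above show), a more reliable check is to have each worker return a tag and inspect the returned lists. The following is a rough sketch in Python 3 that uses the standard library's concurrent.futures as a stand-in for joblib; run_step and the tagged return values are assumptions made for illustration, not part of the original experiment.

```python
from concurrent.futures import ThreadPoolExecutor


def function_1(item):
    # Return a tag instead of printing, so the origin of each result is unambiguous.
    return ("function_1", item)


def function_2(item):
    return ("function_2", item)


def run_step(func, items, workers=4):
    """Hypothetical helper: run one parallel step in its own, freshly created pool."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # Executor.map yields results in the order of the inputs.
        return list(pool.map(func, items))


# Two consecutive, independent steps, mirroring the structure of the MCVE.
A = run_step(function_1, [11, 12, 13, 14, 15])
B = run_step(function_2, [21, 22, 23, 24, 25])

# Every result of each step must carry the tag of the function it was given.
assert all(name == "function_1" for name, _ in A)
assert all(name == "function_2" for name, _ in B)
```

If a check like this passes while the printed output still looks wrong, the cause is more likely in how the items or functions are passed in than in the parallel machinery itself.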
