Python joblib并行函数的多个返回和打印输出

问题描述 投票:0回答:1

希望从 Python 的并行化函数中输出多个 yield 值(或返回值),但遇到了某种 pickling(序列化)错误。请注意,我还希望能看到子进程中 x 的打印输出。

%%time
from math import sqrt
from joblib import Parallel, delayed
# pip install joblib
import multiprocessing

# CPU count reported by the system; used as n_jobs for the Parallel call below.
num_cores = multiprocessing.cpu_count()

def producer():
    """Build the first ten multiples of five together with their square roots.

    Every value and its square root are printed as soon as they are computed,
    value first, then root.

    Returns:
        A tuple ``(roots, values)`` of two equally long lists: the square
        roots, and the multiples of five they were computed from.
    """
    roots, values = [], []
    for step in range(10):
        value = 5 * step
        print('Produced %s' % value)
        root = sqrt(value)
        print('Output sqrt %s' % root)
        values.append(value)
        roots.append(root)
    return roots, values

# NOTE(review): producer() is called once, in the parent process, and its two
# returned lists are what gets iterated here. delayed(i) wraps each list but the
# wrapper is never called, so Parallel appears to receive bare function objects
# rather than (func, args, kwargs) tasks -- presumably the cause of the
# "'function' object is not iterable" / PicklingError shown in the traceback.
allOutput, allX = Parallel(n_jobs=num_cores)(delayed(i) for i in producer()) 

输出应该是 `x*5` 和 `sqrt(x*5)` 的两个列表。

x*5
python parallel-processing pickle yield joblib
1个回答
1
投票

这样可以得到结果。但是,打印语句的输出顺序并不正确。

sqrt(x*5)

输出:

Output sqrt 0.0
Produced 5
Output sqrt 2.23606797749979
Produced 10
Output sqrt 3.1622776601683795
Produced 15
Output sqrt 3.872983346207417
Produced 20
Output sqrt 4.47213595499958
Produced 25
Output sqrt 5.0
Produced 30
Output sqrt 5.477225575051661
Produced 35
Output sqrt 5.916079783099616
Produced 40
Output sqrt 6.324555320336759
Produced 45
Output sqrt 6.708203932499369
---------------------------------------------------------------------------
_RemoteTraceback                          Traceback (most recent call last)
_RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/joblib/externals/loky/backend/queues.py", line 151, in _feed
    obj, reducers=reducers)
  File "/usr/local/lib/python3.6/dist-packages/joblib/externals/loky/backend/reduction.py", line 145, in dumps
    p.dump(obj)
  File "/usr/local/lib/python3.6/dist-packages/joblib/parallel.py", line 290, in __getstate__
    for func, args, kwargs in self.items]
  File "/usr/local/lib/python3.6/dist-packages/joblib/parallel.py", line 290, in <listcomp>
    for func, args, kwargs in self.items]
TypeError: 'function' object is not iterable
"""

The above exception was the direct cause of the following exception:

PicklingError                             Traceback (most recent call last)
<ipython-input-29-ceae606b0c70> in <module>()
----> 1 get_ipython().run_cell_magic('time', '', "from math import sqrt\nfrom joblib import Parallel, delayed\n\ndef producer():\n  \n  for x in range(0, 10):\n    \n    x = x*5\n    print('Produced %s' % x)\n    output = sqrt(x)\n    print('Output sqrt %s' % output)\n    yield output, x\n\noutSqrt, outX = Parallel(n_jobs=2)(delayed(i) for i in producer()) \n")

/usr/local/lib/python3.6/dist-packages/IPython/core/interactiveshell.py in run_cell_magic(self, magic_name, line, cell)
   2115             magic_arg_s = self.var_expand(line, stack_depth)
   2116             with self.builtin_trap:
-> 2117                 result = fn(magic_arg_s, cell)
   2118             return result
   2119 

</usr/local/lib/python3.6/dist-packages/decorator.py:decorator-gen-60> in time(self, line, cell, local_ns)

/usr/local/lib/python3.6/dist-packages/IPython/core/magic.py in <lambda>(f, *a, **k)
    186     # but it's overkill for just that one bit of state.
    187     def magic_deco(arg):
--> 188         call = lambda f, *a, **k: f(*a, **k)
    189 
    190         if callable(arg):

/usr/local/lib/python3.6/dist-packages/IPython/core/magics/execution.py in time(self, line, cell, local_ns)
   1191         else:
   1192             st = clock2()
-> 1193             exec(code, glob, local_ns)
   1194             end = clock2()
   1195             out = None

<timed exec> in <module>()

/usr/local/lib/python3.6/dist-packages/joblib/parallel.py in __call__(self, iterable)
    994 
    995             with self._backend.retrieval_context():
--> 996                 self.retrieve()
    997             # Make sure that we get a last message telling us we are done
    998             elapsed_time = time.time() - self._start_time

/usr/local/lib/python3.6/dist-packages/joblib/parallel.py in retrieve(self)
    897             try:
    898                 if getattr(self._backend, 'supports_timeout', False):
--> 899                     self._output.extend(job.get(timeout=self.timeout))
    900                 else:
    901                     self._output.extend(job.get())

/usr/local/lib/python3.6/dist-packages/joblib/_parallel_backends.py in wrap_future_result(future, timeout)
    515         AsyncResults.get from multiprocessing."""
    516         try:
--> 517             return future.result(timeout=timeout)
    518         except LokyTimeoutError:
    519             raise TimeoutError()

/usr/lib/python3.6/concurrent/futures/_base.py in result(self, timeout)
    430                 raise CancelledError()
    431             elif self._state == FINISHED:
--> 432                 return self.__get_result()
    433             else:
    434                 raise TimeoutError()

/usr/lib/python3.6/concurrent/futures/_base.py in __get_result(self)
    382     def __get_result(self):
    383         if self._exception:
--> 384             raise self._exception
    385         else:
    386             return self._result

PicklingError: Could not pickle the task to send it to the workers.
© www.soinside.com 2019 - 2024. All rights reserved.