Python: how can I run Python functions in parallel?

Problem description · Votes: 73 · Answers: 5

I did some research first and could not find an answer to my question. I am trying to run multiple functions in parallel in Python.

I have something like this:

files.py

import time
import common  # common is a util class that handles all the IO stuff

dir1 = r'C:\folder1'  # raw strings, so the backslashes are not treated as escapes
dir2 = r'C:\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

def func1():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir1)
       c.getFiles(dir1)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir1)
       c.getFiles(dir1)

def func2():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir2)
       c.getFiles(dir2)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir2)
       c.getFiles(dir2)

I want to call func1 and func2 and have them run at the same time. The functions do not interact with each other or operate on the same object. Right now I have to wait for func1 to finish before func2 starts. How do I do something like the following:

process.py

from files import func1, func2

runBothFunc(func1(), func2())

I want to be able to create both directories at close to the same time, because every minute I am counting how many files are being created. If a directory isn't there, it will throw off my timing.

python
5 Answers
123 votes

You could use threading or multiprocessing.

Due to peculiarities of CPython (chiefly the Global Interpreter Lock), threading is unlikely to achieve true parallelism. For this reason, multiprocessing is generally a better bet.

Here is a complete example:

from multiprocessing import Process

def func1():
  print('func1: starting')
  for i in range(10000000): pass
  print('func1: finishing')

def func2():
  print('func2: starting')
  for i in range(10000000): pass
  print('func2: finishing')

if __name__ == '__main__':
  p1 = Process(target=func1)
  p1.start()
  p2 = Process(target=func2)
  p2.start()
  p1.join()
  p2.join()

The mechanics of starting and joining the child processes can easily be encapsulated into a function along the lines of your runBothFunc:

def runInParallel(*fns):
  proc = []
  for fn in fns:
    p = Process(target=fn)
    p.start()
    proc.append(p)
  for p in proc:
    p.join()

runInParallel(func1, func2)
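As in the complete example above, on platforms where multiprocessing spawns a fresh interpreter instead of forking (Windows, and macOS since Python 3.8), the call should sit under the usual guard:

if __name__ == '__main__':
  runInParallel(func1, func2)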

6 votes

This can be done elegantly with Ray, a system that allows you to easily parallelize and distribute your Python code.

To parallelize your example, you'd need to define your functions with the @ray.remote decorator and then invoke them with .remote.

import ray

ray.init()

dir1 = 'C:\\folder1'
dir2 = 'C:\\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

# Define the functions. 
# You need to pass every global variable used by the function as an argument.
# This is needed because each remote function runs in a different process,
# and thus it does not have access to the global variables defined in 
# the current process.
@ray.remote
def func1(filename, addFiles, dir):
    # func1() code here...

@ray.remote
def func2(filename, addFiles, dir):
    # func2() code here...

# Start two tasks in the background and wait for them to finish.
ray.get([func1.remote(filename, addFiles, dir1), func2.remote(filename, addFiles, dir2)]) 

If you pass the same argument to both functions and the argument is large, a more efficient way to do this is with ray.put(). This avoids the large argument being serialized twice and two in-memory copies of it being created:

largeData_id = ray.put(largeData)

ray.get([func1.remote(largeData_id), func2.remote(largeData_id)])

If func1() and func2() return results, you need to rewrite the code as follows:

ret_id1 = func1.remote(filename, addFiles, dir1)
ret_id2 = func2.remote(filename, addFiles, dir2)
ret1, ret2 = ray.get([ret_id1, ret_id2])

There are a number of advantages to using Ray over the multiprocessing module. In particular, the same code will run on a single machine as well as on a cluster of machines. For more advantages of Ray, see this related post.
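For instance (a minimal sketch: address='auto' assumes a Ray cluster has already been started on the machine, e.g. with ray start --head, before the script runs):

import ray

# Connect to the running cluster instead of starting a private local instance;
# the @ray.remote definitions and the ray.get() calls stay exactly the same.
ray.init(address='auto')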


3 votes

There is no way to guarantee that two functions will execute in sync with each other, which seems to be what you want to do.

The best you can do is to split the function up into several steps, then wait for both to finish at critical synchronization points using Process.join, as @aix's answer mentions.

This is better than time.sleep(10) because you can't guarantee exact timings. With explicit waiting, you are saying that the functions must be done executing a step before moving on to the next one, instead of assuming it will be done within some fixed window, which isn't guaranteed given whatever else is going on on the machine.
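A minimal sketch of that idea (create_files and remove_files are hypothetical stand-ins for the question's createFiles/removeFiles steps):

from multiprocessing import Process

# Hypothetical stand-ins for the question's createFiles/removeFiles steps.
def create_files(directory):
    print('creating files in', directory)

def remove_files(directory):
    print('removing files in', directory)

def run_step(step_fn, *dirs):
    # Launch one worker per directory for this step...
    procs = [Process(target=step_fn, args=(d,)) for d in dirs]
    for p in procs:
        p.start()
    # ...and block here until every worker is done: an explicit
    # synchronization point instead of a time.sleep() guess.
    for p in procs:
        p.join()

if __name__ == '__main__':
    run_step(create_files, r'C:\folder1', r'C:\folder2')
    run_step(remove_files, r'C:\folder1', r'C:\folder2')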


3 votes

If you are a Windows user and using Python 3, then this post will help you do parallel programming in Python. When you run an ordinary multiprocessing Pool program, you will get an error about the main function in your program. This is because Windows lacks the fork() function. The post below gives a solution to the problem:

http://python.6.x6.nabble.com/Multiprocessing-Pool-woes-td5047050.html

Since I was using Python 3, I changed the program a little, like this:

# poolable.py
# Helpers that serialize a function's code object with marshal so it can be
# re-created inside Pool worker processes (Windows cannot fork, so functions
# defined in __main__ are not directly usable as Pool targets).
from types import FunctionType
import marshal

def _applicable(*args, **kwargs):
  name = kwargs['__pw_name']
  code = marshal.loads(kwargs['__pw_code'])
  gbls = globals() #gbls = marshal.loads(kwargs['__pw_gbls'])
  defs = marshal.loads(kwargs['__pw_defs'])
  clsr = marshal.loads(kwargs['__pw_clsr'])
  fdct = marshal.loads(kwargs['__pw_fdct'])
  func = FunctionType(code, gbls, name, defs, clsr)
  func.fdct = fdct
  del kwargs['__pw_name']
  del kwargs['__pw_code']
  del kwargs['__pw_defs']
  del kwargs['__pw_clsr']
  del kwargs['__pw_fdct']
  return func(*args, **kwargs)

def make_applicable(f, *args, **kwargs):
  if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
  kwargs['__pw_name'] = f.__name__  # edited
  kwargs['__pw_code'] = marshal.dumps(f.__code__)   # edited
  kwargs['__pw_defs'] = marshal.dumps(f.__defaults__)  # edited
  kwargs['__pw_clsr'] = marshal.dumps(f.__closure__)  # edited
  kwargs['__pw_fdct'] = marshal.dumps(f.__dict__)   # edited
  return _applicable, args, kwargs

def _mappable(x):
  x,name,code,defs,clsr,fdct = x
  code = marshal.loads(code)
  gbls = globals() #gbls = marshal.loads(gbls)
  defs = marshal.loads(defs)
  clsr = marshal.loads(clsr)
  fdct = marshal.loads(fdct)
  func = FunctionType(code, gbls, name, defs, clsr)
  func.fdct = fdct
  return func(x)

def make_mappable(f, iterable):
  if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
  name = f.__name__    # edited
  code = marshal.dumps(f.__code__)   # edited
  defs = marshal.dumps(f.__defaults__)  # edited
  clsr = marshal.dumps(f.__closure__)  # edited
  fdct = marshal.dumps(f.__dict__)  # edited
  return _mappable, ((i,name,code,defs,clsr,fdct) for i in iterable)

With those functions in place (saved as poolable.py), the code for the question above also changes a little, like this:

from multiprocessing import Pool
from poolable import make_applicable, make_mappable

def cube(x):
  return x**3

if __name__ == "__main__":
  pool    = Pool(processes=2)
  results = [pool.apply_async(*make_applicable(cube,x)) for x in range(1,7)]
  print([result.get(timeout=10) for result in results])

And I got this output:

[1, 8, 27, 64, 125, 216]

I think this post may be useful for some Windows users.
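For completeness, the make_mappable helper defined above pairs with Pool.map in the same way (a small sketch reusing the cube function):

from multiprocessing import Pool
from poolable import make_mappable

def cube(x):
  return x**3

if __name__ == "__main__":
  pool = Pool(processes=2)
  # make_mappable returns (_mappable, packed-argument-iterable), so it
  # unpacks straight into Pool.map:
  print(pool.map(*make_mappable(cube, range(1, 7))))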


2 votes

If your functions are mainly doing I/O work (and less CPU work) and you have Python 3.2+, you can use a ThreadPoolExecutor:

from concurrent.futures import ThreadPoolExecutor

def run_io_tasks_in_parallel(tasks):
    with ThreadPoolExecutor() as executor:
        running_tasks = [executor.submit(task) for task in tasks]
        for running_task in running_tasks:
            running_task.result()

run_io_tasks_in_parallel([
    lambda: print('IO task 1 running!'),
    lambda: print('IO task 2 running!'),
])
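The same helper can be written more compactly with executor.map, which submits the tasks and collects their results in order (a small variant sketch):

from concurrent.futures import ThreadPoolExecutor

def run_io_tasks_in_parallel(tasks):
    with ThreadPoolExecutor() as executor:
        # Consuming the results also re-raises any exception a task hit.
        return list(executor.map(lambda task: task(), tasks))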

If your functions are mainly doing CPU work (and less I/O work) and you have Python 2.6+, you can use the multiprocessing module:

from multiprocessing import Process

def run_cpu_tasks_in_parallel(tasks):
    running_tasks = [Process(target=task) for task in tasks]
    for running_task in running_tasks:
        running_task.start()
    for running_task in running_tasks:
        running_task.join()

run_cpu_tasks_in_parallel([
    lambda: print('CPU task 1 running!'),
    lambda: print('CPU task 2 running!'),
])
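One caveat for the multiprocessing variant: on start methods that spawn a fresh interpreter (Windows, and macOS since Python 3.8), the Process target has to be picklable, so the lambdas above won't work there; module-level functions do:

def cpu_task_1():
    print('CPU task 1 running!')

def cpu_task_2():
    print('CPU task 2 running!')

if __name__ == '__main__':
    run_cpu_tasks_in_parallel([cpu_task_1, cpu_task_2])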