多处理不会创建任何额外的进程

问题描述 投票:0回答:2

我试图使用多处理提高我的程序在Python中的速度,但它实际上并没有创建任何更多的进程。我看了几个教程,但我没有到达任何地方。

这里是:

    cpuutil = int((multiprocessing.cpu_count()) / 2)
    p = Pool(processes = cpuutil)
    output = p.map(OSGBtoETRSfunc(data, eastcol, northcol))
    p.close()
    p.join()
    return output

所以对我来说,这应该在四核机器上创建2个进程,但事实并非如此。我的CPU工具大约占18%......

任何见解?它看起来和我看过的教程相同......当在方括号([])中列出参数时,p.map不起作用,所以我认为它需要在上面的语法中?

谢谢

python multiprocessing python-multiprocessing
2个回答
1
投票

我不清楚你想要什么,所以让我们从简单开始吧。以下是一种简单地在pd数据帧的行上调用相同函数的方法:

import pandas as pd
import numpy as np
import os

import pathos
from contextlib import closing

NUM_PROCESSES = os.cpu_count()
# create some data frame 100x4

nrow = 100
ncol = 4
df = pd.DataFrame(np.random.randint(0,100,size=(nrow, ncol)), columns=list('ABCD'))

# dataframe resides in global scope
# so it is accessible to processes spawned below
# I pass only row indices to each process

# function to be run over rows
# it transforms the given row independently
def foo(idx):
    # extract given row to numpy
    row = df.iloc[[idx]].values[0]
    # you can pass ranges:
    # df[2:3]

    # transform row
    # I return it as list for simplicity of creating dataframe
    row = np.exp(row)

    # return numpy row
    return row


# run pool over range of indexes (0,1, ... , nrow-1)
# and close it afterwars
# there is not reason here to have more workers than number of CPUs
with closing(pathos.multiprocessing.Pool(processes=NUM_PROCESSES)) as pool:    
    results = pool.map(foo, range(nrow))

# create new dataframe from all those numpy slices:
col_names = df.columns.values.tolist()
df_new = pd.DataFrame(np.array(results), columns=col_names)

您的计算需要更复杂的设置?

编辑:好的,这里同时运行两个函数(我对pandas不太熟悉,所以只需切换到numpy):

# RUNNING TWO FUNCTIONS SIMLTANEOUSLY

import pandas as pd
import numpy as np

from multiprocessing import Process, Queue

# create some data frame 100x4

nrow = 100
ncol = 4
df = pd.DataFrame(np.random.randint(0,100,size=(nrow, ncol)), columns=list('ABCD'))

# dataframe resides in global scope
# so it is accessible to processes spawned below
# I pass only row indices to each process

# function to be run over part1 independently
def proc_func1(q1):

    # get data from queue1
    data1 = q1.get()

    # I extract given data to numpy
    data_numpy = data1.values

    # do something
    data_numpy_new = data_numpy + 1

    # return numpy array to queue 1
    q1.put(data_numpy_new)

    return 


# function to be run over part2 independently
def proc_func2(q2):

    # get data from queue2
    data2 = q2.get()


    # I extract given data to numpy
    data_numpy = data2.values

    # do something
    data_numpy_new = data_numpy - 1


    # return numpy array to queue 2
    q2.put(data_numpy_new)

    return


# instantiate queues
q1 = Queue()
q2 = Queue()

# divide data frame into two parts

part1 = df[:50]
part2 = df[50:]

# send data, so it will already be in queries
q1.put(part1)
q2.put(part2)

# start two processes 
p1 = Process(target=proc_func1, args=(q1,))
p2 = Process(target=proc_func2, args=(q2,))

p1.start()
p2.start()

# wait until they finish
p1.join()
p2.join()


# read results from Queues

res1 = q1.get()
res2 = q2.get()

if (res1 is None) or (res2 is None):
    print('Error!')


# reassemble two results back to single dataframe (might be inefficient)
col_names = df.columns.values.tolist()
# concatenate results along x axis
df_new = pd.DataFrame(np.concatenate([np.array(res1), np.array(res2)], axis=0), columns=col_names)

1
投票

在Python中,您应该提供函数和分隔的参数。如果没有,则在创建进程时执行OSGBtoETRSfunc函数。相反,您应该提供指向函数的指针,以及带有参数的列表。

您的案例类似于Python Docs:https://docs.python.org/3.7/library/multiprocessing.html#introduction上显示的案例

无论如何,我认为你使用的是错误的功能。 Pool.map()在项目列表中用作map:并将相同的函数应用于每个项目。我认为你的函数OSGBtoERTSfunc需要三个参数才能正常工作。请使用p.apply(),而不是使用p.map()

cpuutil = int((multiprocessing.cpu_count()) / 2)
p = Pool(processes = cpuutil)
output = p.apply(OSGBtoETRSfunc, [data, eastcol, northcol])
p.close()
p.join()
return output
© www.soinside.com 2019 - 2024. All rights reserved.