My processes either start one after another, or start (simultaneously) but without ever calling the target function. I have tried many variants, but it just does not work the way many tutorials say it should. My goal is to fuzzy-string-match a text list of 80k items against itself, discarding the redundant 90%+ matches while keeping the string with the most information (scorer=fuzz.token_set_ratio). Thanks!
IDE is Anaconda Spyder 4.0, IPython 7.10.1, Python 3.7.5
# -*- coding: utf-8 -*-
import pandas as pd
import multiprocessing
import time
from datetime import datetime
from fuzzywuzzy import fuzz
from fuzzywuzzy import process
#########
preparedDF = []
df1 = []
df2 = []
df3 = []
df4 = []
df5 = []
df6 = []
df7 = []
df8 = []
#########
xdf1 = []
xdf2 = []
xdf3 = []
xdf4 = []
xdf5 = []
xdf6 = []
xdf7 = []
xdf8 = []
#########
def fuzzyPrepare():
    # load data, do some easy cleaning
    global preparedDF
    df = pd.read_csv("newEN.csv")
    df = df["description"].fillna("#####").tolist()
    df = list(dict.fromkeys(df))
    try:
        df = df.remove("#####")
    except ValueError:
        pass
    preparedDF = df
def fuzzySplit(df=preparedDF):
    # split data to feed processes
    global df1, df2, df3, df4, df5, df6, df7, df8
    df1 = df[:100]
    df2 = df[100:200]
    df3 = df[200:300]
    df4 = df[300:400]
    df5 = df[400:500]
    df6 = df[500:600]
    df7 = df[600:700]
    df8 = df[700:800]
def fuzzyMatch(x):
    # process.dedupe returns a dict_keys object, so pass it to list()
    global xdf1, xdf2, xdf3, xdf4, xdf5, xdf6, xdf7, xdf8
    if x == 1:
        xdf1 = list(process.dedupe(df1, threshold=90, scorer=fuzz.token_set_ratio))
    elif x == 2:
        xdf2 = list(process.dedupe(df2, threshold=90, scorer=fuzz.token_set_ratio))
    elif x == 3:
        xdf3 = list(process.dedupe(df3, threshold=90, scorer=fuzz.token_set_ratio))
    elif x == 4:
        xdf4 = list(process.dedupe(df4, threshold=90, scorer=fuzz.token_set_ratio))
    elif x == 5:
        xdf5 = list(process.dedupe(df5, threshold=90, scorer=fuzz.token_set_ratio))
    elif x == 6:
        xdf6 = list(process.dedupe(df6, threshold=90, scorer=fuzz.token_set_ratio))
    elif x == 7:
        xdf7 = list(process.dedupe(df7, threshold=90, scorer=fuzz.token_set_ratio))
    elif x == 8:
        xdf8 = list(process.dedupe(df8, threshold=90, scorer=fuzz.token_set_ratio))
    else:
        return "error in fuzzyCases!"
#if __name__ == '__main__':
fuzzyPrepare()
fuzzySplit(preparedDF)
#UNHEEDED MULTIPROCESSING, ONLY THIS LINE TRIGGERS THE ACTUAL FUNCTION -> p1 = multiprocessing.Process(name="p1",target=fuzzyMatch(1), args=(1,))
p1 = multiprocessing.Process(name="p1",target=fuzzyMatch, args=(1,))
p2 = multiprocessing.Process(name="p2",target=fuzzyMatch, args=(2,))
p3 = multiprocessing.Process(name="p3",target=fuzzyMatch, args=(3,))
p4 = multiprocessing.Process(name="p4",target=fuzzyMatch, args=(4,))
p5 = multiprocessing.Process(name="p5",target=fuzzyMatch, args=(5,))
p6 = multiprocessing.Process(name="p6",target=fuzzyMatch, args=(6,))
p7 = multiprocessing.Process(name="p7",target=fuzzyMatch, args=(7,))
p8 = multiprocessing.Process(name="p8",target=fuzzyMatch, args=(8,))
jobs = []
jobs.append(p1)
jobs.append(p2)
jobs.append(p3)
jobs.append(p4)
jobs.append(p5)
jobs.append(p6)
jobs.append(p7)
jobs.append(p8)
for j in jobs:
    print("process " + j.name + " started at " + datetime.now().strftime('%H:%M:%S'))
    j.start()
    time.sleep(0.3)
for j in jobs:
    j.join()
print("processing complete at " + datetime.now().strftime('%H:%M:%S'))
OK, you are dealing with a non-trivial problem here. I have taken the liberty of DRYing (Don't Repeat Yourself) your code a bit. I also don't have your data or pandas installed, so I have simplified the input and output. The principles are all the same though, and with few changes you should be able to make your code work!
I use an array of 800 int elements, and each process computes the sum of 100 of them. Look for the # DRY: comments:
# -*- coding: utf-8 -*-
import multiprocessing
import time
from datetime import datetime
#########
number_of_proc = 8
preparedDF = []
# DRY: This is now a list of lists. This allows us to refer to df1 as dfs[1]
dfs = []
# DRY: A dict of results. The key will be int (the process number!)
xdf = {}
#########
def fuzzyPrepare():
    global preparedDF
    # Generate fake data
    preparedDF = range(number_of_proc * 100)
def fuzzySplit(df):
    # split data to feed processes
    global dfs
    # DRY: Loop and generate N lists for N processes
    for i in range(number_of_proc):
        from_element = i * 100
        to_element = from_element + 100
        print("Packing [{}, {})".format(from_element, to_element))
        dfs.append(df[from_element:to_element])
def fuzzyMatch(x):
    global xdf
    # DRY: Since we now have a dict, all the if-else is not needed any more...
    xdf[x] = sum(dfs[x])
    print("In process: x={}, xdf[{}]={}".format(x, x, xdf[x]))
if __name__ == '__main__':
    fuzzyPrepare()
    fuzzySplit(preparedDF)
    # DRY: Create N processes AND append them
    jobs = []
    for p in range(number_of_proc):
        p = multiprocessing.Process(name="p{}".format(p), target=fuzzyMatch, args=(p,))
        jobs.append(p)
    for j in jobs:
        print("process " + j.name + " started at " + datetime.now().strftime('%H:%M:%S'))
        j.start()
        time.sleep(0.3)
    for j in jobs:
        j.join()
    print("processing complete at " + datetime.now().strftime('%H:%M:%S'))
    print("results:")
    for x in range(number_of_proc):
        print("In process: x={}, xdf[{}]={}".format(x, x, xdf[x]))
Output:
Packing [0, 100)
Packing [100, 200)
Packing [200, 300)
Packing [300, 400)
Packing [400, 500)
Packing [500, 600)
Packing [600, 700)
Packing [700, 800)
process p0 started at 19:12:00
In process: x=0, xdf[0]=4950
process p1 started at 19:12:00
In process: x=1, xdf[1]=14950
process p2 started at 19:12:00
In process: x=2, xdf[2]=24950
process p3 started at 19:12:01
In process: x=3, xdf[3]=34950
process p4 started at 19:12:01
In process: x=4, xdf[4]=44950
process p5 started at 19:12:01
In process: x=5, xdf[5]=54950
process p6 started at 19:12:01
In process: x=6, xdf[6]=64950
process p7 started at 19:12:02
In process: x=7, xdf[7]=74950
processing complete at 19:12:02
results:
Traceback (most recent call last):
File "./tmp/proctest.py", line 58, in <module>
print("In process: x={}, xdf[{}]={}".format(x, x, xdf[x]))
KeyError: 0
What happened? I printed those values inside the processing function and they were there?!
Well, I'm no expert, but Python processes work similarly to fork(). The basic principle is that a new child process is spawned and initialized. The child process gets a COPY(!) of the parent's memory. This means that the parent and child processes do not share any data/memory!
So in our case, each child process gets its own copy of the dfs and xdf variables. We don't really care about dfs (since it is only used for input), but each process now has its own xdf, not the parent's! Do you see now why the KeyError occurs?
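A minimal sketch of this copy-on-fork behaviour (hypothetical names, not your code): the child mutates its own copy of a module-level dict, and the parent never sees the change:

```python
import multiprocessing

results = {}  # a plain dict: NOT shared between processes

def work(x):
    results[x] = x * x  # modifies the CHILD's copy only
    print("child sees:", results)

if __name__ == '__main__':
    p = multiprocessing.Process(target=work, args=(3,))
    p.start()
    p.join()
    print("parent sees:", results)  # still {} -> indexing it raises KeyError
```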
It is now clear that we need to get the data from the child processes back to the parent. There are many ways to do this, but the simplest one (code-wise) is to use a multiprocessing.Manager to share data between the child processes (look for the # NEW: markers in the code below; note that I only changed 2 lines!):
# -*- coding: utf-8 -*-
import multiprocessing
import time
from datetime import datetime
# NEW: This can manage data between processes
from multiprocessing import Manager
#########
number_of_proc = 8
preparedDF = []
dfs = []
# NEW: we create a manager object to store the results
manager = Manager()
xdf = manager.dict()
#########
def fuzzyPrepare():
    global preparedDF
    # Generate fake data
    preparedDF = range(number_of_proc * 100)
def fuzzySplit(df):
    # split data to feed processes
    global dfs
    # DRY: Loop and generate N lists for N processes
    for i in range(number_of_proc):
        from_element = i * 100
        to_element = from_element + 100
        print("Packing [{}, {})".format(from_element, to_element))
        dfs.append(df[from_element:to_element])
def fuzzyMatch(x):
    global xdf
    # DRY: Since we now have a dict, all the if-else is not needed any more...
    xdf[x] = sum(dfs[x])
    print("In process: x={}, xdf[{}]={}".format(x, x, xdf[x]))
if __name__ == '__main__':
    fuzzyPrepare()
    fuzzySplit(preparedDF)
    # DRY: Create N processes AND append them
    jobs = []
    for p in range(number_of_proc):
        p = multiprocessing.Process(name="p{}".format(p), target=fuzzyMatch, args=(p,))
        jobs.append(p)
    for j in jobs:
        print("process " + j.name + " started at " + datetime.now().strftime('%H:%M:%S'))
        j.start()
        time.sleep(0.3)
    for j in jobs:
        j.join()
    print("processing complete at " + datetime.now().strftime('%H:%M:%S'))
    print("results:")
    for x in range(number_of_proc):
        print("Out of process: x={}, xdf[{}]={}".format(x, x, xdf[x]))
And the output:
Packing [0, 100)
Packing [100, 200)
Packing [200, 300)
Packing [300, 400)
Packing [400, 500)
Packing [500, 600)
Packing [600, 700)
Packing [700, 800)
process p0 started at 19:34:50
In process: x=0, xdf[0]=4950
process p1 started at 19:34:50
In process: x=1, xdf[1]=14950
process p2 started at 19:34:50
In process: x=2, xdf[2]=24950
process p3 started at 19:34:51
In process: x=3, xdf[3]=34950
process p4 started at 19:34:51
In process: x=4, xdf[4]=44950
process p5 started at 19:34:51
In process: x=5, xdf[5]=54950
process p6 started at 19:34:52
In process: x=6, xdf[6]=64950
process p7 started at 19:34:52
In process: x=7, xdf[7]=74950
processing complete at 19:34:52
results:
Out of process: x=0, xdf[0]=4950
Out of process: x=1, xdf[1]=14950
Out of process: x=2, xdf[2]=24950
Out of process: x=3, xdf[3]=34950
Out of process: x=4, xdf[4]=44950
Out of process: x=5, xdf[5]=54950
Out of process: x=6, xdf[6]=64950
Out of process: x=7, xdf[7]=74950
Read more about this here, and mind the warning that a Manager is slower than a multiprocessing.Array (which, by the way, could also solve your problem here).