我一直在尝试运行下面的这个程序,但没有成功,因为出现了回溯错误,因为该函数已明确定义,所以我不明白该错误。任何建议都非常感激。我在 Windows10、Jupyter Notebook、Python3 上运行。我希望这些信息足以提供一些指导。
"""
Multi processing in Python, threading
"""
import pandas as pd
import timeit, sys
import numpy as np
import multiprocess as mp
df = pd.read_csv("C:/files/data.csv", encoding="UTF-8")
df['start-digits'] = None
startTime = timeit.default_timer()
def strip_digits(x):
return str(x)[:4]
def zip_handler(x):
x['start-digits'] = x['Zip Codes'].apply(strip_digits)
return x
def parallelize(dataframe, func):
dataframe_split = np.array_split(dataframe, partitions)
pool = mp.Pool(cores)
dataframe_return = pd.concat(pool.map(func, dataframe_split), ignore_index=True)
pool.close()
return dataframe_return
if __name__ == '__main__':
#mp.set_start_method('spawn')
cores = mp.cpu_count()
partitions = cores
df = parallelize(df, zip_handler)
group_dataframe = df.groupby(['Zip Codes'])
size_of_group = group_dataframe.size()
print(size_of_group)
print(group_dataframe.get_group(2443))
stopTime = timeit.default_timer() - startTime
round_time = round(stopTime, 6)
print('')
print(round_time/60, 'min handle time')
print('cpu threads', cores)
print('split size', partitions)
RemoteTraceback Traceback (most recent call last)
RemoteTraceback:
"""
Traceback (most recent call last):
File "C:\Users\trevo\anaconda3\lib\site-packages\multiprocess\pool.py", line 125, in worker
result = (True, func(*args, **kwds))
File "C:\Users\trevo\anaconda3\lib\site-packages\multiprocess\pool.py", line 48, in mapstar
return list(map(*args))
File "<ipython-input-10-e6bb79e2f39d>", line 20, in zip_handler
NameError: name 'strip_digits' is not defined
"""
The above exception was the direct cause of the following exception:
NameError Traceback (most recent call last)
<ipython-input-10-e6bb79e2f39d> in <module>
35 partitions = cores
36
---> 37 df = parallelize(df, zip_handler)
38
39 group_dataframe = df.groupby(['Zip Codes'])
<ipython-input-10-e6bb79e2f39d> in parallelize(dataframe, func)
24 dataframe_split = np.array_split(dataframe, partitions)
25 pool = mp.Pool(cores)
---> 26 dataframe_return = pd.concat(pool.map(func, dataframe_split), ignore_index=True)
27 pool.close()
28
~\anaconda3\lib\site-packages\multiprocess\pool.py in map(self, func, iterable, chunksize)
362 in a list that is returned.
363 '''
--> 364 return self._map_async(func, iterable, mapstar, chunksize).get()
365
366 def starmap(self, func, iterable, chunksize=None):
~\anaconda3\lib\site-packages\multiprocess\pool.py in get(self, timeout)
769 return self._value
770 else:
--> 771 raise self._value
772
773 def _set(self, i, obj):
NameError: name 'strip_digits' is not defined
import os
import timeit
import multiprocessing as mp
import concurrent.futures
import numpy as np
import pandas as pd
# Define the strip_digits function
def strip_digits(x):
return str(x)[:4]
# Define the zip_handler function
def zip_handler(x):
x['start-digits'] = x['Zip Codes'].apply(strip_digits)
return x
def parallelize(dataframe, func):
dataframe_split = np.array_split(dataframe, partitions)
pool = mp.Pool(cores)
dataframe_return = pd.concat(pool.map(func, dataframe_split),
ignore_index=True)
pool.close()
return dataframe_return
if __name__ == '__main__':
cores = mp.cpu_count()
partitions = cores
# Specify the file path
file_path = "data/Test.csv"
# Check if the file exists
if os.path.exists(file_path):
# Load your data into the df DataFrame
df = pd.read_csv(file_path, sep=";")
# Call the parallelize function with df and zip_handler
df = parallelize(df, zip_handler)
group_dataframe = df.groupby(['Zip Codes'])
size_of_group = group_dataframe.size()
print(size_of_group)
print(group_dataframe.get_group(2443))
stopTime = timeit.default_timer()
round_time = round(stopTime, 6)
print('')
print(round_time/60, 'min handle time')
print('cpu threads', cores)
print('split size', partitions)
else:
print(f"File not found: {file_path}")