The code works correctly, but I expected it to run faster with multiple threads: with 2 threads, for example, it should finish in roughly half the time, yet the total runtime stays the same.
The bottleneck is between the "TRY 10" and "TRY 20" prints; that section takes just as long no matter how many threads I use.
Here is the code:
import io
import json
import os
import threading
import pandas as pd
from concurrent.futures import ThreadPoolExecutor

# column_mapping, user_id_dir, and file_paths are defined earlier in my script

final_dfs_time = []
# Initialize a lock for synchronizing access to df_list_time
lock = threading.Lock()
json_list_lock = threading.Lock()

def append_to_dataframe(file_path):
    df_temp_time = None
    if os.path.getsize(file_path) > 0:
        try:
            print(f"try 1 {file_path}")
            # Read the JSON file using buffered I/O
            with open(file_path, 'rb') as file:
                buffered_file = io.BufferedReader(file)
                json_list = json.load(buffered_file)
            print(f"try 2 {file_path}")
            # Initialize an empty list to store temporary DataFrames
            df_list_time = []
            print(f"try 10")
            for entry in json_list:  # Iterate over json_list, not json_data
                try:
                    with json_list_lock:
                        for epoch_time, values in entry.items():
                            # Create a Series for each key-value pair
                            series = pd.Series(values, name=epoch_time)
                            # Convert the series into a one-row DataFrame
                            temp_df_time = series.to_frame().T.rename(columns=column_mapping)
                            # Convert epoch_time to a Series with the same index as temp_df_time
                            epoch_time_series = pd.Series(epoch_time, index=temp_df_time.index)
                            # Assign the Series to the 'time' column directly
                            temp_df_time['time'] = epoch_time_series
                            # Acquire the lock before appending to df_list_time
                            with lock:
                                # print(f"LOCK {file_path}")
                                # Append the DataFrame to the list
                                df_list_time.append(temp_df_time)
                except ValueError as e:
                    print(f"Error reading {file_path}: {e}")
            print(f"TRY 20")
            # Concatenate the per-row DataFrames into one DataFrame
            df_temp_time = pd.concat(df_list_time, ignore_index=True)  # this is where it appears to get stuck
            print(f"try4 {file_path}")
            print(f"Appended {file_path} for user {user_id_dir}")
        except ValueError as e:
            print(f"Error reading {file_path}: {e}")
    else:
        print(f"Skipping empty file: {file_path}")
    print(f"try5 {file_path}")
    return df_temp_time

num_threads = 5
# Create a ThreadPoolExecutor with the desired number of threads
with ThreadPoolExecutor(max_workers=num_threads) as executor:
    # Map the function to each file path and process files concurrently
    results = executor.map(append_to_dataframe, file_paths)
    # Collect all resulting DataFrames
    final_dfs_time = list(results)
total_df_time = pd.concat(final_dfs_time, ignore_index=True)
print("all added together")
I tried to check whether the bottleneck is related to I/O, but it actually looks like the per-row conversion to a DataFrame on the line temp_df_time = series.to_frame().T.rename(columns=column_mapping) may add more overhead than series = pd.Series(values, name=epoch_time) itself. I'm not sure of the real cause, though.
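For reference, this is the kind of micro-benchmark I would use to compare those two lines in isolation (the values and column_mapping below are made-up stand-ins, not my real inputs):

import timeit
import pandas as pd

# Stand-in data: one entry's values and a placeholder column mapping
values = list(range(50))
column_mapping = {i: f"col_{i}" for i in range(50)}

# Cost of building the Series alone
t_series = timeit.timeit(
    lambda: pd.Series(values, name="1700000000"), number=10_000)

# Cost of Series -> one-row DataFrame -> transpose -> rename
t_frame = timeit.timeit(
    lambda: pd.Series(values, name="1700000000").to_frame().T.rename(columns=column_mapping),
    number=10_000)

print(f"Series only:           {t_series:.3f}s")
print(f"to_frame().T.rename(): {t_frame:.3f}s")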
Any help would be greatly appreciated.
In Python, threads do not run Python code in parallel because of the Global Interpreter Lock (GIL). You can use processes instead: either create them with the multiprocessing library, or simply swap ThreadPoolExecutor for ProcessPoolExecutor, since the rest of the API is analogous (for example, multiprocessing.Lock takes the place of threading.Lock). In your case no lock is needed at all, because you only ever mutate local variables (private to each call's own stack) rather than shared global state.
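If you want to see this for yourself, here is a minimal, self-contained sketch (independent of your code) that runs the same CPU-bound function in 4 workers, once with threads and once with processes; on a multi-core machine only the process version should finish in roughly the time of a single task:

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def cpu_bound(n):
    # A pure-Python loop holds the GIL for its entire duration
    total = 0
    for i in range(n):
        total += i * i
    return total

def timed(executor_cls, workers, n=5_000_000):
    # Run one copy of cpu_bound per worker and return the wall-clock time
    start = time.perf_counter()
    with executor_cls(max_workers=workers) as ex:
        list(ex.map(cpu_bound, [n] * workers))
    return time.perf_counter() - start

if __name__ == "__main__":
    print(f"threads:   {timed(ThreadPoolExecutor, 4):.2f}s")
    print(f"processes: {timed(ProcessPoolExecutor, 4):.2f}s")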
Here is a version of the code that uses multiprocessing.
Note: I am assuming file_paths is a list of unique paths to the JSON files.
import io
import json
import os
import pandas as pd
from concurrent.futures import ProcessPoolExecutor

# column_mapping, user_id_dir, and file_paths should be defined at module
# level so that spawned worker processes can see them after re-import

final_dfs_time = []

def append_to_dataframe(file_path):
    df_temp_time = None
    if os.path.getsize(file_path) > 0:
        try:
            print(f"try 1 {file_path}")
            # Read the JSON file using buffered I/O
            with open(file_path, 'rb') as file:
                buffered_file = io.BufferedReader(file)
                json_list = json.load(buffered_file)
            print(f"try 2 {file_path}")
            # Initialize an empty list to store temporary DataFrames
            df_list_time = []
            print(f"try 10")
            for entry in json_list:  # Iterate over json_list, not json_data
                try:
                    # No lock needed here: each process handles its own JSON file
                    for epoch_time, values in entry.items():
                        # Create a Series for each key-value pair
                        series = pd.Series(values, name=epoch_time)
                        # Convert the series into a one-row DataFrame
                        temp_df_time = series.to_frame().T.rename(columns=column_mapping)
                        # Add a 'time' column with the same index as temp_df_time
                        temp_df_time['time'] = pd.Series(epoch_time, index=temp_df_time.index)
                        # No lock needed here either: df_list_time is a local
                        # variable, not state shared between workers
                        df_list_time.append(temp_df_time)
                except ValueError as e:
                    print(f"Error reading {file_path}: {e}")
            print(f"TRY 20")
            # Concatenate the per-row DataFrames into one DataFrame
            df_temp_time = pd.concat(df_list_time, ignore_index=True)
            print(f"try4 {file_path}")
            print(f"Appended {file_path} for user {user_id_dir}")
        except ValueError as e:
            print(f"Error reading {file_path}: {e}")
    else:
        print(f"Skipping empty file: {file_path}")
    print(f"try5 {file_path}")
    return df_temp_time

if __name__ == "__main__":
    num_process = os.cpu_count() - 1
    # Create a ProcessPoolExecutor with the desired number of processes;
    # the __main__ guard is required on platforms that spawn workers
    with ProcessPoolExecutor(max_workers=num_process) as executor:
        # Map the function to each file path and process files concurrently
        results = executor.map(append_to_dataframe, file_paths)
        # Collect all resulting DataFrames
        final_dfs_time = list(results)
    total_df_time = pd.concat(final_dfs_time, ignore_index=True)
    print("all added together")