Python multithreading: runs at the same speed even when using multiple threads

Problem description

The code works fine, but when I use multiple threads I expect it to run faster; for example, with 2 threads it should finish roughly twice as fast, yet it takes the same time to complete.

The bottleneck is between "TRY 10" and "TRY 20". That is why it takes the same amount of time even with more threads.

Here is the code:

import io
import json
import os
import threading
from concurrent.futures import ThreadPoolExecutor

import pandas as pd

# column_mapping, user_id_dir and file_paths are defined elsewhere in the full script.

final_dfs_time = []

# Initialize a lock for synchronizing access to df_list_time
lock = threading.Lock()
json_list_lock = threading.Lock()

def append_to_dataframe(file_path):
    df_temp_time = None
    if os.path.getsize(file_path) > 0:
        try:
            print(f"try 1 {file_path}")
            # Read the JSON file using buffered I/O
            with open(file_path, 'rb') as file:
                buffered_file = io.BufferedReader(file)
                json_list = json.load(buffered_file)
                print(f"try 2 {file_path}")

            # Initialize an empty list to store temporary DataFrames
            df_list_time = []

            print(f"try 10")


            for entry in json_list:  # Iterate over json_list, not json_data
                try:
                    with json_list_lock:
                        for epoch_time, values in entry.items():
                            # Create a Series for each key-value pair
                            
                            series = pd.Series(values, name=epoch_time)
                            
                            # Convert the series into a DataFrame
                            temp_df_time = series.to_frame().T.rename(columns=column_mapping)


                            
                            # Add 'time' column
                            # Convert epoch_time to a pandas Series with the same length as temp_df_time
                            epoch_time_series = pd.Series(epoch_time, index=temp_df_time.index)
                            # Assign the Series to the 'time' column directly
                            
                            temp_df_time['time'] = epoch_time_series
                            
                            # Acquire the lock before appending to df_list_time
                            with lock:
                                # print(f"LOCK {file_path}")
                                # Append the DataFrame to the list
                                
                                df_list_time.append(temp_df_time)
                                

                except ValueError as e:
                    print(f"Error reading {file_path}: {e}")



            print(f"TRY 20")


            # Concatenate the DataFrames into one DataFrame
            df_temp_time = pd.concat((df for df in df_list_time), ignore_index=True) #stuck here for 1
            print(f"try4 {file_path}")
            print(f"Appended {file_path} for user {user_id_dir}")

        except ValueError as e:
            print(f"Error reading {file_path}: {e}")

    else:
        print(f"Skipping empty file: {file_path}")

    print(f"try5 {file_path}")
    return df_temp_time


num_threads = 5
# Create a ThreadPoolExecutor with the desired number of threads
with ThreadPoolExecutor(max_workers=num_threads) as executor:
    # Map the function to each file path and process concurrently
    results = executor.map(append_to_dataframe, file_paths)

# Concatenate all resulting DataFrames
final_dfs_time = list(results)

total_df_time = pd.concat(final_dfs_time, ignore_index=True)
print("all added together")

I tried to check whether this bottleneck is related to I/O, but it looks like what actually adds most of the delay is converting the series into a DataFrame on the line

temp_df_time = series.to_frame().T.rename(columns=column_mapping)

rather than creating the series itself on

series = pd.Series(values, name=epoch_time)

though I am not sure of the real cause.
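
To narrow this down, a minimal timing sketch along these lines could compare the two statements in isolation (the entry values and column_mapping below are placeholders, not my real data):

import time

import pandas as pd

# Placeholder inputs for illustration only; the real values come from the JSON files.
epoch_time = "1700000000"
values = {"metric_a": 1, "metric_b": 2}
column_mapping = {}

def time_block(label, fn, n=10_000):
    # Call fn() n times and report the elapsed wall-clock time.
    start = time.perf_counter()
    for _ in range(n):
        fn()
    print(f"{label}: {time.perf_counter() - start:.3f}s")

series = pd.Series(values, name=epoch_time)
time_block("pd.Series", lambda: pd.Series(values, name=epoch_time))
time_block("to_frame().T.rename", lambda: series.to_frame().T.rename(columns=column_mapping))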

Any help would be greatly appreciated.

python multithreading
1 Answer

In Python, threads do not run in parallel because of the Global Interpreter Lock (GIL). You can use processes instead of threads: either use the multiprocessing library directly, or use ProcessPoolExecutor instead of ThreadPoolExecutor; the rest of the API is similar (for example, multiprocessing.Lock instead of threading.Lock). In addition, the locks are not needed at all, because you are operating on local variables (unique to each worker) rather than global ones.
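
As a rough illustration of the GIL's effect (a standalone sketch, separate from your code), a pure-Python CPU-bound function gets no speedup from a thread pool but does from a process pool:

import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def busy(n):
    # Pure-Python loop, so it holds the GIL for the whole call.
    total = 0
    for i in range(n):
        total += i * i
    return total

def run(executor_cls, workers=4, n=2_000_000):
    start = time.perf_counter()
    with executor_cls(max_workers=workers) as ex:
        list(ex.map(busy, [n] * workers))
    print(f"{executor_cls.__name__}: {time.perf_counter() - start:.2f}s")

if __name__ == "__main__":
    run(ThreadPoolExecutor)   # roughly serial because of the GIL
    run(ProcessPoolExecutor)  # scales with the number of cores

On a multi-core machine the process-pool run should finish several times faster, while the thread-pool run takes about as long as running the calls one after another.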

Here is a version of the code using multiprocessing.

Note: I am assuming file_paths is a list of unique paths to JSON files.

import io
import json
import os
from concurrent.futures import ProcessPoolExecutor

import pandas as pd

# column_mapping, user_id_dir and file_paths are assumed to be defined as in your original script.

final_dfs_time = []
def append_to_dataframe(file_path):
    df_temp_time = None
    if os.path.getsize(file_path) > 0:
        try:
            print(f"try 1 {file_path}")
            # Read the JSON file using buffered I/O
            with open(file_path, 'rb') as file:
                buffered_file = io.BufferedReader(file)
                json_list = json.load(buffered_file)
                print(f"try 2 {file_path}")

            # Initialize an empty list to store temporary DataFrames
            df_list_time = []

            print(f"try 10")

            for entry in json_list:  # Iterate over json_list, not json_data
                try:
                    # No need for a lock here: each process works on a different JSON file
                    for epoch_time, values in entry.items():
                        # Create a Series for each key-value pair

                        series = pd.Series(values, name=epoch_time)

                        # Convert the series into a DataFrame
                        temp_df_time = series.to_frame().T.rename(columns=column_mapping)

                        # Add 'time' column
                        # Convert epoch_time to a pandas Series with the same length as temp_df_time
                        temp_df_time['time'] = pd.Series(epoch_time, index=temp_df_time.index)

                        # Also no lock needed here: df_list_time is a local variable, not a global one
                        df_list_time.append(temp_df_time)

                except ValueError as e:
                    print(f"Error reading {file_path}: {e}")

            print(f"TRY 20")

            # Concatenate the DataFrames into one DataFrame
            df_temp_time = pd.concat((df for df in df_list_time), ignore_index=True)  # stuck here for 1
            print(f"try4 {file_path}")
            print(f"Appended {file_path} for user {user_id_dir}")

        except ValueError as e:
            print(f"Error reading {file_path}: {e}")

    else:
        print(f"Skipping empty file: {file_path}")

    print(f"try5 {file_path}")
    return df_temp_time


num_process = os.cpu_count() - 1

# The __main__ guard matters for multiprocessing on platforms that spawn workers
# (e.g. Windows/macOS); without it, each worker would re-run this module on import.
if __name__ == "__main__":
    # Create a ProcessPoolExecutor with the desired number of processes
    with ProcessPoolExecutor(max_workers=num_process) as executor:
        # Map the function to each file path and process concurrently
        results = executor.map(append_to_dataframe, file_paths)

    # Concatenate all resulting DataFrames
    final_dfs_time = list(results)

    total_df_time = pd.concat(final_dfs_time, ignore_index=True)
    print("all added together")