Multiprocessing with multiple variables and one common argument

Problem description

I have a Python script with a function that needs to be executed using multiprocessing; however, one of the function's arguments (df) is the same for every call. Let me explain with code:

from multiprocessing import Pool, cpu_count
from tqdm import tqdm

def main():
    for country in all_countries:
        # the country filter presumably uses the loop variable; the literal was lost in the original post
        df = SqlManager().sql_query('MyDb', f"SELECT * FROM MyTable WHERE Country='{country}'")
        args = [(
            df,
            lvl,
            timescale,
            adm_code
        ) for lvl in ['Country', 'Region', 'County']
            for timescale in ['month', 'week']
            for adm_code in list(df[lvl].unique())
        ]
        pool = Pool(cpu_count())
        entry_list = list(tqdm(pool.imap(parallel_func, args), total=len(args)))

where

def parallel_func(args):
    return function_to_run(*args)

def function_to_run(df, lvl, timescale, adm_code):
    """my code"""

The problem is that my df is very large (over 50 million rows), and with this approach I store it in args 10,000 times even though it is exactly the same in every tuple. I also don't want to make the SQL call inside my parallel function, because issuing the same query repeatedly for identical data defeats the purpose of keeping it in memory.
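To put rough numbers on that (the column count below is a hypothetical assumption; only the row count comes from the question):

rows, cols = 50_000_000, 10        # 10 float64 columns is an assumption
df_bytes = rows * cols * 8         # ~4 GB for a single copy of df
total_bytes = 10_000 * df_bytes    # ~40 TB serialized in total, because
                                   # pool.imap pickles df once per task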

I tried using global variables, but when running in parallel the worker processes did not share the same variable.
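For context, the global-variable attempt presumably looked something like the sketch below (the helper names are illustrative, not from the original post). With the spawn start method (the default on Windows and macOS) each worker re-imports the module and gets its own fresh copy of the global, so nothing is shared; a Pool initializer at least reduces the copying to once per worker instead of once per task:

import multiprocessing as mp

_df = None  # one copy per worker process, not truly shared memory

def _init_worker(df):
    # Runs once in each worker and stashes df in that process's globals,
    # so df is pickled once per worker rather than once per task
    global _df
    _df = df

def worker_func(args):
    lvl, timescale, adm_code = args
    return function_to_run(_df, lvl, timescale, adm_code)

# pool = mp.Pool(mp.cpu_count(), initializer=_init_worker, initargs=(df,))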

Tags: python, sql, multiprocessing, arguments
1 Answer

The following answer is based on the approach described in this article for making a Pandas dataframe sharable across multiple processes. The main class is SharedPandasDataFrame, used as follows:

# A sharable instance based on a Numpy array stored in shared memory:
shared_df = SharedPandasDataFrame(df)  # sharable instance
# Reconstitute the dataframe from the sharable instance:
df = shared_df.read()
It is the shared_df instance that must be passed to the worker processes; each process can then reconstitute the dataframe from it with its read method.

from multiprocessing.shared_memory import SharedMemory
import multiprocessing as mp

import numpy as np
import pandas as pd
from tqdm import tqdm


class SharedNumpyArray:
    '''
    Wraps a numpy array so that it can be shared quickly among processes,
    avoiding unnecessary copying and (de)serializing.
    '''
    def __init__(self, array):
        '''
        Creates the shared memory and copies the array therein
        '''
        # create a shared memory location of the same size as the array
        self._shared = SharedMemory(create=True, size=array.nbytes)

        # save data type and shape, necessary to read the data correctly
        self._dtype, self._shape = array.dtype, array.shape

        # create a new numpy array that uses the shared memory we created.
        # at first, it is filled with zeros
        res = np.ndarray(
            self._shape, dtype=self._dtype, buffer=self._shared.buf
        )

        # copy data from the array to the shared memory. numpy will
        # take care of copying everything in the correct format
        res[:] = array[:]

    def read(self):
        '''
        Reads the array from the shared memory without unnecessary copying.
        '''
        # simply create an array of the correct shape and type,
        # using the shared memory location we created earlier
        return np.ndarray(self._shape, self._dtype, buffer=self._shared.buf)

    def copy(self):
        '''
        Returns a new copy of the array stored in shared memory.
        '''
        return np.copy(self.read())  # fixed: the original called a
                                     # non-existent self.read_array()

    def unlink(self):
        '''
        Releases the allocated memory. Call when finished using the data,
        or when the data was copied somewhere else.
        '''
        self._shared.close()
        self._shared.unlink()


class SharedPandasDataFrame:
    '''
    Wraps a pandas dataframe so that it can be shared quickly among processes,
    avoiding unnecessary copying and (de)serializing.
    '''
    def __init__(self, df):
        '''
        Creates the shared memory and copies the dataframe therein
        '''
        self._values = SharedNumpyArray(df.values)
        self._index = df.index
        self._columns = df.columns

    def read(self):
        '''
        Reads the dataframe from the shared memory without unnecessary copying.
        '''
        return pd.DataFrame(
            self._values.read(),
            index=self._index,
            columns=self._columns
        )

    def copy(self):
        '''
        Returns a new copy of the dataframe stored in shared memory.
        '''
        return pd.DataFrame(
            self._values.copy(),
            index=self._index,
            columns=self._columns
        )

    def unlink(self):
        '''
        Releases the allocated memory. Call when finished using the data,
        or when the data was copied somewhere else.
        '''
        self._values.unlink()


def parallel_func(tpl):
    # Unpack:
    shared_df, lvl, timescale, adm_code = tpl
    # reconstitute the dataframe:
    df = shared_df.read()
    ...


def main():
    for country in all_countries:
        df = SqlManager().sql_query('MyDb', f"SELECT * FROM MyTable WHERE Country='{country}'")
        shared_df = SharedPandasDataFrame(df)
        args = [(
            shared_df,
            lvl,
            timescale,
            adm_code
        ) for lvl in ['Country', 'Region', 'County']
            for timescale in ['month', 'week']
            for adm_code in list(df[lvl].unique())
        ]
        # If you need to access the sharable dataframe, uncomment the following
        # df = shared_df.read()
        with mp.Pool() as pool:
            # (note the closing ) that was missing in the question)
            entry_list = list(tqdm(pool.imap(parallel_func, args), total=len(args)))
        # Destroy shared memory, rendering shared_df unusable:
        shared_df.unlink()


if __name__ == '__main__':
    main()
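As a quick sanity check of the round trip (a minimal sketch with made-up data; the shape and column names are arbitrary):

import numpy as np
import pandas as pd

df = pd.DataFrame(np.random.rand(1_000, 3), columns=['x', 'y', 'z'])
shared_df = SharedPandasDataFrame(df)
try:
    # read() rebuilds an equal dataframe backed by the shared buffer,
    # so no per-task copy of the data is ever made
    assert shared_df.read().equals(df)
finally:
    shared_df.unlink()  # always release the shared memory block

One caveat worth noting: SharedNumpyArray wraps df.values, so this works as intended only when the frame has a single fixed-width dtype; a mixed-dtype frame is upcast to an object array, whose shared-memory copy holds pointers that are meaningless in other processes.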
    