pool.map() does not work on more than 2 CPUs


I have the following code:

import sentence_transformers
import multiprocessing
from tqdm import tqdm
from multiprocessing import Pool
import numpy as np

embedding_model = sentence_transformers.SentenceTransformer('sentence-transformers/all-mpnet-base-v2')

data = [[100227, 7382501.0, 'view', 30065006, False, ''],
 [100227, 7382501.0, 'view', 57072062, True, ''],
 [100227, 7382501.0, 'view', 66405922, True, ''],
 [100227, 7382501.0, 'view', 5221475, False, ''],
 [100227, 7382501.0, 'view', 63283995, True, '']]

df_text = dict()
df_text[7382501] = {'title': 'The Geography of the Internet Industry, Venture Capital, Dot-coms, and Local Knowledge - MATTHEW A. ZOOK', 'abstract': '23', 'highlight': '12'}
df_text[30065006] = {'title': 'Determination of the Effect of Lipophilicity on the in vitro Permeability and Tissue Reservoir Characteristics of Topically Applied Solutes in Human Skin Layers', 'abstract': '12', 'highlight': '12'}
df_text[57072062] = {'title': 'Determination of the Effect of Lipophilicity on the in vitro Permeability and Tissue Reservoir Characteristics of Topically Applied Solutes in Human Skin Layers', 'abstract': '12', 'highlight': '12'}
df_text[66405922] = {'title': 'Determination of the Effect of Lipophilicity on the in vitro Permeability and Tissue Reservoir Characteristics of Topically Applied Solutes in Human Skin Layers', 'abstract': '12', 'highlight': '12'}
df_text[5221475] = {'title': 'Determination of the Effect of Lipophilicity on the in vitro Permeability and Tissue Reservoir Characteristics of Topically Applied Solutes in Human Skin Layers', 'abstract': '12', 'highlight': '12'}
df_text[63283995] = {'title': 'Determination of the Effect of Lipophilicity on the in vitro Permeability and Tissue Reservoir Characteristics of Topically Applied Solutes in Human Skin Layers', 'abstract': '12', 'highlight': '12'}


# Define the function to be executed in parallel
def process_data(chunk):
    results = []
    for row in chunk:
        print(row[0])
        work_id = row[1]
        mentioning_work_id = row[3]
        print(work_id)

        if work_id in df_text and mentioning_work_id in df_text:
            title1 = df_text[work_id]['title']
            title2 = df_text[mentioning_work_id]['title']
            embeddings_title1 = embedding_model.encode(title1, convert_to_numpy=True)
            embeddings_title2 = embedding_model.encode(title2, convert_to_numpy=True)
            
            similarity = np.matmul(embeddings_title1, embeddings_title2.T)
            
            results.append([row[0],row[1],row[2],row[3],row[4],similarity])
        else:
            continue
    return results

# Define the number of CPU cores to use
num_cores = multiprocessing.cpu_count()

# Split the data into chunks
chunk_size = len(data) // num_cores
chunks = [data[i:i+chunk_size] for i in range(0, len(data), chunk_size)]

# Create a pool of worker processes
pool = multiprocessing.Pool(processes=num_cores)

results = []
with tqdm(total=len(data)) as pbar:
    for i, result_chunk in enumerate(pool.map(process_data, chunks)):
        # Update the progress bar
        pbar.update()
        # Add the results to the list
        results += result_chunk

# Concatenate the results
final_result = results
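One thing worth noting about the chunking arithmetic above (this may or may not be related to the hang): with more cores than rows, `len(data) // num_cores` is `0`, and `range(0, len(data), 0)` raises `ValueError` rather than producing chunks. A minimal sketch of a guarded split, using a hypothetical helper name:

```python
def split_into_chunks(data, num_cores):
    # Guard against num_cores > len(data): a step of 0 would make
    # range() raise "range() arg 3 must not be zero".
    chunk_size = max(1, len(data) // num_cores)
    return [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]

data = list(range(5))
print(split_into_chunks(data, 2))  # [[0, 1], [2, 3], [4]]
print(split_into_chunks(data, 8))  # [[0], [1], [2], [3], [4]]
```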

I am running this code on Amazon Sagemaker, and it runs fine on an instance with 2 CPUs: it shows the progress bar and everything. But I want to run it on a larger instance with more CPUs. With more CPUs it just hangs and makes no progress at all. When I finally stop the kernel, I get this error:

---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-18-19449c86abd3> in <module>
      1 results = []
      2 with tqdm(total=len(chunks)) as pbar:
----> 3     for i, result_chunk in enumerate(pool.map(process_data, chunks)):
      4         # Update the progress bar
      5         pbar.update()

/opt/conda/lib/python3.7/multiprocessing/pool.py in map(self, func, iterable, chunksize)
    266         in a list that is returned.
    267         '''
--> 268         return self._map_async(func, iterable, mapstar, chunksize).get()
    269 
    270     def starmap(self, func, iterable, chunksize=None):

/opt/conda/lib/python3.7/multiprocessing/pool.py in get(self, timeout)
    649 
    650     def get(self, timeout=None):
--> 651         self.wait(timeout)
    652         if not self.ready():
    653             raise TimeoutError

/opt/conda/lib/python3.7/multiprocessing/pool.py in wait(self, timeout)
    646 
    647     def wait(self, timeout=None):
--> 648         self._event.wait(timeout)
    649 
    650     def get(self, timeout=None):

/opt/conda/lib/python3.7/threading.py in wait(self, timeout)
    550             signaled = self._flag
    551             if not signaled:
--> 552                 signaled = self._cond.wait(timeout)
    553             return signaled
    554 

/opt/conda/lib/python3.7/threading.py in wait(self, timeout)
    294         try:    # restore state no matter what (e.g., KeyboardInterrupt)
    295             if timeout is None:
--> 296                 waiter.acquire()
    297                 gotit = True
    298             else:

KeyboardInterrupt: 

This leads me to believe it is waiting on some resource? Not sure. Any help on this would be appreciated. Also, when I run this code, I see a lot of `core` files being created in the Sagemaker file explorer.
python python-3.x multithreading multiprocessing mapreduce