IndexError: pop from an empty deque when using map in Python


I have the following code, in which I am trying to compute sentence embeddings in parallel.

import multiprocessing

import numpy as np  # needed for np.matmul below
from tqdm import tqdm

# Define the function to be executed in parallel
def process_data(chunk):
    results = []
    for row in tqdm(chunk):
        work_id = row[1]
        mentioning_work_id = row[3]

        if work_id in df_text and mentioning_work_id in df_text:
            title1 = df_text[work_id]['title']
            title2 = df_text[mentioning_work_id]['title']
            print(title1 + '/n' + title2)
            embeddings_title1 = embedding_model.encode(title1,convert_to_numpy=True)
            print(embeddings_title1)
            embeddings_title2 = embedding_model.encode(title2,convert_to_numpy=True)
            print(embeddings_title2)
            results.append(np.matmul(embeddings_title1, embeddings_title2.T))
            print(results)
    return results

# Define the data to be processed
data = df_rud_labels

# Split the data into chunks
# max(1, ...) avoids a zero step in range() when len(data) < num_cores
chunk_size = max(1, len(data) // num_cores)
chunks = [data[i:i+chunk_size] for i in range(0, len(data), chunk_size)]

# Create a pool of worker processes
pool = multiprocessing.Pool(processes=num_cores)

results = []
with tqdm(total=len(chunks)) as pbar:
    for i, result_chunk in enumerate(pool.imap_unordered(process_data, chunks)):
        # Update the progress bar
        pbar.update()
        # Add the results to the list
        results += result_chunk

# Concatenate the results
final_result = results

However, after interrupting the kernel I get the following error:

  0%|          | 0/2500 [00:00<?, ?it/s]
Financialization and Institutional Change in Capitalisms: A Comparison of the US and Germany/nSingle domain antibodies: promising experimental and therapeutic tools in infection and immunity
  0%|          | 0/2500 [00:00<?, ?it/s]
Encyclopedia of India-China Cultural Contacts, vol I/nToll-like receptors as a key regulator of mesenchymal stem cell function: An up-to-date review
  0%|          | 0/2500 [00:00<?, ?it/s]
Ioannis ROMANIDES, Dogmatica patristica ortodoxa, traducere de Dragos Dasca, Editura Ecclesiast, editie de protos Vasile Bîrzu, 2011/nANTHROPOMETRIC MEASUREMENTS, SOMATOTYPES AND PHYSICAL ABILITIES AS A FUNCTION TO PREDICT THE SELECTION OF TALENTS JUNIOR WEIGHTLIFTERS 
A prophet of old: Jesus the “public theologian”/nCurriculum alignment at undergraduate level: military geography at the South African Military Academy
  0%|          | 0/4 [00:09<?, ?it/s]Process ForkPoolWorker-72:
Process ForkPoolWorker-71:
Traceback (most recent call last):
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/opt/conda/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/conda/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/opt/conda/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/conda/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/opt/conda/lib/python3.7/multiprocessing/queues.py", line 351, in get
    with self._rlock:
  File "/opt/conda/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
  File "/opt/conda/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/opt/conda/lib/python3.7/multiprocessing/queues.py", line 351, in get
    with self._rlock:
  File "/opt/conda/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt

---------------------------------------------------------------------------
IndexError                                Traceback (most recent call last)
/opt/conda/lib/python3.7/multiprocessing/pool.py in next(self, timeout)
    732             try:
--> 733                 item = self._items.popleft()
    734             except IndexError:

IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-48-fcb9ab74a032> in <module>
     31 results = []
     32 with tqdm(total=len(chunks)) as pbar:
---> 33     for i, result_chunk in enumerate(pool.imap_unordered(process_data, chunks)):
     34         # Update the progress bar
     35         pbar.update()

/opt/conda/lib/python3.7/multiprocessing/pool.py in next(self, timeout)
    735                 if self._index == self._length:
    736                     raise StopIteration from None
--> 737                 self._cond.wait(timeout)
    738                 try:
    739                     item = self._items.popleft()

/opt/conda/lib/python3.7/threading.py in wait(self, timeout)
    294         try:    # restore state no matter what (e.g., KeyboardInterrupt)
    295             if timeout is None:
--> 296                 waiter.acquire()
    297                 gotit = True
    298             else:

KeyboardInterrupt: 

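I tried computing the embeddings directly for the same titles that appear in the output, and that worked. So I am not sure why the code above fails to do the same.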

Any help would be greatly appreciated.

python machine-learning nlp multiprocessing mapreduce
1 Answer

I had to use pool.map() instead. That solved it.
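
The answer does not say why the switch helped. One thing the traceback does show: the IndexError is raised and caught inside the iterator's next() method in pool.py, which then waits on a condition for the next result, and the KeyboardInterrupt arrived during that wait. So the main process was still waiting for a worker to return something when the kernel was interrupted. A minimal sketch of the pool.map() variant follows. It assumes the rows fit in a plain list; process_chunk is a hypothetical stand-in for process_data that does no embedding work, and split_into_chunks and run_parallel are names introduced only for this illustration.

```python
import multiprocessing
from math import ceil


def split_into_chunks(rows, num_chunks):
    """Split rows into at most num_chunks contiguous slices."""
    if len(rows) == 0:
        return []
    # Ceiling division keeps the chunk size >= 1 even when
    # there are fewer rows than requested chunks.
    chunk_size = ceil(len(rows) / num_chunks)
    return [rows[i:i + chunk_size] for i in range(0, len(rows), chunk_size)]


def process_chunk(chunk):
    """Hypothetical stand-in for process_data: one number per (title, title) row."""
    return [len(title_a) * len(title_b) for title_a, title_b in chunk]


def run_parallel(rows, num_workers):
    """Process rows in worker processes and return one flat list of results."""
    chunks = split_into_chunks(rows, num_workers)
    # The with-block terminates the pool on exit, so no workers are left behind.
    with multiprocessing.Pool(processes=num_workers) as pool:
        # pool.map blocks until every chunk is done and keeps the input order.
        per_chunk = pool.map(process_chunk, chunks)
    return [value for chunk_result in per_chunk for value in chunk_result]


if __name__ == "__main__":
    # Made-up rows for illustration only.
    rows = [("a", "bb"), ("ccc", "d"), ("ee", "ff"), ("g", "hhhh"), ("iii", "jj")]
    print(run_parallel(rows, num_workers=2))
```

Unlike imap_unordered, pool.map() returns all results at once and in input order, so a per-chunk progress bar in the main process no longer updates incrementally. The __main__ guard matters on platforms where worker processes re-import the main module.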
