Parallelization, multiprocessing, and a CSV writer

Question

I have a huge list of strings called term_list, which I process one at a time in a function called run_mappers(). One of that function's arguments is a csv_writer object. Inside the function I append each result to a list called from_mapper, and I use the csv_writer object to write that list out to a CSV file. While looking for help, I read that the multiprocessing module is not recommended for CSV writing, because it pickles its arguments and a csv_writer object cannot be pickled (I can no longer find that reference among the billion tabs open on my desktop). In any case, I'm not sure multiprocessing is the best fit for my task.

def run_mappers(individual_string, other_args, csv_writer):
    # long processing code goes here, ending up with processed_result
    from_mapper.append(processed_result)
    csv_writer.writerow(processed_result)

I want to speed up processing of this huge list, but to keep memory usage under control I split the list into batches (term_list_batch) and process one batch at a time. So I tried:

def parallelize_mappers(term_list_batch, other_args, csv_writer):
    
    future_to_term = {}
    terms_left = len(term_list_batch)

    with concurrent.futures.ThreadPoolExecutor(max_workers=6) as executor:
        future_to_term = {executor.submit(run_mappers, term, other_args, csv_writer): term for term in term_list_batch}
        try:
            for future in concurrent.futures.as_completed(future_to_term, timeout=180): # timeout after 3 min
                term = future_to_term[future]
                try:
                    result = future.result()
                    # Process result if needed
                except Exception as exc:
                    print(f"Job {term} generated an exception: {exc}")
                finally:
                    terms_left -= 1
                    if terms_left % 10 == 0:
                        gc.collect()
                        time.sleep(2)
        except concurrent.futures.TimeoutError:
            print("Timeout occurred while processing futures")
            for future in future_to_term:  # the dict keys are the futures
                if not future.done():
                    future.cancel()
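As an aside, carving term_list into term_list_batch chunks can be done with a small generator. This is a sketch; batched and the batch size of 500 are illustrative, not part of the original code:

```python
def batched(items, batch_size):
    """Yield successive slices of items, each at most batch_size long."""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]

# Drive the existing function one batch at a time, e.g.:
# for term_list_batch in batched(term_list, 500):
#     parallelize_mappers(term_list_batch, other_args, csv_writer)
print(list(batched(["a", "b", "c", "d", "e"], 2)))  # [['a', 'b'], ['c', 'd'], ['e']]
```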

When the timeout error fires, my process just hangs, and I don't know how to keep moving through my huge term_list. I don't want to kill the program either. I just want to keep working through term_list, or move on to the next batch. If a thread fails or anything else goes wrong, I want to skip that term, or throw the whole thread away, and keep going so that as many results as possible get written to the file.
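The hang is largely explained by the with ThreadPoolExecutor(...) block: on exit it calls shutdown(wait=True), which blocks until every submitted future has finished, and a thread that is already running cannot actually be cancelled. On Python 3.9+ one way to move on anyway, sketched below with a fake slow_task standing in for run_mappers, is to shut the executor down without waiting and cancel whatever is still queued:

```python
import concurrent.futures
import time

def slow_task(term):
    # One term simulates a hung job; the rest finish quickly.
    time.sleep(2.0 if term == "stuck" else 0.05)
    return term.upper()

terms = ["asthma", "stroke", "stuck", "pain"]
results = []

executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
future_to_term = {executor.submit(slow_task, t): t for t in terms}
try:
    for future in concurrent.futures.as_completed(future_to_term, timeout=0.3):
        results.append(future.result())
except concurrent.futures.TimeoutError:
    print("timed out; keeping the results gathered so far")
finally:
    # Python 3.9+: drop queued futures and do not block on running threads,
    # so the loop over batches can continue immediately.
    executor.shutdown(wait=False, cancel_futures=True)

print(sorted(results))  # ['ASTHMA', 'PAIN', 'STROKE'] -- "stuck" abandoned
```

Note the worker that is mid-task still runs to completion in the background (the interpreter joins it at exit), but your code is free to move on to the next batch right away.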

In my many attempts at a solution I tried approaches like this, but I posted the code above as my best attempt, since it got through a few hundred terms before stalling on me. My other attempts simply failed, with assorted runtime errors, deadlocked threads, and so on.

For reference, another attempt looked like this:

def parallelize_mappers(term_list_batch, other_args, csv_writer):
    
    timeout = 120
    terminate_flag = threading.Event()

    # Create a thread for each term
    threads = []
    for term in term_list_batch:
        thread = threading.Thread(target=run_mappers, args=(term, other_args, csv_writer, terminate_flag))
        threads.append(thread)
        thread.start()

    # Wait for all threads to complete or timeout
    for thread in threads:
        thread.join(timeout)

        # If the thread is still alive, it has timed out
        if thread.is_alive():
            print("Thread {} timed out. Terminating...".format(thread.name))
            terminate_flag.set()  # Set the flag to terminate the thread

Then, before the rest of the processing code runs, I added a while not terminate_flag.is_set() check to run_mappers(). But that was painfully slow. Thanks in advance.

Mock term_list data to reproduce the processing:

term_list = ['Dementia',
 'HER2-positive Breast Cancer',
 'Stroke',
 'Hemiplegia',
 'Type 1 Diabetes',
 'Hypospadias',
 'IBD',
 'Eating',
 'Gastric Cancer',
 'Lung Cancer',
 'Carcinoid',
 'Lymphoma',
 'Psoriasis',
 'Fallopian Tube Cancer',
 'Endstage Renal Disease',
 'Healthy',
 'HRV',
 'Recurrent Small Lymphocytic Lymphoma',
 'Gastric Cancer Stage III',
 'Amputations',
 'Asthma',
 'Lymphoma',
 'Neuroblastoma',
 'Breast Cancer',
 'Healthy',
 'Asthma',
 'Carcinoma, Breast',
 'Fractures',
 'Psoriatic Arthritis',
 'ALS',
 'HIV',
 'Carcinoma of Unknown Primary',
 'Asthma',
 'Obesity',
 'Anxiety',
 'Myeloma',
 'Obesity',
 'Asthma',
 'Nursing',
 'Denture, Partial, Removable',
 'Dental Prosthesis Retention',
 'Obesity',
 'Ventricular Tachycardia',
 'Panic Disorder',
 'Schizophrenia',
 'Pain',
 'Smallpox',
 'Trauma',
 'Proteinuria',
 'Head and Neck Cancer',
 'C14',
 'Delirium',
 'Paraplegia',
 'Sarcoma',
 'Favism',
 'Cerebral Palsy',
 'Pain',
 'Signs and Symptoms, Digestive',
 'Cancer',
 'Obesity',
 'FHD',
 'Asthma',
 'Bipolar Disorder',
 'Healthy',
 'Ayerza Syndrome',
 'Obesity',
 'Healthy',
 'Focal Dystonia',
 'Colonoscopy',
 'ART',
 'Interstitial Lung Disease',
 'Schistosoma Mansoni',
 'IBD',
 'AIDS',
 'COVID-19',
 'Vaccines',
 'Beliefs',
 'SAH',
 'Gastroenteritis Escherichia Coli',
 'Immunisation',
 'Body Weight',
 'Nonalcoholic Steatohepatitis',
 'Nonalcoholic Fatty Liver Disease',
 'Prostate Cancer',
 'Covid19',
 'Sarcoma',
 'Stroke',
 'Liver Diseases',
 'Stage IV Prostate Cancer',
 'Measles',
 'Caregiver Burden',
 'Adherence, Treatment',
 'Fracture of Distal End of Radius',
 'Upper Limb Fracture',
 'Smallpox',
 'Sepsis',
 'Gonorrhea',
 'Respiratory Syncytial Virus Infections',
 'HPV',
 'Actinic Keratosis']
python csv batch-processing concurrent.futures timeoutexception
1 Answer

It sounds to me like you want to parallelize, or multitask, run_mappers(), since that function presumably takes a long time to finish. The CSV-writing part does not need to run in parallel, because it completes relatively quickly.

The first step is to redesign run_mappers() so that it does NOT take a CSV writer as an argument. Instead, the function should return processed_result. The function may raise an exception, in which case we discard that thread's result; to make that useful, I write the errors out to err.txt:

import csv
import logging
import random
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s | %(levelname)s | %(message)s",
)

term_list = [
    "Dementia",
    # ... omitted for brevity
    "Actinic Keratosis",
]


def run_mappers(individual_string, other_args):
    # Simulate long processing time to get processed_result
    time.sleep(random.randint(1, 2))
    processed_result = [individual_string.strip(), other_args]

    # Simulate an exception
    if random.randint(1, 20) == 5:
        logging.error("%r -> failed", individual_string)
        raise ValueError(individual_string)

    logging.debug("%r -> %r", individual_string, processed_result)
    return processed_result


def main():
    """Entry"""
    # run_mappers takes a long time, so this part is done in parallel
    with ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(run_mappers, term, "other-args") for term in term_list
        ]

    # Writing to CSV does not need to be done in parallel because
    # it is relatively quick
    logging.info("Writing to CSV")
    with open("out.csv", "w", newline="") as stream, open("err.txt", "w") as err:
        writer = csv.writer(stream)
        for future in futures:
            if future.exception():
                err.write(f"{future.exception()}\n")
            else:
                writer.writerow(future.result())
    logging.info("Done CSV")


if __name__ == "__main__":
    main()

Output

2024-03-02 09:49:00,335 | DEBUG | 'HER2-positive Breast Cancer' -> ['HER2-positive Breast Cancer', 'other-args']
2024-03-02 09:57:55,174 | ERROR | 'Breast Cancer' -> failed
2024-03-02 09:49:11,366 | DEBUG | 'HPV' -> ['HPV', 'other-args']
...
2024-03-02 09:49:11,377 | DEBUG | 'Sepsis' -> ['Sepsis', 'other-args']
2024-03-02 09:49:11,377 | INFO | Writing to CSV
2024-03-02 09:49:11,378 | INFO | Done CSV

Notes

  • Run this script, and if the results look good you can drop in your real
    run_mappers() code
  • I don't know what other_args looks like, so I faked it
  • You may want to replace ThreadPoolExecutor with ProcessPoolExecutor and
    compare timings to see which solution is more efficient