Avoiding a critical section | A safe way to write to files without a global lock


My script performs ~20000 asynchronous requests and then processes the responses. I want to speed up the response processing by using multiprocessing. The data from the responses is saved into ~3000 JSON files, and this number keeps growing (a new file is started when the data belongs to the same category but the current file has already grown too large). I tried to solve the concurrency problem by creating a server-process Manager object, namely a dict whose keys are file paths and whose values are True by default. In handle_results() I check the state of a file path to find out whether some other process has already started using it, but sometimes two processes still lock the same file at the same time:

[2023-02-21 21:26:06,868]:[INFO]:Process i: 9 | locking: product_data/Connectors, Interconnects;Card Edge Connectors;Edgeboard Connectors_3.json | state: False
[2023-02-21 21:26:06,868]:[INFO]:Process i: 8 | locking: product_data/Connectors, Interconnects;Card Edge Connectors;Edgeboard Connectors_3.json | state: False

In other words, different processes modify the same object at the same time, and each of them believes it has exclusive access. As far as I know, that is called a critical section. I am writing this because I was quite sure that using a Manager was one way to handle the problem.
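
A distilled sketch of what I believe is happening (made-up file name, not my real code): reading the flag and then writing False back are two separate operations on the Manager dict, so nothing stops two processes from both seeing True before either of them writes:

import multiprocessing as mp
import time

def worker(i: int, state) -> None:
    # wait until the flag says the file is free
    while not state['file.json']:
        time.sleep(0.1)
    # <-- another process can reach this point with the same True value
    #     before the next line runs
    state['file.json'] = False        # "lock"
    print(f'process {i} thinks it is alone in the critical section')
    time.sleep(0.5)
    state['file.json'] = True         # "unlock"

if __name__ == '__main__':
    with mp.Manager() as manager:
        state = manager.dict({'file.json': True})
        procs = [mp.Process(target=worker, args=(i, state)) for i in range(2)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()

Run like this, both processes usually print the message, which matches the doubled "locking" lines in my log above.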
Here is the part of the code that uses multiprocessing:

import logging
import multiprocessing as mp
import time
from pathlib import Path

# collect_page_data, get_filepath, append_to_json, RequestDenied and
# FOLDER_PRODUCT_DATA are defined elsewhere in the script

def handle_results(
        process_index: int,
        queue: mp.Queue,
        files_state: dict,
        results: list,
        urls: list[str]
) -> None:

    collected_urls = set()
    for url, result in zip(urls, results):
        if isinstance(result, str):
            try:
                page_data = collect_page_data(result, url)
                is_new_file, filepath = get_filepath(page_data['attributes']['Category'], FOLDER_PRODUCT_DATA)

                if is_new_file and files_state.get(str(filepath)) is None:
                    files_state[str(filepath)] = True

                while True:
                    if not files_state[str(filepath)]:
                        logging.info(f'Process i: {process_index} | {str(filepath)} ALREADY LOCKED! |'
                                     f' state: {files_state[str(filepath)]}')
                        time.sleep(0.1)
                        continue
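                    # NOTE: the check above and the assignment below are two
                    # separate operations on the shared dict; another process
                    # can read the same True value in between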

                    files_state[str(filepath)] = False
                    logging.info(f'Process i: {process_index} | locking: {str(filepath)} |'
                                 f' state: {files_state[str(filepath)]}')
                    append_to_json(filepath, page_data, json_type=dict)
                    files_state[str(filepath)] = True
                    logging.info(f'Process i: {process_index} | unlocking: {str(filepath)} |'
                                 f' state: {files_state[str(filepath)]}')
                    break

                collected_urls.add(url)
            except RequestDenied:
                pass
            except Exception as ex:
                logging.error(ex, exc_info=True)
    queue.put(collected_urls)


async def multihandling_results(
        results: tuple,
        urls: list[str],
        count_processes: int = mp.cpu_count()
) -> set[str]:
    """ Handle results with multiprocessing """
    
    ### probably not interesting part 
    filtered_results = []
    filtered_urls = []

    for res, url in zip(results, urls):
        if isinstance(res, str):
            filtered_results.append(res)
            filtered_urls.append(url)

    if not filtered_results:
        return set()
    elif len(filtered_results) <= count_processes:
        max_to_task = count_processes
    else:
        max_to_task = len(filtered_results) // count_processes

    divided_results = [
        [filtered_results[i:i+max_to_task], filtered_urls[i:i+max_to_task]]
        for i in range(0, len(filtered_results), max_to_task)
    ]
    ### probably not interesting part 

    queue = mp.Queue()
    processes = []

    with mp.Manager() as manager:
        files_state = manager.dict()
        for filepath in Path(FOLDER_PRODUCT_DATA).iterdir():
            files_state[str(filepath)] = True

        for process_index, process_args in enumerate(divided_results):
            process = mp.Process(
                target=handle_results,
                args=[process_index, queue, files_state] + process_args
            )
            process.start()
            processes.append(process)

        for process in processes:
            process.join()

        collected_urls = set()
        while not queue.empty():
            collected_urls.update(queue.get())
    return collected_urls

From the docs:

As mentioned above, when doing concurrent programming it is usually best to avoid using shared state as far as possible. This is particularly true when using multiple processes.
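
If I read that advice correctly, one shape it could take in my case would be to drop the shared dict entirely. A rough, hypothetical sketch (made-up file names, not my real helpers): the workers never open the JSON files themselves; they push records onto a Queue and a single writer process performs every append, so no two processes ever touch the same file and no lock is needed:

import json
import multiprocessing as mp
from pathlib import Path

def writer(queue: mp.Queue) -> None:
    # the only process that ever reads or writes the JSON files
    while True:
        item = queue.get()
        if item is None:                      # sentinel: all workers are done
            break
        filepath, record = item
        path = Path(filepath)
        data = json.loads(path.read_text()) if path.exists() else []
        data.append(record)
        path.write_text(json.dumps(data))

def worker(i: int, queue: mp.Queue) -> None:
    # workers only produce (filepath, record) pairs
    for n in range(5):
        queue.put((f'category_{n % 2}.json', {'process': i, 'n': n}))

if __name__ == '__main__':
    queue = mp.Queue()
    w = mp.Process(target=writer, args=(queue,))
    w.start()
    workers = [mp.Process(target=worker, args=(i, queue)) for i in range(4)]
    for p in workers:
        p.start()
    for p in workers:
        p.join()
    queue.put(None)                           # stop the writer
    w.join()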

But I am still not sure... does this make my goal unreachable? Any advice on how to solve this problem would be appreciated.

python python-3.x multiprocessing race-condition critical-section