创建并调用一个在循环中“异步”更新文件的函数,直到并行启动的第二个函数完成

问题描述 投票:0回答:1

我是 Python 中的多处理/线程和异步的新手,我想并行两个函数调用,以便第一个函数在 5 分钟内无限循环中更新运行状况检查文本文件。直到第二个功能完成为止。之后第一个函数调用的循环应该停止。

这是一个尝试,但不起作用:

import multiprocessing
import time

done_event = multiprocessing.Event()

# Function to update the healthcheck text file
def update_healthcheck():
    interval = 5 * 60  # 5 minutes interval in seconds
    while not done_event.is_set():
        with open("healthcheck.txt", "w") as f:
            f.write("Health is okay.")
        time.sleep(interval)

# Function that simulates the second task
def second_task():
    time.sleep(20)  # Simulating some work
    done_event.set()  # Set the event to signal the first function to stop

if __name__ == "__main__":
    # Start the first function in a separate process
    healthcheck_process = multiprocessing.Process(target=update_healthcheck)
    healthcheck_process.start()
    
    # Start the second function in the main process
    second_task()
    
    # Wait for the healthcheck process to finish
    healthcheck_process.join()
    
    print("Both tasks completed.")

该代码片段的正确且更好的实现是什么?

谢谢!

python loops python-asyncio python-multiprocessing python-multithreading
1个回答
0
投票

是的 - 就是这个想法 -

只需将“多处理”改为“线程”即可。您没有详细说明那里“不起作用”的内容 - 但这是因为以这种方式编写的代码在子流程中创建了一个独立的

Event
实例 - 很可能只是将
done_event
作为参数传递给
 update_healthcheck
就可以了。

但是在这种情况下没有理由使用多重处理 - 它会在共享对象上带来一些边缘情况,就像您遇到的那样。另一方面,如果您的主任务有可能失败并且程序停止运行,“健康检查”子进程将随之消失:如果需要进程独立性,您应该将健康检查保留在主进程中,并且将主要任务委托给子流程,并可能向其传递一个

multiprocessing.Queue
,以便它可以发布通知主流程它仍然存在。

否则,如果这样的检查是多余的,并且您只想在任务完成时进行注释,那么只要将其更改为使用线程,您编写的代码就可以工作:

import threading
import time, datetime

done_event = threading.Event()

# Function to update the healthcheck text file
def update_healthcheck():
    interval = 5 * 60  # 5 minutes interval in seconds
    while not done_event.is_set():
        # append timestamped logs:
        with open("healthcheck.txt", "at") as f:
            f.write(f"{datetime.datetime.now()} - Health is okay. ")
        time.sleep(interval)

# Function that simulates the second task
def second_task():
    time.sleep(20)  # Simulating some work
    done_event.set()  # Set the event to signal the first function to stop


def main():
    # bring code inside functions for better
    # readability/maintaninability
    
    # Start the first function in a separate process
    healthcheck_process = threading.Thread(target=update_healthcheck)
    healthcheck_process.start()
    
    # Start the second function in the main thread
    second_task()
    
    # Wait for the healthcheck process to finish
    healthcheck_process.join()
    
    print("Both tasks completed.")
    
if __name__ == "__main__":
    # the guarded code should ideally be short -
    # by moving the logic to a function, we need a single line:
    main()

© www.soinside.com 2019 - 2024. All rights reserved.