对 Python 来说完全陌生。我想编写一个简单的异步函数,每 N 秒执行一次
ps $pid
直到收到停止信号。
即在 go 中它只是:
go func(cancelCtx context.Context) {
ticker, cancel := time.NewTicker(time.Second)
defer cancel()
looper:
for {
select {
case <-ticker.C:
checkProcess()
case <-ctx.Done():
break looper
}
}
fmt.Println("exited")
}(ctx)
我正在慢慢加强 python asyncio,但希望至少能获得一些关于最佳 pythonic 方法的一般指导。谢谢。
要同时运行任务,您可以使用 asyncio 模块和
asyncio.create_task()
来实现类似的功能。
import asyncio
import os
async def check_process(pid):
while True:
# Check if process is still running
if not os.path.exists(f"/proc/{pid}"):
print(f"Process with PID {pid} not found.")
break
else:
print(f"Process with PID {pid} found.")
# Wait for N seconds
await asyncio.sleep(5) # Change 5 to your desired interval
async def main():
pid = 12345 # Replace 12345 with your PID
task = asyncio.create_task(check_process(pid))
await task
if __name__ == "__main__":
asyncio.run(main())