是否有Python库或将在分钟或一个小时执行的解决方案?

问题描述 投票:0回答:3

我的目标是在不同的计算机上运行程序并从测量中获取数据。问题在于这些测量需要同时(尽可能多)进行。我当前的方法是安装crony来同步所有机器的时间,并启动一个python程序,该程序将在每分钟开始时进行测量。事实证明这很麻烦,因为我必须循环轮询time.time()并检查是否输入了新的分钟。

time_factor = 1   # 1000 if desired resolution is ms, 1 if seconds
interval_s = 60 * time_factor
old_interval = int(time.time()*time_factor) / interval_s
current_interval = None
running = True
while running:
    t_s = int(time.time() * time_factor)
    current_interval = t_s / interval_s

    if (t_s % interval_s == 0) and (current_interval != old_interval):
        request_runtime = time.time()

        await self.do_time_sensitive_work()

        old_interval = current_interval
        request_runtime = time.time() - request_runtime
        await asyncio.sleep(int(interval_s - request_runtime - 1))
    else:
        await asyncio.sleep(0.01)

是否有针对此类问题的标准解决方案,或者至少是更优雅的解决方案?

python time runtime python-asyncio intervals
3个回答
0
投票

也许这就是您想要的

What is the best way to repeatedly execute a function every x seconds in Python?

无法发表评论,因为我没有T.T的声誉


0
投票

尝试一下:

import asyncio
import time
from datetime import datetime


async def task(call_time):
    print(f"task started at: {to_string(call_time)}")
    await asyncio.sleep(5)
    print(f"task finished at: {to_string((time.time()))}")


def to_string(t):
    return datetime.utcfromtimestamp(t).strftime('%Y-%m-%d %H:%M:%S')


def call_task(loop, task, *args):
    print("task creating")
    loop.create_task(task(*args))
    print("task created")


async def main(loop: asyncio.AbstractEventLoop):
    interval = 10
    t = time.time()
    while True:
        print(f"Now is {to_string(t)}")
        call_time = estimate_next_call(t, interval)
        print(f"Call time {to_string(call_time)}")
        await asyncio.sleep(call_time - t)
        call_task(loop, task, call_time)
        t += interval
        await asyncio.sleep(interval)


def estimate_next_call(t, interval):
    return ((t + interval) // interval) * interval


if __name__ == "__main__":
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(main(event_loop))

0
投票

我尝试使用apscheduler库,因为它似乎实现了cron表达式,但是我发现这种方法有2个缺点。一种可能是该库非常怪异地与asyncio代码集成,这一定是我实现的错误。第二个问题是,排定的任务实际上要花费几毫秒才能真正开始运行,这是一个小问题,但是手动排定可以避免这种滞后,并且我想获得尽可能接近“精确时间”的测量值]

[我也尝试过使用celery beat,但这似乎是一个主要的矫over过正,必须安装Redis或其他内容作为消息代理,并且将如此复杂的框架用于这样的次要任务只会大喊“错误”。

当前运行速度足够快的解决方案依赖于使用“ time_factor”修改时间检查的分辨率,以秒为单位修改time.time()值。解决方案的主要部分是将当前时间除以间隔,以获取历史记录中的亲切数字,如果该数字已更改,我们只需输入新的间隔:

time_factor = 100  # 1000 for milliseconds
interval_size = 60 * time_factor
sleep_time = 1 / (time_factor * 100)
old_interval = int(int(time.time()*time_factor) / interval_size)
current_timestamp = time.time()
current_interval = None

running = True
while running:
    current_timestamp = int(time.time() * time_factor)
    current_interval = int(t_s / interval_size)
    if (current_interval != old_interval):
        request_runtime = time.time()

        await self.do_time_sensitive_work()

        old_interval = current_interval
        request_runtime = time.time() - request_runtime
        await asyncio.sleep(1/(time_factor*100) - request_runtime )
    else:
        await asyncio.sleep(1/(time_factor*100))
© www.soinside.com 2019 - 2024. All rights reserved.