我有多个客户端作为服务器、一个调度程序和一个具有 3 个线程的工作线程。 我的客户端是异步的,当我收到请求时,他们使用分布式客户端。 通话看起来像这样:
processing_futures = await client.gather(client.compute(response, priority=100,
resources={"GPU_RAM": 5}, #the worker has 16 gpu_ram
fifo_timeout='0ms'))
嗯,问题是,当我收到很多任务(比如超过 1k)时,worker 实际上会尝试首先处理所有初始任务(如here所述),这会导致worker崩溃,而不是工作程序可以通过简单的 fifo 进行处理,并且它会工作得很好,问题是工作程序试图将所有中间结果存储在其内存中,这太多了。这就像令人尴尬的并行问题,而 dask 似乎因此受到了很大的挑战。
我尝试了很多东西,如何让它一一处理任务?或者至少是 10 x 10,但不要试图同时完成所有工作..
有没有可能通过资源系统解决?我认为如果我将 fifo_timeout 设置为 0 那么应该明确对于每个单独的请求,队列不应重新排序,请帮忙!
只需更新服务器上的 dask 版本,并将此参数worker-saturation 设置为默认值 1.1。就是这样,现在可以了,谢谢!
从这里获得灵感。