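My project involves computing a map in parallel using the pmap function from Julia's Distributed module.
Mapping a given element could take a few seconds, or it could take essentially forever. I want a timeout or time limit for each individual map task/computation.
If the map task finishes in time, return the result of the computation. If the task does not complete within the time limit, stop the computation once the limit has been reached, and return some value or message indicating that a timeout occurred.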
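A minimal example follows. First the modules are imported, and then the worker processes are launched:
num_procs = 1
using Distributed
import Random
if num_procs > 1
    # The main process (no calling addprocs) can be used for `pmap`:
    addprocs(num_procs-1)
end
Next, the mapping task is defined for all worker processes, followed by the function that executes the parallel map (pmap), which is defined in the main process. Each map task randomly takes up to 5 seconds to complete, but should time out after 3 seconds: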
@everywhere begin
    """
    Sleep (compute stuff) for `wait_time` seconds, and return `wait_time`.
    If `timeout` seconds elapses, stop computation and return something else.
    """
    function waitForTimeUnlessTimeout(wait_time, timeout=3)
        # Ideally, a timer would be set here of duration `timeout`.
        # When the timer elapses/finishes, an error ("TimeoutError", etc) would
        # be thrown, or a different value would be returned from this function.
        # < Insert some sort of timeout code? Set timer? Set timeout handler? >
        # This sleep line simulates a long computation.
        # (pretend the computation time is unknown)
        sleep(wait_time)
        # computation completed before time limit. Return wait_time.
        round(wait_time, digits=2)
    end
end
"""
I'm interested in a parallel-computed mapping (pmap) for `num_tasks` number of tasks.
The mapping task `waitForTimeUnlessTimeout` must complete in a certain amount of time,
or else return a different value or throw an error.
"""
function myParallelMapping(num_tasks = 20, max_runtime=5)
    # random task runtimes between 0 and max_runtime
    runtimes = Random.rand(num_tasks) * max_runtime
    # return the parallel computation of the mapping tasks
    pmap((runtime)->waitForTimeUnlessTimeout(runtime), runtimes)
end
print(myParallelMapping())
How should this timed parallel map be implemented?
You could put something like this inside your pmap body:
# `time_limit` is assumed to be defined already, e.g. time_limit = 3 (seconds)
pmap(runtimes) do runtime
    t0 = time()
    task = @async waitForTimeUnlessTimeout(runtime)
    # poll once per second until the task finishes or the limit is exceeded
    while !istaskdone(task) && time()-t0 < time_limit
        sleep(1)
    end
    istaskdone(task) && (return fetch(task))
    error("time over")
end
Also note that (runtime)->waitForTimeUnlessTimeout(runtime) is the same as just waitForTimeUnlessTimeout.
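A variation on the polling loop above, as a minimal sketch rather than a definitive implementation: Base's timedwait can do the polling, and the wrapper can return a marker value instead of throwing. The helper name run_with_timeout, the :timeout return value, and the 1-second limit are assumptions made for illustration; none of them come from the original post.

```julia
using Distributed

# Hypothetical helper, not from the original post. With worker processes,
# define it under `@everywhere` so that every worker has it.
function run_with_timeout(f, timeout; pollint=0.05)
    task = @async f()
    # timedwait polls the callback and returns :ok once it yields true,
    # or :timed_out after `timeout` seconds have passed.
    status = timedwait(() -> istaskdone(task), float(timeout); pollint=pollint)
    # On timeout the task is abandoned, not interrupted: it keeps running.
    return status === :ok ? fetch(task) : :timeout
end

# Two simulated computations with a 1-second limit (no extra workers are
# added here, so pmap runs the tasks on the main process).
results = pmap([0.1, 5.0]) do wait_time
    run_with_timeout(1.0) do
        sleep(wait_time)   # stands in for the real computation
        wait_time
    end
end
```

Like the loop above, this only stops waiting; it does not stop the computation itself. It also relies on the computation yielding (as sleep does), so a tight CPU-bound loop in the same task scheduler could keep the timeout check from running at all.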