Distributed Julia: parallel map (pmap) with a timeout/time limit for each map task

Question

My project involves computing a map in parallel using the `pmap` function from Julia's `Distributed` standard library.

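Mapping a given element could take a few seconds, or it could take a very long time. I want a timeout or time limit for each individual map task/computation to complete.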

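If a map task finishes in time, return the result of the computation. If the task does not complete within the time limit, stop the computation once the limit is reached and return some value or message indicating that a timeout occurred.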

A minimal example follows. First, the modules are imported and the worker processes are launched:

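num_procs = 1
using Distributed
import Random
if num_procs > 1
    # The main process (no calling addprocs) can be used for `pmap`:
    addprocs(num_procs-1)
end

Next, the mapping task is defined for all worker processes, and the function that executes the parallel map (`pmap`) is defined in the main process. Each map task randomly takes up to 5 seconds to complete, but should time out after 3 seconds. The mapping task, defined on every process with `@everywhere`: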

@everywhere begin
    """
    Sleep (compute stuff) for `wait_time` seconds, and return `wait_time`.
    If `timeout` seconds elapses, stop computation and return something else.
    """
    function waitForTimeUnlessTimeout(wait_time, timeout=3)
        # Ideally, a timer would be set here of duration `timeout`.
        # When the timer elapses/finishes, an error ("TimeoutError", etc) would
        # be thrown, or a different value would be returned from this function.

        # < Insert some sort of timeout code? Set timer? Set timeout handler? >

        # This sleep line simulates a long computation.
        # (pretend the computation time is unknown)
        sleep(wait_time)

        # computation completed before time limit. Return wait_time.
        round(wait_time, digits=2)
    end
end

The function that runs the parallel map, defined only in the main process:

"""
I'm interested in a parallel-computed mapping (pmap) for `num_tasks` number
of tasks. The mapping task `waitForTimeUnlessTimeout` must complete in a certain
amount of time, or else return a different value or throw an error.
"""
function myParallelMapping(num_tasks = 20, max_runtime=5)
    # random task runtimes between 0 and max_runtime
    runtimes = Random.rand(num_tasks) * max_runtime

    # return the parallel computation of the mapping tasks
    pmap((runtime)->waitForTimeUnlessTimeout(runtime), runtimes)
end

print(myParallelMapping())

How should this time-limited parallel map be implemented?

Tags: parallel-processing, julia, interrupt, distributed-computing, pmap

Answer

You could put something like this inside your `pmap` body:

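time_limit = 3  # seconds (the same limit as `timeout` in the question)

pmap(runtimes) do runtime
    t0 = time()
    # Run the computation as a separate task on this worker
    task = @async waitForTimeUnlessTimeout(runtime)
    # Check once per second until the task finishes or the time limit passes
    while !istaskdone(task) && time() - t0 < time_limit
        sleep(1)
    end
    istaskdone(task) && (return fetch(task))
    # Time is up. The task is not stopped; it keeps running in the background.
    # Without an `on_error` handler, this error aborts the whole pmap.
    error("time over")
end

Also note that `pmap((runtime)->waitForTimeUnlessTimeout(runtime), runtimes)` is the same as `pmap(waitForTimeUnlessTimeout, runtimes)`.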
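A sketch of one possible refinement (an assumption, not part of the original answer): Base's `timedwait` can replace the hand-written polling loop, and returning a sentinel value such as `:timeout` instead of calling `error` keeps one slow element from aborting the whole `pmap`. The names `withTimeLimit` and `fakeWork` are hypothetical; `fakeWork` stands in for `waitForTimeUnlessTimeout`, and both are defined with `@everywhere` so they also exist on any workers added with `addprocs`. Keeping the `error` call and passing `pmap` an `on_error` handler (for example `on_error = e -> :timeout`) is another option.

```julia
using Distributed

@everywhere begin
    # Hypothetical helper: run `f(x)` in a task; return its result if it finishes
    # within `time_limit` seconds, otherwise return `timeout_value`.
    function withTimeLimit(f, x, time_limit; timeout_value=:timeout)
        task = @async f(x)
        # `timedwait` polls the callback roughly every `pollint` seconds and
        # returns :ok once it is true, or :timed_out after `time_limit` seconds.
        status = timedwait(() -> istaskdone(task), float(time_limit); pollint=0.05)
        # On timeout the task is NOT stopped; it keeps running in the background.
        return status === :ok ? fetch(task) : timeout_value
    end

    # Stand-in for the question's `waitForTimeUnlessTimeout` (sleep = fake work).
    fakeWork(wait_time) = (sleep(wait_time); round(wait_time, digits=2))
end

# Illustrative runtimes with a 1-second limit; the 3-second task times out.
results = pmap(t -> withTimeLimit(fakeWork, t, 1.0), [0.2, 3.0, 0.5])
println(results)
```

As with the answer's loop, a timed-out task is not stopped. Also, `@async` tasks stay on the thread that created them and only switch at yield points (`sleep`, I/O, `yield()`), so a CPU-bound computation that never yields will block the check until it finishes; the timeout can only fire if the computation yields.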

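The polling approach stops waiting but leaves the computation running. If you control the computation, the question's requirement to actually stop the work at the time limit can be met cooperatively: do the work in small steps and check the elapsed time between them. In this sketch `computeWithDeadline` and its `step` keyword are hypothetical, `sleep(step)` stands in for one unit of real work, and the work is assumed to be splittable this way. Code you cannot modify would need another mechanism (for example interrupting or removing the worker process), which is not shown here.

```julia
using Distributed

@everywhere begin
    # Hypothetical variant of `waitForTimeUnlessTimeout`: the work is done in
    # small steps, and the elapsed time is checked before each step, so the
    # computation itself stops once `timeout` seconds have passed.
    function computeWithDeadline(wait_time, timeout=3; step=0.05)
        t0 = time()
        while time() - t0 < wait_time      # simulated work lasting `wait_time` s
            if time() - t0 > timeout
                return :timeout            # stop working and report the timeout
            end
            sleep(step)                    # stand-in for one unit of real work
        end
        return round(wait_time, digits=2)  # finished within the limit
    end
end

# Illustrative runtimes with a 1-second limit; the 2-second task times out.
results = pmap(t -> computeWithDeadline(t, 1.0), [0.2, 2.0, 0.5])
println(results)
```

Because the check happens before every step, a task whose `wait_time` does not exceed `timeout` always finishes normally, while a longer one returns `:timeout` within about one `step` of the limit.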