无限并行流映射在Java中使用时间限制来减少流水线

问题描述 投票:1回答:1

我正在尝试用Java实现这种并发策略。我真的不在乎是使用流还是执行程序,这里重要的是效率。

我想在给定的时间期限内并行运行尽可能多的模拟。单个模拟的运行速度快于超时。只要基础线程池中有容量,就应提交新的模拟。

+----------------+         +--------------+           +------------+
|                |  param  |              | Solution  |            |
| SimCoordinator +-------->+  Simulation  +----+----->+ reduce(min)|
|                |         |              |    |      |            |
+-------+--------+         +--------------+    |      +------------+
        ^                                      |
        +--------------------------------------+

类看起来像:

class SimCoordinator {
    double getParam(); //changes every time
    double putSolution(Solution s);
}

class Simulation {
    Solution run(double param); // takes a while to compute
}

协调员为任务提供参数。

然后运行任务并获得解决方案。

解决方案必须反馈给协调器,这将改变参数的生成方式。

每个解决方案都有成本;我想以最小的成本保存解决方案,并在运行模拟时而不是在截止日期之后执行此操作。

Simulation.run()是一个很长的操作,因此应与其他模拟同时运行。假设我有一个N个线程的线程池(或并行流正在工作)。最初,simcoordinator向所有线程提供模拟,并在模拟完成时创建新线程。

解决此类问题的最佳方法是什么?

java parallel-processing java-stream java-threads
1个回答
0
投票

所以本质上您拥有的是

  • 模拟-一个Function<Double, Double>
  • 根据旧参数及其结果计算新参数,即BiFunction<Double, Double, Double>
  • 记录最低参数/得分集,即Consumer<Double, Double>的评估。
  • 计时器用完,阻止了新的模拟开始。

因此,您总共可以拥有类似的东西

class SimulationDriver {
    private ExecutorService service;
    AtomicBoolean finished = new AtomicBoolean();
    private ScheduledExecutorService timer = Executors.newScheduledThreadPool(1);

    private Function<Double, Double> simulation;
    private BiFunction<Double, Double, Double> calculateNewParameter;

    private Map<Double, Double> results = new ConcurrentHashMap<>();

    private SimulationDriver(Function<Double, Double> sim,
                             BiFunction<Double, Double, Double> newParam) {
        simulation = sim;
        calculateNewParameter = newParam;
    }

    public void run(int timeout, Runnable callback, Double... params) {
        if (service != null) throw new InvalidStateException("already running");

        // initial simulations, one in each thread
        service = Executors.newFixedThreadPool(params.length);
        for (Double d : params) {
            submitForSimulation(d);
        }
        // after timeout: set finished to prevent new simulations and notify caller
        timer.schedule(() -> { 
            finished.set(true);
            service.shutdown();
            callback.run(); // tell outside world we're finished
        }, timeout, ChronoUnit.SECONDS);
    }

    void submitForSimulation(Double d) {
        if (!finished.get()) { // don't start new simulations after time ran out
            CompletableFuture.supplyAsync(() -> simulation.apply(d), service);
            result.thenAccept(r -> results.put(d, r)); // store param/result in map
            // calculate next parameter and run simulation with that
            result.thenAccept(r -> calculateNewParameter.apply(d, r))
                  .thenAccept(this::submitForSimulation);
        }
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.