我正在尝试用Java实现这种并发策略。我真的不在乎是使用流还是执行程序,这里重要的是效率。
我想在给定的时间期限内并行运行尽可能多的模拟。单个模拟的运行速度快于超时。只要基础线程池中有容量,就应提交新的模拟。
+----------------+ +--------------+ +------------+
| | param | | Solution | |
| SimCoordinator +-------->+ Simulation +----+----->+ reduce(min)|
| | | | | | |
+-------+--------+ +--------------+ | +------------+
^ |
+--------------------------------------+
类看起来像:
class SimCoordinator {
double getParam(); //changes every time
double putSolution(Solution s);
}
class Simulation {
Solution run(double param); // takes a while to compute
}
协调员为任务提供参数。
然后运行任务并获得解决方案。
解决方案必须反馈给协调器,这将改变参数的生成方式。
每个解决方案都有成本;我想以最小的成本保存解决方案,并在运行模拟时而不是在截止日期之后执行此操作。
Simulation.run()是一个很长的操作,因此应与其他模拟同时运行。假设我有一个N个线程的线程池(或并行流正在工作)。最初,simcoordinator向所有线程提供模拟,并在模拟完成时创建新线程。
解决此类问题的最佳方法是什么?
所以本质上您拥有的是
Function<Double, Double>
BiFunction<Double, Double, Double>
Consumer<Double, Double>
的评估。因此,您总共可以拥有类似的东西
class SimulationDriver {
private ExecutorService service;
AtomicBoolean finished = new AtomicBoolean();
private ScheduledExecutorService timer = Executors.newScheduledThreadPool(1);
private Function<Double, Double> simulation;
private BiFunction<Double, Double, Double> calculateNewParameter;
private Map<Double, Double> results = new ConcurrentHashMap<>();
private SimulationDriver(Function<Double, Double> sim,
BiFunction<Double, Double, Double> newParam) {
simulation = sim;
calculateNewParameter = newParam;
}
public void run(int timeout, Runnable callback, Double... params) {
if (service != null) throw new InvalidStateException("already running");
// initial simulations, one in each thread
service = Executors.newFixedThreadPool(params.length);
for (Double d : params) {
submitForSimulation(d);
}
// after timeout: set finished to prevent new simulations and notify caller
timer.schedule(() -> {
finished.set(true);
service.shutdown();
callback.run(); // tell outside world we're finished
}, timeout, ChronoUnit.SECONDS);
}
void submitForSimulation(Double d) {
if (!finished.get()) { // don't start new simulations after time ran out
CompletableFuture.supplyAsync(() -> simulation.apply(d), service);
result.thenAccept(r -> results.put(d, r)); // store param/result in map
// calculate next parameter and run simulation with that
result.thenAccept(r -> calculateNewParameter.apply(d, r))
.thenAccept(this::submitForSimulation);
}
}
}