ExecutorService-并行运行任务并保存结果

问题描述 投票:1回答:3

我想同时发送最多10个用户的ping,并在完成ping后用结果更新用户对象。

为了做到这一点,我正在尝试使用ExecutorService

我从这样的代码开始:

private void pingUsers(List<User> userList) throws ExecutionException, InterruptedException {
    final int NUM_THREADS = 10;
    ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);

    for (User user : userList) {
        SnmpPingDevice pingUser = new PingUser(user);
        Future<Boolean> isUserActive = executor.submit(pingUser);
        user.isActive = isUserActive.get() ; // -- I guess it will block other pings and i'm back to my starting point where I need to run the pings in parallel.
    }

    executor.shutdown();
    try {
        executor.awaitTermination(30, TimeUnit.SECONDS);


    } catch (InterruptedException e) {
        logger.error("Failed to terminate executor");
    }
}

这是我的PingUser类的样子:

@Override
    public Boolean call() {
        ping = new CmdRunner(toolDir,outDir,
                new UserOidWorkerPing(version,community,ip,logger));

        return this.isActive();
    }

public boolean isActive(){
        String cmd = ping.getCmdNoRedirect(); 
        String rc = this.cmdRunner.runShellCmd(cmd,this.outDir +"/dummy",false);
        logger.debug("PING was sent with cmd:" + cmd + ",rc:" + rc);
        return rc != null && !rc.contains("Timeout:") && !rc.isEmpty();
    }

回到同一问题,ping将不会并行运行(只要循环等待isUserActive.get()结束,就可以了]

知道我缺少什么吗?如何使这些ping并行运行并将每个用户的结果保存在我的List<User> userList中?

java multithreading executorservice
3个回答
1
投票

您正在使用此行来阻止每个调用的执行:

user.isActive = isUserActive.get() ;

这实际上是waits,以便结束通话,并且每次通话一次完成。

您应该提交所有任务,并建立一个Future列表,以便仅在提交所有任务后才等待结果。像这样的东西:

List<Future<Boolean>> tasks = new ArrayList<>();
for (User user : userList) {
    SnmpPingDevice pingUser = new PingUser(user);
    tasks.add(executor.submit(pingUser));
}

for(Future<Boolean> task: tasks) {
    //use the result... OK to get() here.
}

1
投票

Future::get是阻塞操作,因此调用线程将被阻塞直到调用完成。因此,只有在上一个任务完成后才提交新任务。

考虑使用ExecutorService::invokeAll,它将返回ExecutorService::invokeAll的列表:

Future

0
投票

您可以做的是将List<PingUser> pingUsers = userList.stream().map(PingUser::new).collect(Collectors.toList()); List<Future<Boolean>> results = executor.invokeAll(pingUsers); 添加到您提交的user.isActive = future.get()中。

Runnable
© www.soinside.com 2019 - 2024. All rights reserved.