等待线程中断并抛出 InterruptionException 后获取 Future 的结果是否可以接受?

问题描述 投票:0回答:1

这样的代码是否可以接受:

ExcecutorService excecutor = ...

  public <T> T invoke(Callable<T> task) {
    Future<T> future = executor.submit(task);
    boolean interrupted = false;
    T result = null;
    try {
      result = future.get();
    } catch (ExecutionException e) {
      throw new RuntimeException(e);
    } catch (InterruptedException e) {
      interrupted = true;
    } finally {
      if (interrupted) {
        try {
          result = future.get();
        } catch (ExecutionException e) {
          throw new RuntimeException(e);
        } catch (InterruptedException e) {
          //
        }
        Thread.currentThread().interrupt();
      }
    }
    return result;
  }

所以,我想保证即使等待线程被中断也能完成Future的任务并得到结果,当然还要保留等待线程的中断标志。

java multithreading get future interrupted-exception
1个回答
0
投票

好吧,正确回答问题:

这是测试它的示例代码:

package stackoverflow;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


public class InterruptFuture {



    static private final ExecutorService executor = new ThreadPoolExecutor(1, 20, 20, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

    static private final Callable<String> CALL = new Callable<String>() {
        @Override public String call() throws Exception {
            System.out.println("InterruptFuture.CALL.new Callable() {...}.call() running...");
            tryToSleep(3000);
            System.out.println("InterruptFuture.CALL.new Callable() {...}.call() done.");
            return "Hello World!";
        }
    };



    static public void main(final String[] args) {
        runTimerThread();
        interruptMainThreadLater();
        final String ret = invoke(CALL);
        System.out.println("Result: " + ret);

        executor.shutdown(); // if executor stayed alive, non-daemon thread would be running, thus not exiting the app
    }



    /**
     * Prints about every 10 ms
     */
    static private Thread runTimerThread() {
        final long startMS = System.currentTimeMillis();
        return startThread(true, () -> {
            while (true) {
                final long deltaMS = System.currentTimeMillis() - startMS;
                System.out.println(". " + deltaMS);
                tryToSleep(10);
            }
        });
    }

    /**
     * Interrupts main thread 4 times, 1 time after all the rest has long ended
     */
    static private Thread interruptMainThreadLater() {
        final Thread targetThread = Thread.currentThread();
        return startThread(false, () -> {
            for (int i = 1; i <= 4; i++) {
                tryToSleep(1000);
                System.out.println("#" + i + "\tInterrupting target thread...");
                targetThread.interrupt();
                System.out.println("#" + i + "\tInterrupting target thread: Done.");
            }
        });
    }



    static private <T> T invoke(final Callable<T> task) {
        final Future<T> future = executor.submit(task);
        System.out.println("FUTURE: " + future.getClass());
        try {
            while (true) {
                try {
                    System.out.println("InterruptFuture.invoke() waiting on callable...");
                    final T result = future.get();
                    System.out.println("InterruptFuture.invoke() got result.");
                    return result;
                } catch (final InterruptedException e) {
                    System.err.println("InterruptFuture.invoke() InterruptedException{future.get()}");
                    // keep re-trying
                }
            }
        } catch (final ExecutionException e) {
            System.out.println("InterruptFuture.invoke() ExecutionException{future.get()}");
            throw new RuntimeException(e);
        }
    }



    /**
     * @return true if interrupted
     */
    static public boolean tryToSleep(final int pMS) {
        try {
            Thread.sleep(pMS);
            return false;
        } catch (final InterruptedException e) {
            return true;
        }
    }
    static public Thread startThread(final boolean pAsDaemon, final Runnable pLambda) {
        final Thread t = new Thread(() -> {
            pLambda.run();
        });
        if (pAsDaemon) t.setDaemon(true);
        t.start();
        return t;
    }



}

连同输出:

FUTURE: class java.util.concurrent.FutureTask
. 79
InterruptFuture.invoke() waiting on callable...
InterruptFuture.CALL.new Callable() {...}.call() running...
. 90
. 100
...
. 1062
. 1078
#1  Interrupting target thread...
. 1100
#1  Interrupting target thread: Done.
InterruptFuture.invoke() waiting on callable...
InterruptFuture.invoke() InterruptedException{future.get()}
. 1122
. 1137
...
. 2063
. 2079
#2  Interrupting target thread...
#2  Interrupting target thread: Done.
InterruptFuture.invoke() InterruptedException{future.get()}InterruptFuture.invoke() waiting on callable...
. 2103
. 2120
...
. 3080
. 3101
InterruptFuture.CALL.new Callable() {...}.call() done.
InterruptFuture.invoke() got result.
Result: Hello World!
. 3117
#3  Interrupting target thread...
#3  Interrupting target thread: Done.
. 3132
. 3148
...
. 4081
. 4103
#4  Interrupting target thread...
#4  Interrupting target thread: Done.

输出注释:

  • InterruptedException
    错误消息和“等待”消息可以混合在一起,甚至可以在同一行,因为
    System.err
    System.out
    是缓冲的和异步的。
  • 结果已经出来了~ 3000ms,此时问题就解决了。后来的印刷品只是进一步表明后来对
    future.get()
    的调用不会产生不利影响。

代码注释:

  • 这只是一个简单的测试。对于部署应用程序,应该更加注意适当的并发性
  • 正如 @TimMoore 在他的评论中已经指出的那样,正在运行的算法在中断时可能不应该恢复工作。或者,如果确实如此,它至少应该使用额外的控制并检查变量和文档,以使某人能够真正阻止它

查看文档:

位于 https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html。 没有提到无法多次调用它,或者任何不利或意外的行为,例如立即抛出异常。

查看源代码:

  • 查看
    Future
    的源代码有点复杂
  • 我在代码中设置了一个输出,以找出实际使用的实现。结果是:
    class java.util.concurrent.FutureTask
  • 检查这一点会让我们陷入更多混乱,因为根据我们使用的 JRE,有完全不同的实现。尤其是jdk8和jdk7有很大的不同(https://github.com/openjdk-mirror/jdk7u-jdk/blob/master/src/share/classes/java/util/concurrent/FutureTask.java
  • jdk8使用自己内部的
    FutureTask
    .
    awaitDone()
    方法
  • 而jdk7使用
    java.util.concurrent.locks.AbstractQueuedSynchronizer
    .doAcquireInterruptibly()
  • 但最终两者都只是在完成后报告,或者将当前线程放入等待队列并在任务(可调用的)
    call()
    方法返回值或异常后释放它。
  • 所以最终,您可以随时调用
    get()
    方法。
© www.soinside.com 2019 - 2024. All rights reserved.