ExecutorService线程安全

问题描述 投票:0回答:1

假设我有一个来自Executors静态工厂方法的ExecutorService实例。

如果我提交了一个Callable,其中RetVal不是某个线程的线程安全的本地实例化对象,那么当我从同一线程中获取RetVals的完整性时,我是否需要担心它的完整性?人们说局部变量是线程安全的,但是当您返回本地实例化的Object并从其他线程接收它时,我不确定它是否适用。

这是与我的情况类似的示例:

ExecutorService executor = Executors.newFixedThreadPool(5);
Future<List<String>> fut = executor.submit(() -> {
    List<String> ret = new ArrayList<>();
    ret.add("aasdf");
    ret.add("dfls");
    return ret;
});

List<String> myList = fut.get();

在上面的示例中,我正在检索在另一个线程中创建的ArrayList -由执行程序创建的一个。我认为上面的代码不是线程安全的,但是我找不到关于我的特定情况的太多信息。

现在,我在计算机上尝试了上面的代码,实际上它100%地返回了预期的结果,我什至尝试使用自己的ExecutorService实现,到目前为止,我只得到了预期的结果。因此,除非我非常幸运,否则我很确定它会起作用,但我不确定如何。我在另一个线程中创建了一个非线程安全的对象,并在另一个线程中接收了该对象。我是否应该有机会收到部分构造的对象-在我的情况下,该列表不包含2个字符串?

以下是我自定义的实现,仅供测试。您可以忽略EType枚举。

class MyExecutor {

    enum EType {
        NoHolder, Holder1, Holder2
    }

    private ConcurrentLinkedQueue<MyFutureTask<?>> tasksQ;
    private final Thread thread;

    private final EType eType;

    public MyExecutor(EType eType) {
        eType = Objects.requireNonNull(eType);

        tasksQ = new ConcurrentLinkedQueue<>();
        thread = new Thread(new MyRunnable());
        thread.start();
    }

    public <T> Future<T> submit(Callable<T> c) {
        MyFutureTask<T> task = new MyFutureTask<T>(c, eType);
        tasksQ.add(task);
        return task;
    }

    class MyRunnable implements Runnable {
        @Override
        public void run() {
            while (true) {
                if (tasksQ.isEmpty()) {
                    try {
                        Thread.sleep(1);
                        continue;
                    } catch (InterruptedException ite) {
                        Thread.interrupted();
                        break;
                    }
                }

                MyFutureTask<?> task = tasksQ.poll();
                try {
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    class MyFutureTask<T> implements RunnableFuture<T> {

        final Callable<?> cb;
        volatile Object outcome;

        static final int STATE_PENDING = 1;
        static final int STATE_EXECUTING = 2;
        static final int STATE_DONE = 3;

        final AtomicInteger atomicState = new AtomicInteger(STATE_PENDING);

        final EType eType;

        public MyFutureTask(Callable<?> cb, EType eType) {
            cb = Objects.requireNonNull(cb);
            eType = Objects.requireNonNull(eType);
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            throw new NotImplementedException();
        }

        @Override
        public boolean isCancelled() {
            return false;
        }

        @Override
        public boolean isDone() {
            return atomicState.get() == STATE_DONE;
        }

        @SuppressWarnings("unchecked")
        @Override
        public T get() throws InterruptedException, ExecutionException {
            while (true) {
                switch (atomicState.get()) {
                case STATE_PENDING:
                case STATE_EXECUTING:
//                      Thread.sleep(1);
                    break;
                case STATE_DONE:
                    return (T)outcome;
                default:
                    throw new IllegalStateException();
                }
            }
        }

        @Override
        public T get(long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            throw new NotImplementedException();
        }

        void set(T t) {
            outcome = t;
        }

        @Override
        public void run() {
            if (atomicState.compareAndSet(STATE_PENDING, STATE_EXECUTING)) {
                Object result;
                try {
                    switch (eType) {
                    case NoHolder:
                        result = cb.call();
                        break;
                    case Holder1:
                        throw new NotImplementedException();
                    case Holder2:
                        throw new NotImplementedException();
                    default:
                        throw new IllegalStateException();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    result = null;
                }

                outcome = result;
                atomicState.set(STATE_DONE);
            }
        }
    }
}

class MyTask implements Callable<List<Integer>> {
    @Override
    public List<Integer> call() throws Exception {
        List<Integer> ret = new ArrayList<>(100);
        IntStream.range(0, 100).boxed().forEach(ret::add);
        return ret;
    }
}
java multithreading concurrency thread-safety executorservice
1个回答
0
投票

重要的是happens-before关系。从ExecutorService API文档:

内存一致性影响:线程之前的操作向Runnable提交CallableExecutorService任务happen-before该任务执行的任何操作,依次执行happen-before通过Future.get()检索结果。

因此您可以安全地传输这样的可变对象。

© www.soinside.com 2019 - 2024. All rights reserved.