在使用Parallel Streams和Atomic Variables期间发生的竞争条件

问题描述 投票:2回答:1

当下面的代码被执行时,我会以随机的方式获得异常。

byte[][] loremIpsumContentArray = new byte[64][];

for (int i = 0; i < loremIpsumContentArray.length; i++)
{
    random.nextBytes(loremIpsumContentArray[i] = new byte[CONTENT_SIZE]);
}

AtomicBoolean aBoolean = new AtomicBoolean(true);
List<Long> resultList = IntStream.range(0, 64* 2)
                                 .parallel()
                                 .mapToObj(i -> getResult(i,
                                                          aBoolean,
                                                          repositoryPath,
                                                          loremIpsumContentArray ))
                                 .collect(Collectors.toList());

getResult功能:

try
{
    Repository repository = repositoryPath.getRepository();
    String path = RepositoryFiles.relativizePath(repositoryPath);

    //return aBoolean.compareAndSet(aBoolean.get(), !aBoolean.get()) ?
    return aBoolean.getAndSet(!aBoolean.get()) ?
                              new Store(new ByteArrayInputStream(loremIpsumContentArray[i / 2]), repository, path, lock).call() :
                              new Fetch(repository, path, lock).call();
}

从上面可以看出,代码正在使用并行流,然后调用getResult函数。还有一个原子变量。当atomicVariable为真时,调用store函数,当它为假时,调用fetch函数。

我的理解是在getResult函数里面我们正在检查和更新原子变量aBoolean,这个检查和更新操作是原子的,但new Store(...).call();new Fetch(...).call();不是,因为并行流涉及多个线程,所以有一个竞争条件发生在

return aBoolean.getAndSet(!aBoolean.get()) ?
                          new Store(new ByteArrayInputStream(loremIpsumContentArray[i / 2]), repository, path).call() :
                          new Fetch(repository, path).call();

为了证实我的种族条件理论,我将如下所示的lock添加到new Store(...).call()new Fetch(...).call(),如下所示,然后一切正常:

Lock lock = new ReentrantLock();
AtomicBoolean aBoolean = new AtomicBoolean(true);
List<Long> resultList = IntStream.range(0, 64* 2)
                                 .parallel()
                                 .mapToObj(i -> getResult(i,
                                                          aBoolean,
                                                          repositoryPath,
                                                          loremIpsumContentArray,
                                                          lock))
                                 .collect(Collectors.toList());

getResult功能:

return aBoolean.getAndSet(!aBoolean.get()) ?
                          new Store(new ByteArrayInputStream(loremIpsumContentArray[i / 2]), repository, path, lock).call() :
                          new Fetch(repository, path, lock).call();

我有以下问题:

  • 关于上面提到的竞争条件,我的理解是否正确,我是否按照应该使用的方式使用了锁?
  • 避免竞争条件的其他方法有哪些?

请让我知道你的想法。

java multithreading java-stream atomic strongbox
1个回答
1
投票

你的aBoolean.getAndSet(!aBoolean.get())不是原子的。一些线程可以在!aBoolean.get()和周围的aBoolean.getAndSet之间跳跃,这可能导致竞争条件。

您应该同步块:

boolean whatToExec;
synchronized (aBoolean) {
    whatToExec = aBoolean.get();
    aBoolean.set(!whatToExec);
}
return whatToExec ? ...

锁定在FetchStore的好处是未知的。如果竞争条件发生在那里,我目前没有答案。

© www.soinside.com 2019 - 2024. All rights reserved.