无锁圆形数组

问题描述 投票:0回答:5

我正在考虑实现一个无锁循环数组。一个问题是以无锁方式维护头指针和尾指针。我想到的代码是:

int circularIncrementAndGet(AtomicInteger i) {
    i.compareAndSet(array.length - 1, -1);
    return i.incrementAndGet();
}

然后我会做类似的事情:

void add(double value) {
    int idx = circularIncrementAndGet(tail);
    array[idx] = value;
}

(请注意,如果数组已满,旧值将被覆盖,我对此没意见)。

有人认为这个设计有问题吗?我怀疑可能存在我没有看到的竞争条件。

java lock-free circular-buffer
5个回答
5
投票

更简单的方法是使用 2 的幂并执行以下操作。

 final double[] array;
 final int sizeMask;
 final AtomicInteger i = new AtomicInteger();

 public CircularBuffer(int size) {
      assert size > 1 && ((size & (size -1)) == 0); // test power of 2.
      array = new double[size];
      sizeMask = size -1;
 }

 void add(double value) {
     array[i.getAndIncrement() & sizeMask] = value;
 }

4
投票

查看disruptor:http://lmax-exchange.github.io/disruptor/,它是Java中的开源无锁循环缓冲区。


2
投票

是的,存在竞争条件。

i = array.length - 2
,两个线程进入
circularIncrementAndGet()

Thread 1: i.compareAndSet(array.length - 1, -1) results in i = array.length - 2
Thread 2: i.compareAndSet(array.length - 1, -1) results in i = array.length - 2
Thread 1: i.incrementAndGet() results in i = array.length - 1
Thread 2: i.incrementAndGet() results in i = array.length
当线程 2 到达

ArrayIndexOutOfBoundsException

 时,
导致
array[idx] = value
(以及对
add()
的所有后续调用,直到
i
溢出)。

@Peter Lawrey 提出的解决方案不会遇到这个问题。


1
投票

如果您坚持以下限制:

  • 任何时刻只允许一个线程修改头指针
  • 任何时候只允许一个线程修改尾指针
  • Dequeue-on-empty 给出一个返回值,表明什么也没做
  • Enqueue-on-full 给出一个返回值,表明什么也没做
  • 您不会记录队列中存储了多少值。
  • 你“浪费”了数组中永远不会使用的一个索引,这样你就可以知道数组何时已满或为空,而无需保持计数。

可以实现循环数组/队列。

入队线程拥有尾指针。出列线程拥有头指针。除了一个条件外,到目前为止,这两个线程不共享任何状态,因此没有问题。

这个条件是测试空或满。

认为空意味着 head == tail;考虑 full 表示 tail == head - 1 模数组大小。入队必须检查队列是否已满,出队必须检查队列是否为空。您需要浪费数组中的一个索引来检测满和空之间的差异 - 如果您排队到最后一个存储桶中,那么满将是

head == tail
而空将是
head == tail
现在你陷入了死锁 - 你认为你是同时又空又满,所以什么工作都完成不了。

在执行这些检查时,可能会在比较时更新一个值。然而,由于这两个值是单调递增的,因此不存在正确性问题:

  • 如果在出队方法中,在比较过程中 head == tail 计算为 true,但 tail 之后就向前移动,没问题 - 你认为数组是空的,但实际上不是,但这没什么大不了的,你只会从出队方法返回 false,然后重试。
  • 如果在 enqueue 方法中, tail == head - 1 计算为 true,但紧接着 head 就增加了,那么你会认为数组已满,但事实并非如此,但同样,没什么大不了的,你只需从队列中返回 false 并重试即可。

这是我多年前在 Dobb 博士的实现中使用的设计,它对我很有帮助:

http://www.drdobbs.com/parallel/lock-free-queues/208801974


0
投票

https://github.com/jianhong-li/LockFreeRingBuffer 这是我用java语言编写的LockFreeRingBuffer。

public class LockFreeRingBuffer<T> {

    public static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LockFreeRingBuffer.class);

    private final AtomicReferenceArray<T> buffer;
    private final int bufferSize;
    private final long bufferSizeMask;

    private final AtomicLong writeIndex = new AtomicLong(0);
    private final AtomicLong readIndex = new AtomicLong(0);

    public LockFreeRingBuffer(int bufferSize) {
        // Check if bufferSize is positive
        if (bufferSize <= 1) {
            throw new IllegalArgumentException("bufferSize must be positive");
        }

        // Check if bufferSize is power of 2
        int zCnt = 0;
        int _bufferSize = bufferSize;
        while (_bufferSize > 0) {
            if ((_bufferSize & 1) == 1) {
                zCnt++;
            }
            if (zCnt > 1) {
                throw new IllegalArgumentException("bufferSize must be power of 2");
            }
            _bufferSize = _bufferSize >> 1;
        }

        // Initialize buffer and bufferSize
        this.buffer = new AtomicReferenceArray<>(bufferSize);
        this.bufferSize = bufferSize;
        this.bufferSizeMask = bufferSize - 1;
    }

    public int push(T value) {

        // Ensure that the written data is valid
        if (value == null) {
            return -1;
        }

        long pWrite, pRead;
        int loopCnt = 0;
        for (; ; ) {

            int _rIndex = makeIndex(pRead = readIndex.get());
            int _wIndex = makeIndex(pWrite = writeIndex.get()); // push . _wIndex . Expect to read the latest version.

            if (nextIndex(pWrite) == _rIndex) {
                // buffer is full
                return -2;
            }

            // Make sure that the current write pointer points to a NULL slot. That is, it can be written to. (Make sure that the take side has cleaned up the data
)
            if (buffer.get(_wIndex) != null) {
                if ((++loopCnt) > 16) {
                    logger.trace("TRACE: push data retry [01] - buffer[{}] is not null, pRead: {}, pWrite: {} readIndex:{} writeIndex:{} loopCnt:{}",
                            _wIndex, pRead, pWrite, readIndex.get(), writeIndex.get(), loopCnt);
                    Thread.yield();
                }
                continue;
            }

            // Update the pointer first, then write the value. Make sure the ownership is written correctly
            if (writeIndex.compareAndSet(pWrite, pWrite + 1)) {

                // Write value: Theoretically this position must be empty to write
                if (buffer.compareAndSet(_wIndex, null, value)) {
                    // writeCnt.incrementAndGet();
                    return _wIndex;
                }
                // can not happen
                throw new RuntimeException("state error");
            }
        }
    }

    public T pop() {

        int loopCnt = 0;
        long pRead, pWrite;
        for (; ; ) {

            // P_w == P_r , buffer is empty
            int _rIndex = makeIndex(pRead = readIndex.get());
            int _wIndex = makeIndex(pWrite = writeIndex.get());

            if (_rIndex == _wIndex) {
                // buffer is empty
                return null;
            }

            T t = buffer.get(_rIndex); // There is no need to determine null here. However, it is a snapshot of pRead. So there might be a null situation.

            if (t == null) {
                if ((++loopCnt) > 16) {
                    logger.trace("TRACE: pop  data retry [20] - buffer[{}] is     null, pRead: {}, pWrite: {} readIndex:{} writeIndex:{} loopCnt:{}",
                            _rIndex, pRead, pWrite, readIndex.get(), writeIndex.get(), loopCnt);
                    Thread.yield();
                }
                continue;
            }

            /* ************************************************
             *              pWrite
             *              |
             *              v
             *  [] -> [] -> [] -> [] -> [] -> [] -> [] -> []
             *        ^
             *        |
             *        pRead
             *  ************************************************
             *  case: pRead = 1, pWrite = 1
             *        pWrite = pWrite + 1 = 2
             *        But we haven't had time to write the data yet. In this case, pRead = 1, pWrite = 2. The pRead location data is empty
.
             *        now, t==null will continue directly.
             *        after many loop,value at pRead effective finnaly. Then it is also normal to put the pRead value +1. To indicate that the ownership of pos_1 location data has been obtained.
             *        Then it is also normal to put the pRead value +1. To indicate that the ownership of pos_1 location data has been obtained.
             */
            if (readIndex.compareAndSet(pRead, pRead + 1)) {
                // pRead+1,
                // Data indicating that the original pointer position can be safely manipulated. And the above is guaranteed to read a non-null value. That is, the competitive write with push is complete.

                //
                // set to null
                boolean compareAndSet = buffer.compareAndSet(_rIndex, t, null);

                // must be success if code has no bug
                if (compareAndSet) {
                    // CAS success. t must be valid
                    return t;
                }
                logger.error("ERROR: pop_data_error - set to null failed, pRead: {} ({}) , pWrite: {} ({})readIndex:{} writeIndex:{}",
                        pRead, _rIndex, pWrite, _wIndex, readIndex.get(), writeIndex.get());
                // can not happen
                throw new RuntimeException("state error");
            }
        }
    }

    /**
     * this function maybe inline by JIT. 
     */
    private int nextIndex(long currentIndex) {
        return (int) ((currentIndex + 1) & bufferSizeMask);
    }

    /**
     * this function maybe inline by JIT. 
     */
    private int makeIndex(long currentIndex) {
        return (int) (currentIndex & bufferSizeMask);
    }

    // ======================== get / setter =======================
    public long getReadCnt() {
        return readIndex.get();
    }

    public long getWriteCnt() {
        return writeIndex.get();
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.