Java 中线程安全的循环缓冲区

问题描述 投票:0回答:9

考虑一些并行运行的 Web 服务器实例。每个服务器都持有对同一个共享“状态守护者”的引用,其作用是保留来自所有服务器的最后 N 个请求。

例如(N=3):

Server a: "Request id = ABCD"        Status keeper=["ABCD"]
Server b: "Request id = XYZZ"        Status keeper=["ABCD", "XYZZ"] 
Server c: "Request id = 1234"        Status keeper=["ABCD", "XYZZ", "1234"]
Server b: "Request id = FOO"         Status keeper=["XYZZ", "1234", "FOO"]
Server a: "Request id = BAR"         Status keeper=["1234", "FOO", "BAR"]

在任何时间点,“状态守护者”都可能被监控应用程序调用,该应用程序读取这最后 N 个请求以生成 SLA 报告。

在 Java 中实现这种生产者-消费者场景的最佳方法是什么,为 Web 服务器提供比 SLA 报告更高的优先级?

CircularFifoBuffer似乎是保存请求的合适数据结构,但我不确定实现高效并发的最佳方法是什么。

java thread-safety circular-buffer
9个回答
22
投票
// Wrap a CircularFifoBuffer with BufferUtils.synchronizedBuffer so the resulting Buffer can be
// shared between threads (presumably the Apache Commons Collections classes — confirm).
// NOTE(review): no size is passed to the constructor, so the buffer uses the library's default
// capacity rather than the N from the question.
Buffer fifo = BufferUtils.synchronizedBuffer(new CircularFifoBuffer());

8
投票

这是一个无锁环形缓冲区的实现。它实现了固定大小的缓冲区,但没有 FIFO 功能。我建议您改为给每个服务器存储一个请求的 Collection。这样就可以由您的报告来做过滤,而不是让数据结构去过滤。

/**
 * Container
 * ---------
 *
 * A lock-free container that offers a close-to O(1) add/remove performance.
 *
 * <p>Storage is a ring of {@code capacity} pre-allocated nodes. {@link #add} claims a node by
 * atomically flipping its {@code free} flag and {@link #remove} releases it again. It is a
 * fixed-size pool, not a FIFO: entries are never evicted, and {@code add} throws once every
 * node is in use.
 *
 * <p>Iteration and {@link #toString()} run without locking, so under concurrent modification
 * they report a best-effort view rather than a consistent snapshot.
 *
 * @param <T> the type of element held in the container
 */
public class Container<T> implements Iterable<T> {

  // The capacity of the container (number of nodes the scans visit).
  final int capacity;
  // Entry point into the ring; also the hint for where the next search for a free node starts.
  AtomicReference<Node<T>> head = new AtomicReference<Node<T>>();
  // TESTING {
  AtomicLong totalAdded = new AtomicLong(0);
  AtomicLong totalFreed = new AtomicLong(0);
  AtomicLong totalSkipped = new AtomicLong(0);

  /** Zeroes the test counters. */
  private void resetStats() {
    totalAdded.set(0);
    totalFreed.set(0);
    totalSkipped.set(0);
  }
  // TESTING }

  /**
   * Builds the ring of nodes, all initially free.
   *
   * @param capacity the number of elements the container can hold at once
   */
  public Container(int capacity) {
    this.capacity = capacity;
    // Construct the list.
    Node<T> h = new Node<T>();
    Node<T> it = h;
    // One created, now add (capacity - 1) more.
    for (int i = 0; i < capacity - 1; i++) {
      it.next = new Node<T>();
      it = it.next;
    }
    // Close the chain into a ring.
    it.next = h;
    // Publish it.
    head.set(h);
  }

  /**
   * Empties every node, marks it free and resets the test counters.
   *
   * <p>NOT thread safe: nodes are overwritten without checking who owns them.
   */
  public void clear() {
    Node<T> it = head.get();
    for (int i = 0; i < capacity; i++) {
      // Trash the element.
      it.element = null;
      // Mark it free.
      it.free.set(true);
      it = it.next;
    }
    resetStats();
  }

  /**
   * Stores an element in a free node.
   *
   * @param element the element to store
   * @return the node now holding the element; pass it back to {@link #remove}
   * @throws IllegalStateException if no free node is available
   */
  public Node<T> add(T element) {
    totalAdded.incrementAndGet();
    return getFree().attach(element);
  }

  /**
   * Finds a free node, starting at {@code head}, and atomically marks it not free.
   *
   * @return the claimed node
   * @throws IllegalStateException if {@code capacity} nodes were tried and none could be claimed
   */
  private Node<T> getFree() {
    Node<T> freeNode = head.get();
    int skipped = 0;
    // Stop when we have tried every node
    // ... or we successfully transit a node from free to not-free.
    while (skipped < capacity && !freeNode.free.compareAndSet(true, false)) {
      skipped += 1;
      freeNode = freeNode.next;
    }
    // Keep count of skipped.
    totalSkipped.addAndGet(skipped);
    if (skipped >= capacity) {
      // We went all the way round! No more free nodes.
      throw new IllegalStateException("Capacity exhausted.");
    }
    // Start the next search just after the node we claimed. This is a plain set, so racing
    // adders may overwrite each other's value; that only changes where the next search starts.
    head.set(freeNode.next);
    return freeNode;
  }

  /**
   * Releases a node previously returned by {@link #add}.
   *
   * @param it the node to release
   * @param element the element expected to be in the node (compared by reference)
   * @throws IllegalArgumentException if the node does not hold {@code element}
   * @throws IllegalStateException if the node is already free
   */
  public void remove(Node<T> it, T element) {
    totalFreed.incrementAndGet();
    // Remove the element first.
    it.detach(element);
    // Then mark the node as free.
    if (!it.free.compareAndSet(false, true)) {
      throw new IllegalStateException("Freeing a freed node.");
    }
  }

  /**
   * One slot of the ring. It is static so needs the {@code <T>} repeated.
   *
   * @param <T> the type of element held
   */
  public static class Node<T> {

    // The element in the node. Volatile because it is written by the thread that owns the node
    // and read, without any lock, by threads that iterate or print the container.
    private volatile T element;
    // Are we free?
    private final AtomicBoolean free = new AtomicBoolean(true);
    // The next node in the ring.
    private Node<T> next;

    // Construct a node of the list; starts empty.
    private Node() {
      element = null;
    }

    /**
     * Attaches an element to this node.
     *
     * @param element the element to attach
     * @return this node, for chaining
     * @throws IllegalArgumentException if an element is already attached
     */
    public Node<T> attach(T element) {
      // Sanity check.
      if (this.element != null) {
        throw new IllegalArgumentException("There is already an element attached.");
      }
      this.element = element;
      return this;
    }

    /**
     * Detaches the element from this node.
     *
     * @param element the element expected to be attached (compared by reference)
     * @return this node, for chaining
     * @throws IllegalArgumentException if a different element is attached
     */
    public Node<T> detach(T element) {
      // Sanity check.
      if (this.element != element) {
        throw new IllegalArgumentException("Removal of wrong element.");
      }
      this.element = null;
      return this;
    }

    /**
     * @return the attached element, or {@code null} if there is none
     */
    public T get() {
      return element;
    }

    @Override
    public String toString() {
      // Read once so a concurrent detach cannot slip in between the check and the call.
      T e = element;
      return e != null ? e.toString() : "null";
    }
  }

  /**
   * Provides an iterator across all items in the container.
   *
   * @return an iterator over the non-null elements of the nodes currently in use
   */
  @Override
  public Iterator<T> iterator() {
    return new UsedNodesIterator<T>(this);
  }

  // Iterates across used nodes, visiting each node of the ring at most once.
  private static class UsedNodesIterator<T> implements Iterator<T> {

    // Where next to look for the next used node.
    Node<T> it;
    // How many nodes are still left to visit.
    int limit = 0;
    // Element found by hasNext() and not yet handed out by next().
    T next = null;

    public UsedNodesIterator(Container<T> c) {
      // Snapshot the head node at this time.
      it = c.head.get();
      limit = c.capacity;
    }

    @Override
    public boolean hasNext() {
      while (next == null && limit > 0) {
        if (!it.free.get()) {
          // In use: grab its element. This can still be null (the node was claimed but nothing
          // is attached yet, the element was detached but the node is not yet free, or null was
          // added explicitly).
          next = it.element;
        }
        if (next == null) {
          // Free node, or nothing to hand out: step past it so the scan always makes progress
          // instead of re-reading the same node.
          it = it.next;
          limit -= 1;
        }
      }
      return next != null;
    }

    @Override
    public T next() {
      if (!hasNext()) {
        // Not there!!
        throw new NoSuchElementException();
      }
      // Give it to them.
      T n = next;
      next = null;
      // Step forward.
      it = it.next;
      limit -= 1;
      return n;
    }

    @Override
    public void remove() {
      throw new UnsupportedOperationException("Not supported.");
    }
  }

  /**
   * Renders the used elements followed by the used and free counts, as {@code [a,b]used+free}.
   * A trailing {@code ?} is appended if the two counts do not add up to the capacity.
   */
  @Override
  public String toString() {
    StringBuilder s = new StringBuilder();
    // Keep counts too.
    int usedCount = 0;
    int freeCount = 0;
    // I will iterate the list myself as I want to count free nodes too.
    Node<T> it = head.get();
    s.append("[");
    for (int count = 0; count < capacity; count++) {
      // Only look at the element of a node that is in use.
      T e = it.free.get() ? null : it.element;
      if (e != null) {
        // Good element: comma-separate it from any previous one.
        if (usedCount > 0) {
          s.append(",");
        }
        s.append(e.toString());
        usedCount += 1;
      } else {
        // Free, or in use with no element (probably became free while I was traversing,
        // because the element is detached before the entry is marked free).
        freeCount += 1;
      }
      it = it.next;
    }
    // Decorate with counts "]used+free".
    s.append("]").append(usedCount).append("+").append(freeCount);
    if (usedCount + freeCount != capacity) {
      // Perhaps something was added/freed while we were iterating.
      s.append("?");
    }
    return s.toString();
  }
}

请注意,这里的 put 和 get 接近 O(1)。Separator 只是在第一次调用时输出 "",此后每次都输出它的参数。

编辑:添加测试方法。

// ***** Following only needed for testing. *****
// Master switch for the log helpers below; they do nothing while this is false.
private static boolean Debug = false;
// Name handed to the NamedFileOutput calls (writeLn/delete) to identify the log.
private final static String logName = "Container.log";
// Log sink (project type, not shown here); presumably writes under the given directory — confirm.
private final static NamedFileOutput log = new NamedFileOutput("C:\\Junk\\");

/**
 * Logs a line, optionally echoing it to standard output first. Does nothing unless
 * {@code Debug} is set.
 *
 * @param toStdoutToo whether to also print the line to {@code System.out}
 * @param s the line to log
 */
private static synchronized void log(boolean toStdoutToo, String s) {
  if (!Debug) {
    return;
  }
  if (toStdoutToo) {
    System.out.println(s);
  }
  log(s);
}

/**
 * Writes a line to the named log. Does nothing unless {@code Debug} is set; an
 * {@code IOException} from the write is printed and otherwise ignored.
 *
 * @param s the line to log
 */
private static synchronized void log(String s) {
  if (!Debug) {
    return;
  }
  try {
    log.writeLn(logName, s);
  } catch (IOException ex) {
    ex.printStackTrace();
  }
}
// Run flag for the tester threads: they loop while it is true. Volatile so that a write made
// by one thread (main, or an interrupted tester) is seen by the others.
static volatile boolean testing = true;

/**
 * Tester object to exercise the container: repeatedly adds its own name to the container and
 * removes it again until {@code testing} becomes false.
 *
 * @param <T> the element type of the container under test
 */
static class Tester<T> implements Runnable {

  // My name; also the element I add and remove.
  private final T me;
  // The container I am testing.
  private final Container<T> c;

  public Tester(Container<T> container, T name) {
    c = container;
    me = name;
  }

  // Gives other threads a chance to run. An interrupt is treated as a request to stop the test.
  private void pause() {
    try {
      Thread.sleep(0);
    } catch (InterruptedException ex) {
      testing = false;
      // Restore the flag so whoever runs this thread can still see it was interrupted.
      Thread.currentThread().interrupt();
    }
  }

  @Override
  public void run() {
    // Spin on add/remove until stopped.
    while (testing) {
      // Add it.
      Node<T> n = c.add(me);
      log("Added " + me + ": " + c.toString());
      pause();
      // Remove it.
      c.remove(n, me);
      log("Removed " + me + ": " + c.toString());
      pause();
    }
  }
}
// Sample elements for the tests; each tester thread is also given one of them as its name.
static final String[] strings = {
  "One", "Two", "Three", "Four", "Five",
  "Six", "Seven", "Eight", "Nine", "Ten"
};
// Number of tester threads to start: at most 10, and never more than there are sample strings.
static final int TEST_THREADS = Math.min(10, strings.length);

/**
 * Test driver: runs single-threaded add/remove, capacity and iterator checks, then hammers the
 * container from {@code TEST_THREADS} threads for ten seconds and logs the counters.
 *
 * @param args unused
 * @throws InterruptedException if interrupted while waiting for the tester threads
 */
public static void main(String[] args) throws InterruptedException {
  Debug = true;
  log.delete(logName);
  // Sized so that the sample strings fill it exactly.
  Container<String> c = new Container<String>(strings.length);

  // Simple add/remove
  log(true, "Simple test");
  Node<String> it = c.add(strings[0]);
  log("Added " + c.toString());
  c.remove(it, strings[0]);
  log("Removed " + c.toString());

  // Capacity test.
  log(true, "Capacity test");
  ArrayList<Node<String>> nodes = new ArrayList<Node<String>>(strings.length);
  // Fill it.
  for (int i = 0; i < strings.length; i++) {
    nodes.add(c.add(strings[i]));
    log("Added " + strings[i] + " " + c.toString());
  }
  // Add one more.
  try {
    c.add("Wafer thin mint!");
  } catch (IllegalStateException ise) {
    log("Full!");
  }
  c.clear();
  // The container was wiped, so the node handles from this phase are stale.
  nodes.clear();
  log("Empty: " + c.toString());

  // Iterate test.
  log(true, "Iterator test");
  for (int i = 0; i < strings.length; i++) {
    nodes.add(c.add(strings[i]));
  }
  StringBuilder all = new StringBuilder();
  Separator sep = new Separator(",");
  for (String s : c) {
    all.append(sep.sep()).append(s);
  }
  log("All: " + all);
  for (int i = 0; i < strings.length; i++) {
    c.remove(nodes.get(i), strings[i]);
  }
  nodes.clear();
  sep.reset();
  all.setLength(0);
  for (String s : c) {
    all.append(sep.sep()).append(s);
  }
  log("None: " + all.toString());

  // Multiple add/remove
  log(true, "Multi test");
  for (int i = 0; i < strings.length; i++) {
    nodes.add(c.add(strings[i]));
    log("Added " + strings[i] + " " + c.toString());
  }
  log("Filled " + c.toString());
  for (int i = 0; i < strings.length - 1; i++) {
    c.remove(nodes.get(i), strings[i]);
    log("Removed " + strings[i] + " " + c.toString());
  }
  c.remove(nodes.get(strings.length - 1), strings[strings.length - 1]);
  log("Empty " + c.toString());

  // Multi-threaded add/remove
  log(true, "Threads test");
  c.clear();
  ArrayList<Thread> testers = new ArrayList<Thread>(TEST_THREADS);
  for (int i = 0; i < TEST_THREADS; i++) {
    Thread t = new Thread(new Tester<String>(c, strings[i]));
    t.setName("Tester " + strings[i]);
    log("Starting " + t.getName());
    t.start();
    testers.add(t);
  }
  // Let them run for 10 seconds.
  Thread.sleep(10 * 1000);
  // Stop the testers ...
  testing = false;
  // ... and wait until every one has finished, so the counters below are final.
  for (Thread t : testers) {
    t.join();
  }
  // Get stats.
  double added = c.totalAdded.doubleValue();
  double skipped = c.totalSkipped.doubleValue();
  log(true, "Stats: added=" + c.totalAdded + ",freed=" + c.totalFreed + ",skipped=" + c.totalSkipped + ",O(" + ((added + skipped) / added) + ")");
}

3
投票

也许您想看看Disruptor - 并发编程框架

  • 这篇论文(pdf)介绍了替代方案、设计,以及与 java.util.concurrent.ArrayBlockingQueue 的性能比较
  • 建议阅读 BlogsAndArticles 中的前三篇文章

如果引入这个库过于繁重,那就坚持使用 java.util.concurrent.ArrayBlockingQueue。


2
投票

我会看看 ArrayDeque,或者要了解更多并发实现,请查看 Disruptor 库,它是 Java 中最复杂的环形缓冲区之一。

另一种方法是使用无界队列,它的并发性更高,因为生产者永远不需要等待消费者,例如 Java Chronicle。

除非您的需求证明其复杂性是合理的,否则 ArrayDeque 可能就是您所需要的。


1
投票

另请参阅

java.util.concurrent

阻塞队列将一直阻塞,直到有东西可以消耗或(可选)有空间可以生产:

http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/BlockingQueue.html

并发链接队列是非阻塞的,并使用灵活的算法,允许生产者和消费者同时活动:

http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/ConcurrentLinkedQueue.html


1
投票

Hazelcast 的 Queue 几乎提供了您所需的一切,但不支持循环。但从你的描述我不确定你是否真的需要它。


0
投票

如果是我,我会按照您的指示使用 CircularFIFOBuffer,并在写入(添加)时围绕缓冲区进行同步。当监控应用程序想要读取缓冲区时,在缓冲区上进行同步,然后复制或克隆它以用于报告。

此建议基于以下假设:将缓冲区复制/克隆到新对象的延迟最小。如果有大量元素,并且复制时间很慢,那么这不是一个好主意。

伪代码示例:

// Records one request ID. The add runs while holding the buffer's monitor, the same lock
// getRequests() takes, so the two never touch the buffer at the same time.
// NOTE(review): `buffer` is declared outside this snippet — presumably the shared
// CircularFifoBuffer; confirm.
public void writeRequest(String requestID) {
    synchronized(buffer) {
       buffer.add(requestID);
    }
}

/**
 * Returns a copy of the buffered request IDs for reporting.
 *
 * <p>The copy is taken while holding the buffer's monitor, so writers are blocked only for the
 * duration of the copy; the caller can then read the result without further locking. A copy
 * constructor is used instead of {@code clone()}, whose {@code Object} result does not match
 * the declared return type.
 *
 * <p>NOTE(review): assumes {@code buffer} (declared outside this snippet) is a
 * {@code Collection} of request IDs — confirm.
 *
 * @return a new collection holding the buffer's current contents
 */
public Collection<String> getRequests() {
    synchronized (buffer) {
        return new java.util.ArrayList<String>(buffer);
    }
}

0
投票

既然你特别要求给予作者(即网络服务器)比读者(即监控)更高的优先级,我建议采用以下设计。

Web 服务器将请求信息添加到一个并发队列中,由一个专用线程读取该队列,并把请求添加到线程本地(因此无需同步)的队列中,该队列会覆盖最旧的元素,例如 EvictingQueue 或 CircularFifoQueue。该线程在处理完每个请求后检查一个标志,该标志指示是否已请求报告;如果是,则根据线程本地队列中现有的所有元素生成报告。


0
投票

https://github.com/jianhong-li/LockFreeRingBuffer 这是我用java语言编写的LockFreeRingBuffer。

public class LockFreeRingBuffer<T> {

    public static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LockFreeRingBuffer.class);

    private final AtomicReferenceArray<T> buffer;
    private final int bufferSize;
    private final long bufferSizeMask;

    private final AtomicLong writeIndex = new AtomicLong(0);
    private final AtomicLong readIndex = new AtomicLong(0);

    public LockFreeRingBuffer(int bufferSize) {
        // Check if bufferSize is positive
        if (bufferSize <= 1) {
            throw new IllegalArgumentException("bufferSize must be positive");
        }

        // Check if bufferSize is power of 2
        int zCnt = 0;
        int _bufferSize = bufferSize;
        while (_bufferSize > 0) {
            if ((_bufferSize & 1) == 1) {
                zCnt++;
            }
            if (zCnt > 1) {
                throw new IllegalArgumentException("bufferSize must be power of 2");
            }
            _bufferSize = _bufferSize >> 1;
        }

        // Initialize buffer and bufferSize
        this.buffer = new AtomicReferenceArray<>(bufferSize);
        this.bufferSize = bufferSize;
        this.bufferSizeMask = bufferSize - 1;
    }

    public int push(T value) {

        // Ensure that the written data is valid
        if (value == null) {
            return -1;
        }

        long pWrite, pRead;
        int loopCnt = 0;
        for (; ; ) {

            int _rIndex = makeIndex(pRead = readIndex.get());
            int _wIndex = makeIndex(pWrite = writeIndex.get()); // push . _wIndex . Expect to read the latest version.

            if (nextIndex(pWrite) == _rIndex) {
                // buffer is full
                return -2;
            }

            // Make sure that the current write pointer points to a NULL slot. That is, it can be written to. (Make sure that the take side has cleaned up the data
)
            if (buffer.get(_wIndex) != null) {
                if ((++loopCnt) > 16) {
                    logger.trace("TRACE: push data retry [01] - buffer[{}] is not null, pRead: {}, pWrite: {} readIndex:{} writeIndex:{} loopCnt:{}",
                            _wIndex, pRead, pWrite, readIndex.get(), writeIndex.get(), loopCnt);
                    Thread.yield();
                }
                continue;
            }

            // Update the pointer first, then write the value. Make sure the ownership is written correctly
            if (writeIndex.compareAndSet(pWrite, pWrite + 1)) {

                // Write value: Theoretically this position must be empty to write
                if (buffer.compareAndSet(_wIndex, null, value)) {
                    // writeCnt.incrementAndGet();
                    return _wIndex;
                }
                // can not happen
                throw new RuntimeException("state error");
            }
        }
    }

    public T pop() {

        int loopCnt = 0;
        long pRead, pWrite;
        for (; ; ) {

            // P_w == P_r , buffer is empty
            int _rIndex = makeIndex(pRead = readIndex.get());
            int _wIndex = makeIndex(pWrite = writeIndex.get());

            if (_rIndex == _wIndex) {
                // buffer is empty
                return null;
            }

            T t = buffer.get(_rIndex); // There is no need to determine null here. However, it is a snapshot of pRead. So there might be a null situation.

            if (t == null) {
                if ((++loopCnt) > 16) {
                    logger.trace("TRACE: pop  data retry [20] - buffer[{}] is     null, pRead: {}, pWrite: {} readIndex:{} writeIndex:{} loopCnt:{}",
                            _rIndex, pRead, pWrite, readIndex.get(), writeIndex.get(), loopCnt);
                    Thread.yield();
                }
                continue;
            }

            /* ************************************************
             *              pWrite
             *              |
             *              v
             *  [] -> [] -> [] -> [] -> [] -> [] -> [] -> []
             *        ^
             *        |
             *        pRead
             *  ************************************************
             *  case: pRead = 1, pWrite = 1
             *        pWrite = pWrite + 1 = 2
             *        But we haven't had time to write the data yet. In this case, pRead = 1, pWrite = 2. The pRead location data is empty
.
             *        now, t==null will continue directly.
             *        after many loop,value at pRead effective finnaly. Then it is also normal to put the pRead value +1. To indicate that the ownership of pos_1 location data has been obtained.
             *        Then it is also normal to put the pRead value +1. To indicate that the ownership of pos_1 location data has been obtained.
             */
            if (readIndex.compareAndSet(pRead, pRead + 1)) {
                // pRead+1,
                // Data indicating that the original pointer position can be safely manipulated. And the above is guaranteed to read a non-null value. That is, the competitive write with push is complete.

                //
                // set to null
                boolean compareAndSet = buffer.compareAndSet(_rIndex, t, null);

                // must be success if code has no bug
                if (compareAndSet) {
                    // CAS success. t must be valid
                    return t;
                }
                logger.error("ERROR: pop_data_error - set to null failed, pRead: {} ({}) , pWrite: {} ({})readIndex:{} writeIndex:{}",
                        pRead, _rIndex, pWrite, _wIndex, readIndex.get(), writeIndex.get());
                // can not happen
                throw new RuntimeException("state error");
            }
        }
    }

    /**
     * this function maybe inline by JIT. 
     */
    private int nextIndex(long currentIndex) {
        return (int) ((currentIndex + 1) & bufferSizeMask);
    }

    /**
     * this function maybe inline by JIT. 
     */
    private int makeIndex(long currentIndex) {
        return (int) (currentIndex & bufferSizeMask);
    }

    // ======================== get / setter =======================
    public long getReadCnt() {
        return readIndex.get();
    }

    public long getWriteCnt() {
        return writeIndex.get();
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.