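Consider several web server instances running in parallel. Each server holds a reference to a single shared "Status keeper", whose job is to keep the last N requests from all servers.
For example (N=3):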
Server a: "Request id = ABCD" Status keeper=["ABCD"]
Server b: "Request id = XYZZ" Status keeper=["ABCD", "XYZZ"]
Server c: "Request id = 1234" Status keeper=["ABCD", "XYZZ", "1234"]
Server b: "Request id = FOO" Status keeper=["XYZZ", "1234", "FOO"]
Server a: "Request id = BAR" Status keeper=["1234", "FOO", "BAR"]
At any point in time, the "Status keeper" may be called by a monitoring application that reads these last N requests for an SLA report.
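What is the best way to implement this producer-consumer scenario in Java, giving the web servers higher priority than the SLA report?
CircularFifoBuffer seems to be the appropriate data structure to hold the requests, but I am not sure what the best way to implement efficient concurrency is.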
Buffer fifo = BufferUtils.synchronizedBuffer(new CircularFifoBuffer());
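Here is a lock-free ring buffer implementation. It implements a fixed-size buffer; there is no FIFO functionality. I would suggest you store a Collection of requests for each server instead. That way your report can do the filtering, rather than making your data structure do it.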
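import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**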
* Container
* ---------
*
* A lock-free container that offers a close-to O(1) add/remove performance.
*
*/
public class Container<T> implements Iterable<T> {
// The capacity of the container.
final int capacity;
// The list.
AtomicReference<Node<T>> head = new AtomicReference<Node<T>>();
// TESTING {
AtomicLong totalAdded = new AtomicLong(0);
AtomicLong totalFreed = new AtomicLong(0);
AtomicLong totalSkipped = new AtomicLong(0);
private void resetStats() {
totalAdded.set(0);
totalFreed.set(0);
totalSkipped.set(0);
}
// TESTING }
// Constructor
public Container(int capacity) {
this.capacity = capacity;
// Construct the list.
Node<T> h = new Node<T>();
Node<T> it = h;
// One created, now add (capacity - 1) more
for (int i = 0; i < capacity - 1; i++) {
// Add it.
it.next = new Node<T>();
// Step on to it.
it = it.next;
}
// Make it a ring.
it.next = h;
// Install it.
head.set(h);
}
// Empty ... NOT thread safe.
public void clear() {
Node<T> it = head.get();
for (int i = 0; i < capacity; i++) {
// Trash the element
it.element = null;
// Mark it free.
it.free.set(true);
it = it.next;
}
// Clear stats.
resetStats();
}
// Add a new one.
public Node<T> add(T element) {
// Get a free node and attach the element.
totalAdded.incrementAndGet();
return getFree().attach(element);
}
// Find the next free element and mark it not free.
private Node<T> getFree() {
Node<T> freeNode = head.get();
int skipped = 0;
// Stop when we hit the end of the list
// ... or we successfully transit a node from free to not-free.
while (skipped < capacity && !freeNode.free.compareAndSet(true, false)) {
skipped += 1;
freeNode = freeNode.next;
}
// Keep count of skipped.
totalSkipped.addAndGet(skipped);
if (skipped < capacity) {
// Put the head as next.
// Doesn't matter if it fails. That would just mean someone else was doing the same.
head.set(freeNode.next);
} else {
// We hit the end! No more free nodes.
throw new IllegalStateException("Capacity exhausted.");
}
return freeNode;
}
// Mark it free.
public void remove(Node<T> it, T element) {
totalFreed.incrementAndGet();
// Remove the element first.
it.detach(element);
// Mark it as free.
if (!it.free.compareAndSet(false, true)) {
throw new IllegalStateException("Freeing a freed node.");
}
}
// The Node class. It is static so needs the <T> repeated.
public static class Node<T> {
// The element in the node.
private T element;
// Are we free?
private AtomicBoolean free = new AtomicBoolean(true);
// The next reference in whatever list I am in.
private Node<T> next;
// Construct a node of the list
private Node() {
// Start empty.
element = null;
}
// Attach the element.
public Node<T> attach(T element) {
// Sanity check.
if (this.element == null) {
this.element = element;
} else {
throw new IllegalArgumentException("There is already an element attached.");
}
// Useful for chaining.
return this;
}
// Detach the element.
public Node<T> detach(T element) {
// Sanity check.
if (this.element == element) {
this.element = null;
} else {
throw new IllegalArgumentException("Removal of wrong element.");
}
// Useful for chaining.
return this;
}
public T get () {
return element;
}
@Override
public String toString() {
return element != null ? element.toString() : "null";
}
}
// Provides an iterator across all items in the container.
public Iterator<T> iterator() {
return new UsedNodesIterator<T>(this);
}
// Iterates across used nodes.
private static class UsedNodesIterator<T> implements Iterator<T> {
// Where next to look for the next used node.
Node<T> it;
int limit = 0;
T next = null;
public UsedNodesIterator(Container<T> c) {
// Snapshot the head node at this time.
it = c.head.get();
limit = c.capacity;
}
public boolean hasNext() {
// Made into a `while` loop to fix issue reported by @Nim in code review
while (next == null && limit > 0) {
// Scan to the next non-free node.
while (limit > 0 && it.free.get() == true) {
it = it.next;
// Step down 1.
limit -= 1;
}
if (limit != 0) {
next = it.element;
}
}
return next != null;
}
public T next() {
T n = null;
if ( hasNext () ) {
// Give it to them.
n = next;
next = null;
// Step forward.
it = it.next;
limit -= 1;
} else {
// Not there!!
throw new NoSuchElementException ();
}
return n;
}
public void remove() {
throw new UnsupportedOperationException("Not supported.");
}
}
@Override
public String toString() {
StringBuilder s = new StringBuilder();
Separator comma = new Separator(",");
// Keep counts too.
int usedCount = 0;
int freeCount = 0;
// I will iterate the list myself as I want to count free nodes too.
Node<T> it = head.get();
int count = 0;
s.append("[");
// Scan to the end.
while (count < capacity) {
// Is it in-use?
if (it.free.get() == false) {
// Grab its element.
T e = it.element;
// Is it null?
if (e != null) {
// Good element.
s.append(comma.sep()).append(e.toString());
// Count them.
usedCount += 1;
} else {
// Probably became free while I was traversing.
// Because the element is detached before the entry is marked free.
freeCount += 1;
}
} else {
// Free one.
freeCount += 1;
}
// Next
it = it.next;
count += 1;
}
// Decorate with counts "]used+free".
s.append("]").append(usedCount).append("+").append(freeCount);
if (usedCount + freeCount != capacity) {
// Perhaps something was added/freed while we were iterating.
s.append("?");
}
return s.toString();
}
}
Note that this is close to O(1) for put and get. A Separator just emits "" the first time and then its parameter from then on.
Edit: added test methods.
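The Separator class itself is not shown in the answer. A minimal sketch that matches the description above and the calls made in the code (`sep()` and `reset()`) might look like this; the field names and the body are assumptions, not the author's actual helper.

```java
// Hypothetical reconstruction of the Separator helper used by toString() and the tests.
final class Separator {
    private final String separator;
    private boolean first = true;

    Separator(String separator) {
        this.separator = separator;
    }

    // Returns "" on the first call and the separator on every later call.
    String sep() {
        if (first) {
            first = false;
            return "";
        }
        return separator;
    }

    // Makes the next call to sep() return "" again.
    void reset() {
        first = true;
    }
}
```

Appending `sep()` before each element builds a comma-separated list without a leading or trailing comma.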
// ***** Following only needed for testing. *****
private static boolean Debug = false;
private final static String logName = "Container.log";
private final static NamedFileOutput log = new NamedFileOutput("C:\\Junk\\");
private static synchronized void log(boolean toStdoutToo, String s) {
if (Debug) {
if (toStdoutToo) {
System.out.println(s);
}
log(s);
}
}
private static synchronized void log(String s) {
if (Debug) {
try {
log.writeLn(logName, s);
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
static volatile boolean testing = true;
// Tester object to exercise the container.
static class Tester<T> implements Runnable {
// My name.
T me;
// The container I am testing.
Container<T> c;
public Tester(Container<T> container, T name) {
c = container;
me = name;
}
private void pause() {
try {
Thread.sleep(0);
} catch (InterruptedException ex) {
testing = false;
}
}
public void run() {
// Spin on add/remove until stopped.
while (testing) {
// Add it.
Node<T> n = c.add(me);
log("Added " + me + ": " + c.toString());
pause();
// Remove it.
c.remove(n, me);
log("Removed " + me + ": " + c.toString());
pause();
}
}
}
static final String[] strings = {
"One", "Two", "Three", "Four", "Five",
"Six", "Seven", "Eight", "Nine", "Ten"
};
static final int TEST_THREADS = Math.min(10, strings.length);
public static void main(String[] args) throws InterruptedException {
Debug = true;
log.delete(logName);
Container<String> c = new Container<String>(10);
// Simple add/remove
log(true, "Simple test");
Node<String> it = c.add(strings[0]);
log("Added " + c.toString());
c.remove(it, strings[0]);
log("Removed " + c.toString());
// Capacity test.
log(true, "Capacity test");
ArrayList<Node<String>> nodes = new ArrayList<Node<String>>(strings.length);
// Fill it.
for (int i = 0; i < strings.length; i++) {
nodes.add(i, c.add(strings[i]));
log("Added " + strings[i] + " " + c.toString());
}
// Add one more.
try {
c.add("Wafer thin mint!");
} catch (IllegalStateException ise) {
log("Full!");
}
c.clear();
log("Empty: " + c.toString());
// Iterate test.
log(true, "Iterator test");
for (int i = 0; i < strings.length; i++) {
nodes.add(i, c.add(strings[i]));
}
StringBuilder all = new StringBuilder ();
Separator sep = new Separator(",");
for (String s : c) {
all.append(sep.sep()).append(s);
}
log("All: "+all);
for (int i = 0; i < strings.length; i++) {
c.remove(nodes.get(i), strings[i]);
}
sep.reset();
all.setLength(0);
for (String s : c) {
all.append(sep.sep()).append(s);
}
log("None: " + all.toString());
// Multiple add/remove
log(true, "Multi test");
for (int i = 0; i < strings.length; i++) {
nodes.add(i, c.add(strings[i]));
log("Added " + strings[i] + " " + c.toString());
}
log("Filled " + c.toString());
for (int i = 0; i < strings.length - 1; i++) {
c.remove(nodes.get(i), strings[i]);
log("Removed " + strings[i] + " " + c.toString());
}
c.remove(nodes.get(strings.length - 1), strings[strings.length - 1]);
log("Empty " + c.toString());
// Multi-threaded add/remove
log(true, "Threads test");
c.clear();
for (int i = 0; i < TEST_THREADS; i++) {
Thread t = new Thread(new Tester<String>(c, strings[i]));
t.setName("Tester " + strings[i]);
log("Starting " + t.getName());
t.start();
}
// Wait for 10 seconds.
long stop = System.currentTimeMillis() + 10 * 1000;
while (System.currentTimeMillis() < stop) {
Thread.sleep(100);
}
// Stop the testers.
testing = false;
// Wait some more.
Thread.sleep(1 * 100);
// Get stats.
double added = c.totalAdded.doubleValue();
double skipped = c.totalSkipped.doubleValue();
//double freed = c.freed.doubleValue();
log(true, "Stats: added=" + c.totalAdded + ",freed=" + c.totalFreed + ",skipped=" + c.totalSkipped + ",O(" + ((added + skipped) / added) + ")");
}
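Maybe you want to look at Disruptor - Concurrent Programming Framework. A paper (pdf) describes its design and compares its performance with java.util.concurrent.ArrayBlockingQueue. If the library is too much, stick with java.util.concurrent.ArrayBlockingQueue.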
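As a rough illustration of the ArrayBlockingQueue fallback, the sketch below keeps approximately the last N request ids by dropping the oldest entry whenever the queue is full. The class and method names (`BoundedStatusKeeper`, `record`, `snapshot`) are made up for this example. Note that ArrayBlockingQueue uses a lock internally, and under concurrent writers more than one old entry may be dropped at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical "status keeper" that retains roughly the last N request ids.
final class BoundedStatusKeeper {
    private final ArrayBlockingQueue<String> queue;

    BoundedStatusKeeper(int capacity) {
        this.queue = new ArrayBlockingQueue<String>(capacity);
    }

    // Called by the web servers. offer() never blocks; when the queue is
    // full, the oldest entry is dropped and the insert is retried.
    void record(String requestId) {
        while (!queue.offer(requestId)) {
            queue.poll();
        }
    }

    // Called by the monitoring application. Copies the current contents,
    // oldest first, so the report works on its own list.
    List<String> snapshot() {
        return new ArrayList<String>(queue);
    }
}
```

Feeding it the ids from the question's trace with a capacity of 3 leaves "1234", "FOO", "BAR" in the snapshot.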
See also java.util.concurrent.
Blocking queues block until there is something to consume or (optionally) until there is space to produce:
http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/BlockingQueue.html
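The concurrent linked queue is non-blocking and uses a slick algorithm that allows producers and consumers to be active at the same time: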
http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/ConcurrentLinkedQueue.html
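ConcurrentLinkedQueue is unbounded, so a "last N" keeper built on it has to trim the queue itself. The sketch below is one possible way to do that, assuming a separate AtomicInteger counter (size() on this queue walks the whole list). All names are hypothetical, and the queue may briefly hold a few more than N entries while writers race, which is why the snapshot trims to the newest N.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical non-blocking keeper of approximately the last N request ids.
final class NonBlockingStatusKeeper {
    private final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();
    private final AtomicInteger size = new AtomicInteger();
    private final int capacity;

    NonBlockingStatusKeeper(int capacity) {
        this.capacity = capacity;
    }

    // Called by the web servers: append, then drop one old entry if the
    // counter shows the queue has grown past its capacity.
    void record(String requestId) {
        queue.add(requestId);
        if (size.incrementAndGet() > capacity) {
            if (queue.poll() != null) {
                size.decrementAndGet();
            }
        }
    }

    // Called by the monitoring application: copy the queue (weakly
    // consistent) and keep only the newest `capacity` entries.
    List<String> snapshot() {
        List<String> all = new ArrayList<String>(queue);
        int from = Math.max(0, all.size() - capacity);
        return new ArrayList<String>(all.subList(from, all.size()));
    }
}
```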
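Hazelcast's Queue offers almost everything you ask for, but it does not support circularity. From your description, though, I am not sure you actually need that.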
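If it were me, I would use the CircularFIFOBuffer as you indicated and synchronize on the buffer when writing (add). When the monitoring application wants to read the buffer, synchronize on the buffer, then copy or clone it for use in the report.
This suggestion rests on the assumption that the latency of copying/cloning the buffer into a new object is minimal. If there is a large number of elements and copying is slow, this is not a good idea.
Pseudo-code example: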
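// Called by the web servers: add one request id under the buffer's lock.
public void writeRequest(String requestID) {
    synchronized (buffer) {
        buffer.add(requestID);
    }
}

// Called by the monitoring application: copy the contents under the same
// lock so the report works on a stable snapshot.
public Collection<String> getRequests() {
    synchronized (buffer) {
        // Copy rather than clone(): clone() is not declared on the Collection interface.
        return new ArrayList<String>(buffer);
    }
}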
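For a version that runs without any third-party library, the same idea can be sketched with a plain ArrayDeque standing in for the circular buffer. The class name `SynchronizedStatusKeeper` and the manual eviction step are assumptions made for this example. A CircularFifoBuffer would do the eviction on its own.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical keeper of the last N request ids, guarded by one lock.
final class SynchronizedStatusKeeper {
    private final int capacity;
    private final ArrayDeque<String> buffer;

    SynchronizedStatusKeeper(int capacity) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be at least 1");
        }
        this.capacity = capacity;
        this.buffer = new ArrayDeque<String>(capacity);
    }

    // Called by the web servers: evict the oldest id when full, then append.
    void writeRequest(String requestId) {
        synchronized (buffer) {
            if (buffer.size() == capacity) {
                buffer.removeFirst();
            }
            buffer.addLast(requestId);
        }
    }

    // Called by the monitoring application: the lock is held only for the copy.
    List<String> getRequests() {
        synchronized (buffer) {
            return new ArrayList<String>(buffer);
        }
    }
}
```

Writers and the reader hold the lock only for a constant-time add or an N-element copy, which is what keeps the copy-on-read approach cheap for small N.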
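Since you specifically asked to give the writers (i.e. the web servers) higher priority than the reader (i.e. monitoring), I would suggest the following design.
The web servers add the request information to a concurrent queue that is read by a dedicated thread. That thread adds the requests to a thread-local (and therefore unsynchronized) queue that overwrites the oldest element, such as EvictingQueue or CircularFifoQueue.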
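After processing each request, that thread checks a flag indicating whether a report has been requested and, if so, produces the report from all elements present in the thread-local queue.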
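A minimal sketch of this design follows. It makes several assumptions that are not in the answer: a LinkedBlockingQueue with a timed poll serves as the hand-off queue (so reports are also served while no requests arrive), an ArrayDeque with manual eviction stands in for EvictingQueue or CircularFifoQueue, and the "flag" is modelled as a queue of CompletableFuture objects (Java 8+) that the worker completes. All class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical single-writer keeper: only the worker thread touches lastRequests.
final class SingleWriterStatusKeeper implements Runnable {
    private final int capacity;
    // Hand-off queue filled by the web server threads.
    private final LinkedBlockingQueue<String> inbox = new LinkedBlockingQueue<String>();
    // Pending report requests; a non-empty queue plays the role of the flag.
    private final ConcurrentLinkedQueue<CompletableFuture<List<String>>> reportRequests =
            new ConcurrentLinkedQueue<CompletableFuture<List<String>>>();
    // Confined to the worker thread, so it needs no synchronization.
    private final ArrayDeque<String> lastRequests = new ArrayDeque<String>();
    private volatile boolean running = true;

    SingleWriterStatusKeeper(int capacity) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be at least 1");
        }
        this.capacity = capacity;
    }

    // Web servers: enqueue and return immediately.
    void record(String requestId) {
        inbox.add(requestId);
    }

    // Monitoring: ask for a report; the worker thread completes the future.
    CompletableFuture<List<String>> requestReport() {
        CompletableFuture<List<String>> report = new CompletableFuture<List<String>>();
        reportRequests.add(report);
        return report;
    }

    void stop() {
        running = false;
    }

    @Override
    public void run() {
        try {
            while (running) {
                // Wait briefly for the next request, then store it.
                String id = inbox.poll(10, TimeUnit.MILLISECONDS);
                if (id != null) {
                    if (lastRequests.size() == capacity) {
                        lastRequests.removeFirst();
                    }
                    lastRequests.addLast(id);
                }
                // After each request (or timeout), serve any pending reports.
                CompletableFuture<List<String>> report;
                while ((report = reportRequests.poll()) != null) {
                    report.complete(new ArrayList<String>(lastRequests));
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The web servers only pay for one enqueue, and the monitoring side waits on the future, so a slow report never holds up a request thread. A report reflects only the requests the worker has already processed, so ids still waiting in the hand-off queue are not included.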
https://github.com/jianhong-li/LockFreeRingBuffer This is a LockFreeRingBuffer that I wrote in Java.
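import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;

public class LockFreeRingBuffer<T> {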
public static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LockFreeRingBuffer.class);
private final AtomicReferenceArray<T> buffer;
private final int bufferSize;
private final long bufferSizeMask;
private final AtomicLong writeIndex = new AtomicLong(0);
private final AtomicLong readIndex = new AtomicLong(0);
public LockFreeRingBuffer(int bufferSize) {
// Check that bufferSize is at least 2
if (bufferSize <= 1) {
throw new IllegalArgumentException("bufferSize must be greater than 1");
}
// Check if bufferSize is power of 2
int zCnt = 0;
int _bufferSize = bufferSize;
while (_bufferSize > 0) {
if ((_bufferSize & 1) == 1) {
zCnt++;
}
if (zCnt > 1) {
throw new IllegalArgumentException("bufferSize must be power of 2");
}
_bufferSize = _bufferSize >> 1;
}
// Initialize buffer and bufferSize
this.buffer = new AtomicReferenceArray<>(bufferSize);
this.bufferSize = bufferSize;
this.bufferSizeMask = bufferSize - 1;
}
public int push(T value) {
// Ensure that the written data is valid
if (value == null) {
return -1;
}
long pWrite, pRead;
int loopCnt = 0;
for (; ; ) {
int _rIndex = makeIndex(pRead = readIndex.get());
int _wIndex = makeIndex(pWrite = writeIndex.get()); // push . _wIndex . Expect to read the latest version.
if (nextIndex(pWrite) == _rIndex) {
// buffer is full
return -2;
}
// Make sure that the current write pointer points to a null slot, i.e. one that can be written to (the pop side has already cleared the data).
if (buffer.get(_wIndex) != null) {
if ((++loopCnt) > 16) {
logger.trace("TRACE: push data retry [01] - buffer[{}] is not null, pRead: {}, pWrite: {} readIndex:{} writeIndex:{} loopCnt:{}",
_wIndex, pRead, pWrite, readIndex.get(), writeIndex.get(), loopCnt);
Thread.yield();
}
continue;
}
// Update the pointer first, then write the value. Make sure the ownership is written correctly
if (writeIndex.compareAndSet(pWrite, pWrite + 1)) {
// Write value: Theoretically this position must be empty to write
if (buffer.compareAndSet(_wIndex, null, value)) {
// writeCnt.incrementAndGet();
return _wIndex;
}
// can not happen
throw new RuntimeException("state error");
}
}
}
public T pop() {
int loopCnt = 0;
long pRead, pWrite;
for (; ; ) {
// P_w == P_r , buffer is empty
int _rIndex = makeIndex(pRead = readIndex.get());
int _wIndex = makeIndex(pWrite = writeIndex.get());
if (_rIndex == _wIndex) {
// buffer is empty
return null;
}
T t = buffer.get(_rIndex); // There is no need to determine null here. However, it is a snapshot of pRead. So there might be a null situation.
if (t == null) {
if ((++loopCnt) > 16) {
logger.trace("TRACE: pop data retry [20] - buffer[{}] is null, pRead: {}, pWrite: {} readIndex:{} writeIndex:{} loopCnt:{}",
_rIndex, pRead, pWrite, readIndex.get(), writeIndex.get(), loopCnt);
Thread.yield();
}
continue;
}
/* ************************************************
* pWrite
* |
* v
* [] -> [] -> [] -> [] -> [] -> [] -> [] -> []
* ^
* |
* pRead
* ************************************************
* case: pRead = 1, pWrite = 1
* A push advances pWrite to 2 but has not yet written its value, so pRead = 1, pWrite = 2 and the slot at pRead is still empty.
* In that case t == null and we simply retry.
* After some iterations the value at pRead becomes visible. Advancing pRead by 1 then indicates that ownership of the data at pos_1 has been obtained.
*/
if (readIndex.compareAndSet(pRead, pRead + 1)) {
// pRead + 1 succeeded: this thread now owns the slot at the old read position.
// The non-null read above guarantees that the competing push has finished writing.
// set to null
boolean compareAndSet = buffer.compareAndSet(_rIndex, t, null);
// must be success if code has no bug
if (compareAndSet) {
// CAS success. t must be valid
return t;
}
logger.error("ERROR: pop_data_error - set to null failed, pRead: {} ({}) , pWrite: {} ({})readIndex:{} writeIndex:{}",
pRead, _rIndex, pWrite, _wIndex, readIndex.get(), writeIndex.get());
// can not happen
throw new RuntimeException("state error");
}
}
}
/**
* This function may be inlined by the JIT.
*/
private int nextIndex(long currentIndex) {
return (int) ((currentIndex + 1) & bufferSizeMask);
}
/**
* This function may be inlined by the JIT.
*/
private int makeIndex(long currentIndex) {
return (int) (currentIndex & bufferSizeMask);
}
// ======================== get / setter =======================
public long getReadCnt() {
return readIndex.get();
}
public long getWriteCnt() {
return writeIndex.get();
}
}