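I'm looking for a way to block until a BlockingQueue is empty.
I know that in a multithreaded environment, as long as there are producers putting items into the BlockingQueue, the queue can become empty and be full of items again a few nanoseconds later.
But if there is only one producer, it may want to wait (and block) until the queue is empty after it has stopped putting items into the queue.
Java/pseudocode: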
// Producer code
BlockingQueue<Task> queue = new LinkedBlockingQueue<>(); // BlockingQueue is an interface, so a concrete class is needed
while (having some tasks to do) {
queue.put(task);
}
queue.waitUntilEmpty(); // <-- how to do this?
print("Done");
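Do you have any ideas?
EDIT: I know that wrapping BlockingQueue and using an extra condition would do the trick. I'm only asking whether there is a ready-made solution and/or a better alternative.
A simple solution using wait() and notify():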
// Producer:
// `synchronized` is necessary; otherwise `wait` and `notify` throw IllegalMonitorStateException
synchronized (queue) {
    while (!queue.isEmpty())
        queue.wait(); // wait for the queue to become empty
    // this is not a deadlock, because `wait` releases the lock
    queue.put(task);
}
// Consumer:
synchronized (queue) {
    queue.poll(); // non-blocking; a blocking take() here would hold the lock the producer needs
    if (queue.isEmpty())
        queue.notify(); // notify the producer
}
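For the single-producer case in the question, where the producer only wants to block once at the end, the same wait/notify idea can be sketched as a small self-contained class. This is only an illustration under assumptions: WaitNotifyDemo, put, take and awaitEmpty are hypothetical names, and a plain ArrayDeque guarded by its own monitor stands in for the BlockingQueue.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical demo class; the names are illustrative and not part of the JDK.
final class WaitNotifyDemo {
    // Every access to this queue happens while holding its monitor.
    private final Queue<String> queue = new ArrayDeque<>();

    // Producer side: add a task and wake up any waiting consumer.
    void put(String task) {
        synchronized (queue) {
            queue.add(task);
            queue.notifyAll();
        }
    }

    // Consumer side: block until a task is available, then remove it.
    String take() throws InterruptedException {
        synchronized (queue) {
            while (queue.isEmpty()) {
                queue.wait(); // releases the monitor while waiting
            }
            String task = queue.remove();
            if (queue.isEmpty()) {
                queue.notifyAll(); // wake up a producer blocked in awaitEmpty()
            }
            return task;
        }
    }

    // Producer side: block until every queued task has been taken.
    void awaitEmpty() throws InterruptedException {
        synchronized (queue) {
            while (!queue.isEmpty()) {
                queue.wait();
            }
        }
    }
}
```

The producer calls put for each task and then awaitEmpty once. Note that "empty" only means the tasks were taken, not that the consumers finished processing them.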
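I understand you may already have a bunch of threads actively polling or taking from the queue, but I still don't feel quite right about your flow/design.
The queue becoming empty does not mean that the tasks added earlier have finished. Some items may take a long time to process, so checking for emptiness is not very useful.
So what you should do is forget about BlockingQueue as such; you can use it like any other collection. Turn the items into a Collection of Callable and make use of ExecutorService.invokeAll().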
Collection<Item> queue = ...
Collection<Callable<Result>> tasks = new ArrayList<Callable<Result>>();
for (Item item : queue) {
tasks.add(new Callable<Result>() {
@Override
public Result call() throws Exception {
// process the item ...
return result;
}
});
}
// look at the results, add timeout for invokeAll if necessary
List<Future<Result>> results = executorService.invokeAll(tasks);
// done
This approach gives you full control over how long the producer may wait, as well as proper exception handling.
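As a rough illustration of the timeout remark, here is a self-contained sketch that uses the timed overload of invokeAll. The class name InvokeAllDemo, the method squareAll, the pool size of 4 and the "square each number" task are all made up for the example. Tasks that do not finish in time are cancelled by invokeAll and show up as null in the result list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; names and the squaring task are illustrative only.
final class InvokeAllDemo {
    // Squares every item on a small pool, waiting at most timeoutMillis in total.
    static List<Integer> squareAll(List<Integer> items, long timeoutMillis)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (Integer item : items) {
                tasks.add(() -> item * item);
            }
            // Blocks until all tasks are done or the timeout expires;
            // unfinished tasks are cancelled when the timeout expires.
            List<Future<Integer>> futures =
                    pool.invokeAll(tasks, timeoutMillis, TimeUnit.MILLISECONDS);
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> future : futures) {
                try {
                    results.add(future.get());
                } catch (CancellationException | ExecutionException e) {
                    results.add(null); // timed out or failed
                }
            }
            return results;
        } finally {
            pool.shutdownNow();
        }
    }
}
```

The returned list has the same order as the input, because invokeAll returns its futures in the iteration order of the task collection.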
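This is not exactly what you want to do, but using a SynchronousQueue would have an effect very similar to your Java/pseudocode, namely the producer blocking until all of the data has been retrieved by some consumer.
The only difference is that the producer blocks on every put until a consumer comes to retrieve the data, instead of blocking only once at the end. I'm not sure whether that matters in your case. I would expect it to make a noticeable difference only if the work done by the producer is somewhat expensive.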
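A small sketch of that hand-off behaviour, assuming one producer and one consumer thread; SynchronousQueueDemo and handOff are hypothetical names chosen for the example. Each put returns only after the consumer has taken that element, so once the loop ends nothing is left in flight.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class; the names are illustrative only.
final class SynchronousQueueDemo {
    // Hands every task to a consumer thread and returns what the consumer received.
    static List<String> handOff(List<String> tasks) throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        List<String> received = Collections.synchronizedList(new ArrayList<>());
        int count = tasks.size();

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    received.add(queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        for (String task : tasks) {
            queue.put(task); // blocks until the consumer has taken this task
        }
        // Every task has been handed over at this point; join() only waits
        // for the consumer to finish recording the last one.
        consumer.join();
        return new ArrayList<>(received);
    }
}
```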
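Your use case must be quite special, since in most cases you only want to block a producer when the queue is full, not wait until it is empty.
Anyway, it is doable. I believe that spinning until isEmpty returns true is not that inefficient, because the producer spins locally, i.e. it accesses its own cache rather than hammering the bus. It does consume CPU time, however, since the thread remains schedulable. Still, local spinning is definitely the simpler way. Otherwise I see two options:
wait + notify, as @nulare suggested
The answer given by @niclee looks good, but have a look at @hankduan's comment.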
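The spinning approach mentioned above (looping until isEmpty returns true) could look roughly like the sketch below. SpinUntilEmpty and awaitEmpty are hypothetical names, and Thread.onSpinWait() requires Java 9 or later; on older versions Thread.yield() could be used instead. It is only meaningful once the single producer has stopped adding items.

```java
import java.util.concurrent.BlockingQueue;

// Hypothetical helper; the names are illustrative only.
final class SpinUntilEmpty {
    // Busy-waits until the queue reports empty. This burns CPU time while waiting.
    static void awaitEmpty(BlockingQueue<?> queue) {
        while (!queue.isEmpty()) {
            Thread.onSpinWait(); // hint to the runtime that this is a spin loop (Java 9+)
        }
    }
}
```

As with the other approaches, an empty queue only tells the producer that the items were removed, not that they were processed.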
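I came across this question while looking for a solution to a similar problem.
In the end I more or less rewrote LinkedBlockingQueue. The class below does not implement Collection or Iterable.
/**********************************************************************************************************************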
* Import specifications
*********************************************************************************************************************/
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.*;
/**********************************************************************************************************************
* This class implements a completely reentrant FIFO.
*********************************************************************************************************************/
public class BlockingFIFO<E>
{
/********************************************************************************************************************
* The constructor creates an empty FIFO with a capacity of {@link Integer#MAX_VALUE}.
*******************************************************************************************************************/
public BlockingFIFO()
{
// -----------------
// Initialize object
// -----------------
this(Integer.MAX_VALUE);
} // constructor
/********************************************************************************************************************
* The constructor creates an empty FIFO with the specified capacity.
*
* @param capacity_ipar The maximum number of elements the FIFO may contain.
*******************************************************************************************************************/
public BlockingFIFO(int capacity_ipar)
{
// ---------------------
// Initialize attributes
// ---------------------
lock_attr = new ReentrantLock();
not_empty_attr = lock_attr.newCondition();
not_full_attr = lock_attr.newCondition();
head_attr = null;
tail_attr = null;
capacity_attr = capacity_ipar;
size_attr = 0;
} // constructor
/********************************************************************************************************************
* This method removes all of the elements from the FIFO.
*
* @return The number of elements in the FIFO before it was cleared.
*******************************************************************************************************************/
public int clear()
{
// -----------------
// Initialize result
// -----------------
int result;
result = 0;
// ----------
// Clear FIFO
// ----------
lock_attr.lock();
try
{
result = size_attr;
head_attr = null;
tail_attr = null;
size_attr = 0;
not_full_attr.signalAll();
}
finally
{
lock_attr.unlock();
}
// ----
// Done
// ----
return (result);
} // clear
/********************************************************************************************************************
* This method returns the number of elements in the FIFO.
*
* @return The number of elements in the FIFO.
*******************************************************************************************************************/
public int size()
{
// -----------
// Return size
// -----------
lock_attr.lock();
try
{
return (size_attr);
}
finally
{
lock_attr.unlock();
}
} // size
/********************************************************************************************************************
* This method returns the number of additional elements that the FIFO can ideally accept without blocking.
*
* @return The remaining capacity of the FIFO.
*******************************************************************************************************************/
public int remainingCapacity()
{
// -------------------------
// Return remaining capacity
// -------------------------
lock_attr.lock();
try
{
return (capacity_attr - size_attr);
}
finally
{
lock_attr.unlock();
}
} // remainingCapacity
/********************************************************************************************************************
* This method waits for the FIFO to become empty.
*
* @throws InterruptedException Thrown when the current thread got interrupted while waiting for the FIFO to become
* empty.
*******************************************************************************************************************/
public void waitEmpty()
throws InterruptedException
{
// -----------------------------
// Wait for FIFO to become empty
// -----------------------------
lock_attr.lock();
try
{
while (size_attr > 0)
not_full_attr.await();
}
finally
{
lock_attr.unlock();
}
} // waitEmpty
/********************************************************************************************************************
* This method waits at most the specified time for the FIFO to become empty.
* <br>It returns <code>true</code> if the FIFO is empty and <code>false</code> otherwise.
*
* @param timeout_ipar The maximum number of milliseconds to wait for the FIFO to become empty.
* @return True if and only if the FIFO is empty.
* @throws InterruptedException Thrown when the current thread got interrupted while waiting for the FIFO to become
* empty.
*******************************************************************************************************************/
public boolean waitEmpty(long timeout_ipar)
throws InterruptedException
{
// ------------------
// Determine deadline
// ------------------
Date deadline;
deadline = new Date(System.currentTimeMillis() + timeout_ipar);
// -----------------------------
// Wait for FIFO to become empty
// -----------------------------
lock_attr.lock();
try
{
while (size_attr > 0)
{
if (!not_full_attr.awaitUntil(deadline))
return (false);
}
return (true);
}
finally
{
lock_attr.unlock();
}
} // waitEmpty
/********************************************************************************************************************
* This method waits at most the specified time for the FIFO to become empty.
* <br>It returns <code>true</code> if the FIFO is empty and <code>false</code> otherwise.
*
* @param timeout_ipar The maximum time to wait for the FIFO to become empty.
* @param unit_ipar The unit of the specified timeout.
* @return True if and only if the FIFO is empty.
* @throws InterruptedException Thrown when the current thread got interrupted while waiting for the FIFO to become
* empty.
*******************************************************************************************************************/
public boolean waitEmpty(long timeout_ipar,
TimeUnit unit_ipar)
throws InterruptedException
{
// -----------------------------
// Wait for FIFO to become empty
// -----------------------------
return (waitEmpty(unit_ipar.toMillis(timeout_ipar)));
} // waitEmpty
/********************************************************************************************************************
* This method adds the specified element at the end of the FIFO if it is possible to do so immediately without
* exceeding the queue's capacity.
* <br>It returns <code>true</code> upon success and <code>false</code> if this queue is full.
*
* @param element_ipar The element to add to the FIFO.
* @return True if and only if the element was added to the FIFO.
*******************************************************************************************************************/
public boolean offer(E element_ipar)
{
// ----------------------
// Try to add the element
// ----------------------
lock_attr.lock();
try
{
if (capacity_attr > size_attr)
{
push(element_ipar);
return (true);
}
else
return (false);
}
finally
{
lock_attr.unlock();
}
} // offer
/********************************************************************************************************************
* This method adds the specified element at the end of the FIFO, waiting if necessary up to the specified wait time
* for space to become available.
* <br>It returns <code>true</code> upon success and <code>false</code> if this queue is full.
*
* @param element_ipar The element to add to the FIFO.
* @param timeout_ipar The maximum number of milliseconds to wait for space to become available.
* @return True if and only if the element was added to the FIFO.
* @throws InterruptedException Thrown when the current thread got interrupted while waiting for space to become
* available.
*******************************************************************************************************************/
public boolean offer(E element_ipar,
long timeout_ipar)
throws InterruptedException
{
// ------------------
// Determine deadline
// ------------------
Date deadline;
deadline = new Date(System.currentTimeMillis() + timeout_ipar);
// ----------------------
// Try to add the element
// ----------------------
lock_attr.lock();
try
{
while (size_attr == capacity_attr)
{
if (!not_full_attr.awaitUntil(deadline))
return (false);
}
push(element_ipar);
return (true);
}
finally
{
lock_attr.unlock();
}
} // offer
/********************************************************************************************************************
* This method adds the specified element at the end of the FIFO, waiting if necessary up to the specified wait time
* for space to become available.
* <br>It returns <code>true</code> upon success and <code>false</code> if this queue is full.
*
* @param element_ipar The element to add to the FIFO.
* @param timeout_ipar The maximum time to wait for space to become available.
* @param unit_ipar The unit of the specified timeout.
* @return True if and only if the element was added to the FIFO.
* @throws InterruptedException Thrown when the current thread got interrupted while waiting for space to become
* available.
*******************************************************************************************************************/
public boolean offer(E element_ipar,
long timeout_ipar,
TimeUnit unit_ipar)
throws InterruptedException
{
// ----------------------------
// Try to add specified element
// ----------------------------
return (offer(element_ipar, unit_ipar.toMillis(timeout_ipar)));
} // offer
/********************************************************************************************************************
* This method adds the specified element at the end of the FIFO, waiting if necessary for space to become available.
*
* @throws InterruptedException Thrown when the current thread got interrupted while waiting for space to become
* available.
*******************************************************************************************************************/
public void put(E element_ipar)
throws InterruptedException
{
// ----------------------
// Try to add the element
// ----------------------
lock_attr.lock();
try
{
while (size_attr == capacity_attr)
not_full_attr.await();
push(element_ipar);
}
finally
{
lock_attr.unlock();
}
} // put
/********************************************************************************************************************
* This method retrieves, but does not remove, the head of the FIFO, or returns <code>null</code> if the FIFO is
* empty.
*
* @return The head of the FIFO, or <code>null</code> if the FIFO is empty.
*******************************************************************************************************************/
public E peek()
{
// --------------------
// Return first element
// --------------------
lock_attr.lock();
try
{
if (size_attr == 0)
return (null);
else
return (head_attr.contents);
}
finally
{
lock_attr.unlock();
}
} // peek
/********************************************************************************************************************
* This method retrieves and removes the head of the FIFO, or returns <code>null</code> if the FIFO is
* empty.
*
* @return The head of the FIFO, or <code>null</code> if the FIFO is empty.
*******************************************************************************************************************/
public E poll()
{
// --------------------
// Return first element
// --------------------
lock_attr.lock();
try
{
if (size_attr == 0)
return (null);
else
return (pop());
}
finally
{
lock_attr.unlock();
}
} // poll
/********************************************************************************************************************
* This method retrieves and removes the head of the FIFO, waiting up to the specified wait time if necessary for an
* element to become available.
* <br>It returns <code>null</code> if the specified waiting time elapses before an element is available.
*
* @param timeout_ipar The maximum number of milliseconds to wait for an element to become available.
* @return The head of the FIFO, or <code>null</code> if the specified waiting time elapses
* before an element is available.
* @throws InterruptedException Thrown when the current thread got interrupted while waiting for an element to become
* available.
*******************************************************************************************************************/
public E poll(long timeout_ipar)
throws InterruptedException
{
// ------------------
// Determine deadline
// ------------------
Date deadline;
deadline = new Date(System.currentTimeMillis() + timeout_ipar);
// --------------------
// Return first element
// --------------------
lock_attr.lock();
try
{
while (size_attr == 0)
{
if (!not_empty_attr.awaitUntil(deadline))
return (null);
}
return (pop());
}
finally
{
lock_attr.unlock();
}
} // poll
/********************************************************************************************************************
* This method retrieves and removes the head of the FIFO, waiting up to the specified wait time if necessary for an
* element to become available.
* <br>It returns <code>null</code> if the specified waiting time elapses before an element is available.
*
* @param timeout_ipar The maximum time to wait for an element to become available.
* @param unit_ipar The unit of the specified timeout.
* @return The head of the FIFO, or <code>null</code> if the specified waiting time elapses
* before an element is available.
* @throws InterruptedException Thrown when the current thread got interrupted while waiting for an element to become
* available.
*******************************************************************************************************************/
public E poll(long timeout_ipar,
TimeUnit unit_ipar)
throws InterruptedException
{
// ------------------------
// Try to get first element
// ------------------------
return (poll(unit_ipar.toMillis(timeout_ipar)));
} // poll
/********************************************************************************************************************
* This method retrieves and removes the head of the FIFO, waiting if necessary for an element to become available.
*
* @return The head of the FIFO.
* @throws InterruptedException Thrown when the current thread got interrupted while waiting for an element to become
* available.
*******************************************************************************************************************/
public E take()
throws InterruptedException
{
// ---------------------------
// Try to return first element
// ---------------------------
lock_attr.lock();
try
{
while (size_attr == 0)
not_empty_attr.await();
return (pop());
}
finally
{
lock_attr.unlock();
}
} // take
/********************************************************************************************************************
* This class implements a node within the FIFO.
*******************************************************************************************************************/
private class Node
{
E contents;
Node next;
} // class Node
/********************************************************************************************************************
* This method adds the specified element to the end of the FIFO.
* <br>It sends a signal to all threads waiting for the FIFO to contain something.
* <br>The caller should have locked the object and have made sure the list is not full.
*******************************************************************************************************************/
private void push(E element_ipar)
{
// -----------
// Create node
// -----------
Node node;
node = new Node();
node.contents = element_ipar;
node.next = null;
// --------------
// Add to the end
// --------------
if (head_attr == null)
head_attr = node;
else
tail_attr.next = node;
tail_attr = node;
// ----------------------
// We got another element
// ----------------------
size_attr++;
not_empty_attr.signalAll();
} // push
/********************************************************************************************************************
* This method removes the first element from the FIFO and returns it.
* <br>It sends a signal to all threads waiting for the FIFO to have space available.
* <br>The caller should have locked the object and have made sure the list is not empty.
*******************************************************************************************************************/
private E pop()
{
// ------------
// Isolate node
// ------------
Node node;
node = head_attr;
head_attr = node.next;
if (head_attr == null)
tail_attr = null;
// --------------------------
// We removed another element
// --------------------------
size_attr--;
not_full_attr.signalAll();
// ----
// Done
// ----
return (node.contents);
} // pop
/********************************************************************************************************************
* This attribute represents the lock on the FIFO.
*******************************************************************************************************************/
private Lock lock_attr;
/********************************************************************************************************************
* This attribute represents the condition of the FIFO not being empty.
*******************************************************************************************************************/
private Condition not_empty_attr;
/********************************************************************************************************************
* This attribute represents the condition of the FIFO not being full.
*******************************************************************************************************************/
private Condition not_full_attr;
/********************************************************************************************************************
* This attribute represents the first element of the FIFO.
*******************************************************************************************************************/
private Node head_attr;
/********************************************************************************************************************
* This attribute represents the last element of the FIFO.
*******************************************************************************************************************/
private Node tail_attr;
/********************************************************************************************************************
* This attribute represents the capacity of the FIFO.
*******************************************************************************************************************/
private int capacity_attr;
/********************************************************************************************************************
* This attribute represents the size of the FIFO.
*******************************************************************************************************************/
private int size_attr;
} // class BlockingFIFO