我目前正在实现一个数据容器/数据结构,它具有addRecord(最终Redcord记录)和close()方法。
public class Container{
Container() {
}
public void addRecord(final Record record){
// do something with the record
// add the modified it to the container
}
public void close(){
}
}
因为为了将记录存储到容器需要完成几个步骤,我想在将这些步骤最终添加到容器之前将这些步骤外包给线程。
我的问题是我不希望任何容器实例使用超过4个线程,如果我有几个容器实例同时打开不超过,例如,应该使用12个线程。除此之外,我希望一旦我创建了第4个容器,其他3个打开的容器中的每一个都应该丢失其中一个线程,这样所有4个容器都可以使用3个线程。
我正在研究ThreadPools和ExecutorServices。虽然设置大小为12的线程池很简单,但在使用ExecutorServices时很难将使用过的线程数限制为4。回想一下,我想要最多12个线程,这就是为什么我有一个大小为12的静态实例,我在Container实例之间共享。
这就是我的开始
public class Container{
public static final ExecutorService SERVICE= Executors.newFixedThreadPool(12);
final List<Future<Entry>> _fRecords = new LinkedList<>();
Container() {
}
public void addRecord(final Record record){
_fRecords.add(SERVICE.submit(new Callable<Entry>{
@Override
public Entry call() throws Exception {
return record.createEntry();
}
}));
}
public void close() throws Exception {
for(final Future<Entry> f) {
f.get(); // and add the entry to the container
}
}
}
显然,这不能确保单个实例最多使用4个线程。此外,我不知道如果另一个容器需要几个线程,这些Executorservices如何被强制关闭工作人员或重新分配它们。
我不认为Java运行时提供了满足您开箱即用需求的东西。
我编写了自己的类来满足这些需求(公共池+给定队列的限制)。
public static class ThreadLimitedQueue {
private final ExecutorService executorService;
private final int limit;
private final Object lock = new Object();
private final LinkedList<Runnable> pending = new LinkedList<>();
private int inProgress = 0;
public ThreadLimitedQueue(final ExecutorService executorService, final int limit) {
this.executorService = executorService;
this.limit = limit;
}
public void submit(Runnable runnable) {
final Runnable wrapped = () -> {
try {
runnable.run();
} finally {
onComplete();
}
};
synchronized (lock) {
if (inProgress < limit) {
inProgress++;
executorService.submit(wrapped);
} else {
pending.add(wrapped);
}
}
}
private void onComplete() {
synchronized (lock) {
final Runnable pending = this.pending.poll();
if (pending == null || inProgress > limit) {
inProgress--;
} else {
executorService.submit(pending);
}
}
}
}
我的情况limit
的唯一区别是不变的,但你可以修改它。例如,你可以用int limit
替换Supplier<Integer> limitFunction
,这个函数必须提供动态限制,例如
Supplier<Integer> limitFunction = 12 / containersList.size();
只是让它更健壮(例如,如果containersList为空或超过12,该怎么办)