修复并动态调整工作线程数

问题描述 投票:0回答:1

我目前正在实现一个数据容器/数据结构,它具有addRecord(最终Redcord记录)和close()方法。

public class Container{

    Container() {
    }

    public void addRecord(final Record record){
        // do something with the record
        // add the modified it to the container
    }

    public void close(){

    }     
}

因为为了将记录存储到容器需要完成几个步骤,我想在将这些步骤最终添加到容器之前将这些步骤外包给线程。

我的问题是我不希望任何容器实例使用超过4个线程,如果我有几个容器实例同时打开不超过,例如,应该使用12个线程。除此之外,我希望一旦我创建了第4个容器,其他3个打开的容器中的每一个都应该丢失其中一个线程,这样所有4个容器都可以使用3个线程。

我正在研究ThreadPools和ExecutorServices。虽然设置大小为12的线程池很简单,但在使用ExecutorServices时很难将使用过的线程数限制为4。回想一下,我想要最多12个线程,这就是为什么我有一个大小为12的静态实例,我在Container实例之间共享。

这就是我的开始

public class Container{

    public static final ExecutorService SERVICE= Executors.newFixedThreadPool(12);

    final List<Future<Entry>> _fRecords = new LinkedList<>();

    Container() {
    }

    public void addRecord(final Record record){
        _fRecords.add(SERVICE.submit(new Callable<Entry>{

            @Override
            public Entry call() throws Exception {
                return record.createEntry();
            }
        }));
    }

    public void close() throws Exception {
        for(final Future<Entry> f) {
            f.get(); // and add the entry to the container           
        }
    }     
}

显然,这不能确保单个实例最多使用4个线程。此外,我不知道如果另一个容器需要几个线程,这些Executorservices如何被强制关闭工作人员或重新分配它们。

Questions:

  1. 由于Redord本身是一个接口,并且createEntry()的运行时间取决于实际的实现,我想确保不同的容器不使用相同的线程/ worker,即我不希望任何线程完成工作两个不同的容器。这背后的想法是,我不希望仅处理“长时间运行”记录的容器实例因仅处理“短运行记录”的容器而变慢。如果这在概念上没有意义(问题),就不需要为不同的容器提供隔离的线程/工作者。
  2. 正在实施我自己的ExecutorService正确的方法吗?有人能指出我有一个类似问题/解决方案的项目吗?否则我想我必须实现自己的ExecutorService并在我的容器中共享,或者是否有更好的解决方案?
  3. 目前,我正在以close方法收集所有期货,因为容器必须按照它们到达的顺序存储记录/条目。显然,这需要很多时间,因此一旦未来完成,我想做到这一点。让一个额外的工人单独处理未来是否有意义,或者让我的工人做这项工作更好,即有线程互动。
java multithreading threadpool executorservice worker
1个回答
0
投票

我不认为Java运行时提供了满足您开箱即用需求的东西。

我编写了自己的类来满足这些需求(公共池+给定队列的限制)。

public static class ThreadLimitedQueue {
    private final ExecutorService executorService;
    private final int limit;

    private final Object lock = new Object();
    private final LinkedList<Runnable> pending = new LinkedList<>();
    private int inProgress = 0;

    public ThreadLimitedQueue(final ExecutorService executorService, final int limit) {
        this.executorService = executorService;
        this.limit = limit;
    }

    public void submit(Runnable runnable) {
        final Runnable wrapped = () -> {
            try {
                runnable.run();
            } finally {
                onComplete();
            }
        };
        synchronized (lock) {
            if (inProgress < limit) {
                inProgress++;
                executorService.submit(wrapped);
            } else {
                pending.add(wrapped);
            }
        }
    }

    private void onComplete() {
        synchronized (lock) {
            final Runnable pending = this.pending.poll();
            if (pending == null || inProgress > limit) {
                inProgress--;
            } else {
                executorService.submit(pending);
            }
        }
    }
}

我的情况limit的唯一区别是不变的,但你可以修改它。例如,你可以用int limit替换Supplier<Integer> limitFunction,这个函数必须提供动态限制,例如

Supplier<Integer> limitFunction = 12 / containersList.size();

只是让它更健壮(例如,如果containersList为空或超过12,该怎么办)

© www.soinside.com 2019 - 2024. All rights reserved.