How to implement a blocking thread pool executor? [duplicate]

Question  Votes: 0  Answers: 5

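We have a large text file in which each line requires intensive processing. The design is to have a class that reads the file and delegates the processing of each line to a thread via a thread pool. Once no thread in the pool is free to do the processing, the file reader class should be blocked from reading the next line. So I need a blocking thread pool.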
In the current implementation, the ThreadPoolExecutor.submit() and ThreadPoolExecutor.execute() methods throw a RejectedExecutionException once the configured number of threads are busy, as shown in the code snippet below.

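import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BlockingTp {

    public static void main(String[] args) {
        BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(3);
        ThreadPoolExecutor executorService =
            new ThreadPoolExecutor(1, 3, 30, TimeUnit.SECONDS, blockingQueue);
        int jobs = 10;
        System.out.println("Starting application to add " + jobs + " jobs");
        for (int i = 1; i <= jobs; i++) {
            try {
                executorService.submit(new WorkerThread(i));
                System.out.println("Added job #" + i);
            } catch (RejectedExecutionException e) {
                System.err.println("RejectedExecutionException");
            }
        }
        // let the accepted jobs finish, then allow the JVM to exit
        executorService.shutdown();
    }
}

class WorkerThread implements Runnable {
    int job;
    public WorkerThread(int job) {
        this.job = job;
    }
    public void run() {
        try {
            Thread.sleep(1000); // simulate the intensive per-line processing
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}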

The output of the above program is:

Starting application to add 10 jobs
Added job #1
Added job #2
Added job #3
Added job #4
Added job #5
Added job #6
RejectedExecutionException
RejectedExecutionException
RejectedExecutionException
RejectedExecutionException

Can someone give me some pointers on how I can implement a blocking thread pool?

java multithreading threadpoolexecutor
5 Answers
14
votes

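"Can someone give me some pointers on how I can implement a blocking thread pool?"

You need to set a rejected-execution handler on the executor service. When a thread goes to put a job into the executor and the queue is full, the handler blocks that thread until there is room in the blocking queue.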

BlockingQueue<Runnable> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
ThreadPoolExecutor executorService =
     new ThreadPoolExecutor(1, 3, 30, TimeUnit.SECONDS, arrayBlockingQueue);
// when the queue is full, this tries to put into the queue which blocks
executorService.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            // block until there's room
            executor.getQueue().put(r);
            // check afterwards and throw if pool shutdown
            if (executor.isShutdown()) {
                throw new RejectedExecutionException(
                    "Task " + r + " rejected from " + executor);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Producer interrupted", e);
        }
    }
});

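So instead of the ThreadPoolExecutor throwing a RejectedExecutionException, it calls the rejection handler, which in turn tries to put the job back on the queue. This blocks the caller.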
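To see the effect end to end, here is a minimal sketch that plugs a simplified version of that handler (without the shutdown check) into the pool configuration from the question. The class name BlockingHandlerDemo, the method runJobs, and the 50 ms sleep are placeholders of mine, not part of the original code.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingHandlerDemo {

    /** Submits {@code jobs} short tasks and returns how many of them ran to completion. */
    static int runJobs(int jobs) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 3, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3));
        // Simplified handler: block the submitting thread until the queue has room.
        executor.setRejectedExecutionHandler((r, pool) -> {
            try {
                pool.getQueue().put(r);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("Producer interrupted", e);
            }
        });

        AtomicInteger completed = new AtomicInteger();
        for (int i = 1; i <= jobs; i++) {
            // With the handler installed, this call waits instead of throwing.
            executor.execute(() -> {
                try {
                    Thread.sleep(50); // hypothetical stand-in for the per-line processing
                    completed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        executor.shutdown(); // queued tasks still run after shutdown()
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("Completed jobs: " + runJobs(10));
    }
}
```

With 10 jobs, the submitting loop should stall at the seventh job until a worker frees a queue slot, rather than printing RejectedExecutionException.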


0
votes

Let's look at your code again:

for (int i = 1; i <= Jobs; i++)
  try {
    tpExe.submit(new WorkerThread(i));
    System.out.println("job added " + (i));
  } catch (RejectedExecutionException e) {
    System.err.println("RejectedExecutionException");
  }

So, when you try to submit and the pool is busy, that exception is thrown. If you want to work around that, it could look like:

public void yourSubmit(Runnable whatever) throws InterruptedException {
  boolean submitted = false;
  while (!submitted) {
    try {
      tpExe.submit(whatever);
      submitted = true;
    } catch (RejectedExecutionException re) {
      // all threads busy ... so wait some time before retrying
      Thread.sleep(1000);
    }
  }
}

In other words: use that exception as a "marker" that submitting is currently not possible.


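0
votes

You can use a semaphore to control the resource. The reader acquires a permit from the semaphore before it reads a line and creates an asynchronous task. If every thread is busy, the reader thread waits until a thread becomes available.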

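import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;

// class name must match the constructor below
public class BoundedExecutor {
    private final Executor exec;
    private final Semaphore semaphore;

    public BoundedExecutor(Executor exec, int bound) {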
        this.exec = exec;
        this.semaphore = new Semaphore(bound);
    }

    public void submitTask(final Runnable command)
            throws InterruptedException, RejectedExecutionException {
        semaphore.acquire();
        try {
            exec.execute(new Runnable() {
                public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            semaphore.release();
            throw e;
        }
    }
}
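As a rough illustration of how the reader ends up blocked, the sketch below inlines the same acquire/release pattern into a loop over some lines and records how many lines are in flight at once. The names SemaphoreReaderDemo and readAll, the in-memory list standing in for the file, and the 20 ms sleep are all assumptions made for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SemaphoreReaderDemo {

    /** Returns {number of lines processed, peak number of lines in flight}. */
    static int[] readAll(List<String> lines, int bound) {
        ExecutorService pool = Executors.newFixedThreadPool(bound);
        Semaphore permits = new Semaphore(bound);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger processed = new AtomicInteger();
        int peak = 0;
        try {
            for (String line : lines) {
                permits.acquire(); // the reader blocks here while every permit is taken
                peak = Math.max(peak, inFlight.incrementAndGet());
                pool.execute(() -> {
                    try {
                        Thread.sleep(20); // hypothetical stand-in for processing the line
                        processed.incrementAndGet();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        inFlight.decrementAndGet();
                        permits.release(); // frees the reader to hand over another line
                    }
                });
            }
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
        }
        return new int[] {processed.get(), peak};
    }

    public static void main(String[] args) {
        int[] result = readAll(Arrays.asList("a", "b", "c", "d", "e", "f"), 2);
        System.out.println("processed=" + result[0] + ", peak in flight=" + result[1]);
    }
}
```

Because a permit is taken before each hand-off and returned only when the task finishes, the number of lines in flight should never exceed the bound, even though the pool's own queue is unbounded here.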

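0
votes

Here is a RejectedExecutionHandler that supports the desired behavior. Unlike other implementations, it does not interact with the queue directly, so it should be compatible with all Executor implementations and will not deadlock.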

import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.BiFunction;

import static com.github.cowwoc.requirements.DefaultRequirements.assertThat;
import static com.github.cowwoc.requirements.DefaultRequirements.requireThat;

/**
 * Applies a different rejection policy depending on the thread that requested execution.
 */
public final class ThreadDependantRejectionHandler implements RejectedExecutionHandler
{
    private final ThreadLocal<Integer> numberOfRejections = ThreadLocal.withInitial(() -> 0);
    private final BiFunction<Thread, Executor, Action> threadToAction;

    /**
     * @param threadToAction indicates what action a thread should take when execution is rejected
     * @throws NullPointerException if {@code threadToAction} is null
     */
    public ThreadDependantRejectionHandler(BiFunction<Thread, Executor, Action> threadToAction)
    {
        requireThat(threadToAction, "threadToAction").isNotNull();
        this.threadToAction = threadToAction;
    }

    @SuppressWarnings("BusyWait")
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
    {
        if (executor.isShutdown())
            return;
        Thread currentThread = Thread.currentThread();
        Action action = threadToAction.apply(currentThread, executor);
        if (action == Action.RUN)
        {
            r.run();
            return;
        }
        if (action == Action.REJECT)
        {
            throw new RejectedExecutionException("The thread pool queue is full and the current thread is not " +
                "allowed to block or run the task");
        }

        assertThat(action, "action").isEqualTo(Action.BLOCK);
        int numberOfRejections = this.numberOfRejections.get();
        ++numberOfRejections;
        this.numberOfRejections.set(numberOfRejections);
        if (numberOfRejections > 1)
            return;
        try
        {
            ThreadLocalRandom random = ThreadLocalRandom.current();
            while (!executor.isShutdown())
            {
                try
                {
                    Thread.sleep(random.nextInt(10, 1001));
                }
                catch (InterruptedException e)
                {
                    throw new WrappingException(e);
                }
                executor.submit(r);
                numberOfRejections = this.numberOfRejections.get();
                if (numberOfRejections == 1)
                {
                    // Task was accepted, or executor has shut down
                    return;
                }
                // Task was rejected, reset the counter and try again.
                numberOfRejections = 1;
                this.numberOfRejections.set(numberOfRejections);
            }
            throw new RejectedExecutionException("Task " + r + " rejected from " + executor + " because " +
                "the executor has been shut down");
        }
        finally
        {
            this.numberOfRejections.set(0);
        }
    }

    public enum Action
    {
        /**
         * The thread should run the task directly instead of waiting for the executor.
         */
        RUN,
        /**
         * The thread should block until the executor is ready to run the task.
         */
        BLOCK,
        /**
         * The thread should reject execution of the task.
         */
        REJECT
    }
}

-1
votes

This worked for me.

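import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

class handler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            // block the submitting thread until the queue has room
            executor.getQueue().put(r);
        } catch (InterruptedException e) {
            // restore the interrupt flag so callers can see the interruption
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}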