我需要运行 100 个任务。我想并行运行它们,但同时只能运行 10 个。我可以简单地使用它
ExecutorService service = Executors.newFixedThreadPool(10);
然后创建10个Runnables提交给ExecutorService。
但是如果有 100_000 个任务或 100_000_000 个任务呢?在某些时候,由于内存中有太多的 Runnable,它会导致内存不足。
所以我想要这样的东西:我将所有任务的参数存储在数据库中,然后当一个线程空闲时从数据库中取出另一个,并使用这个参数启动新任务。我想出了这样的东西
package com.company;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
private static ExecutorService executorService = Executors.newFixedThreadPool(10);
private static class MyRunnable implements Runnable {
private int i; /* there would by actually parameters for task*/
public MyRunnable(int i) {
this.i = i;
}
public void run() {
try {
System.out.println(i); // there would by actually some code
Thread.sleep(1_000); // there would by actually some code
} catch (Exception ex) {
ex.printStackTrace();
} finally {
executorService.execute(new MyRunnable(i + 10 /* there would by actually taking parameters from database for new task*/));
}
}
}
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
executorService.submit(new MyRunnable(i /* there would by actually taking parameters from database for new task*/));
}
}
}
这是好的还是坏的解决方案?什么是最好的解决方案?
您似乎担心太多挂起的
Runnable
任务对象可能占用比部署计算机上可用的内存更多的内存。您希望避免内存不足错误或虚拟内存抖动。
为避免太多任务,等到当前任务完成后再实例化下一个任务。
为您的
Runnable
对象的每个实例添加第二个职责:从数据库中提取下一个待定参数,并向执行程序服务提交任务。
任务可以是您的
Runnable
实现的新实例,也可以是当前正在执行的相同 Runnable
实例。如果第二种方法意味着编写代码来清除“脏”状态,我更喜欢第一种方法。
当没有更多输入参数保留时,
Runnable
的最后一个运行实例在关闭执行程序服务后简单地完成。执行者服务仍然没有进一步的工作要做。
确保最终关闭您的执行程序服务。否则它的后台线程池可能会无限期地继续运行,即使在您的应用程序结束后,就像僵尸🧟u200d♂️一样。
让我们定义一个将数字相乘的任务,然后在控制台上报告。
此任务的构造函数采用线程安全的
Queue
整数作为输入。每个任务对象从该队列中提取一个元素,一个Integer
对象。
此任务的构造函数还引用了现有的
ExecutorService
。当此任务完成其工作时,它会实例化另一个任务。旧任务将新任务提交给已通过的执行程序服务。执行者服务将在未来执行新任务,很可能是在不久的将来。
package work.basil.example.tasking;
import java.time.Duration;
import java.time.Instant;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;
public class Multiplier implements Runnable
{
// Member fields
private final Queue < Integer > inputs; // Must be a thread-safe implementation of `Queue` such as `ArrayBlockingQueue`.
private final ExecutorService executorService;
// Constructor
public Multiplier ( final Queue < Integer > inputs , final ExecutorService executorService )
{
Objects.requireNonNull( inputs );
Objects.requireNonNull( executorService );
this.inputs = inputs;
this.executorService = executorService;
}
// Implement `Runnable`.
@Override
public void run ( )
{
this.multiply();
this.submitNextTask();
}
private void multiply ( )
{
Integer input = this.inputs.poll();
if ( Objects.nonNull( input ) )
{
Integer result = input * 2;
// Sleep as a way of pretending we are doing much more work.
try { Thread.sleep( Duration.ofMillis( ThreadLocalRandom.current().nextInt( 500 , 3000 ) ) ); } catch ( InterruptedException e ) { throw new RuntimeException( e ); }
System.out.println( "input = " + input + " | result = " + result );
}
}
private void submitNextTask ( )
{
if ( Objects.isNull( this.inputs.peek() ) )
{
// No further tasks to be submitted. Shutdown the executor service.
this.executorService.close();
System.out.println( "DEBUG - Multiplier called `close` on its executor service. " + Instant.now() );
}
else
{
// We have remaining inputs. Submit the next one to be worked.
Multiplier task = new Multiplier( this.inputs , this.executorService );
try
{
this.executorService.submit( task );
}
catch ( RejectedExecutionException e ) { e.printStackTrace(); }
catch ( NullPointerException e ) { e.printStackTrace(); }
}
}
}
让我们编写一个应用程序来运行其中的一些任务。
这个应用程序实例化了一个由一定大小的线程池支持的执行器服务。该应用程序实例化和完成与线程一样多的任务,以使球滚动。
从那里,每个正在执行的任务实例化另一个任务,然后提交它以供以后执行。
这个滚动的工作-实例化-提交-工作-实例化-提交循环一直持续到我们耗尽输入队列。
package work.basil.example.tasking;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;
public class App
{
public static void main ( String[] args )
{
App app = new App();
app.demo();
}
private void demo ( )
{
System.out.println( "INFO - Demo ending. " + Instant.now() );
final List < Integer > integers = IntStream.rangeClosed( 1 , 9 ).boxed().toList();
final Queue < Integer > queueOfInputs = new ArrayBlockingQueue <>( integers.size() , false , integers );
final int countThreads = 3;
final ExecutorService executorService = Executors.newFixedThreadPool( countThreads );
IntStream // Using `IntStream` as an alternative to a `for` loop.
.rangeClosed( 1 , countThreads ) // To get things started, take as many elements from queueOfInputs as threads in our executor service.
.mapToObj( ( int input ) -> new Multiplier( queueOfInputs , executorService ) ) // Instantiate a task.
.forEach( executorService :: submit ); // Submit that task.
// Let the background threads do their work. Sleep this thread a while, then wake up and end this app’s execution.
System.out.println( "INFO - Demo main thread sleeping. " + Instant.now() );
try { Thread.sleep( Duration.ofMinutes( 1 ) ); } catch ( InterruptedException e ) { throw new RuntimeException( e ); }
if ( ! queueOfInputs.isEmpty() ) { System.out.println( "ERROR - Queue of queueOfInputs is not empty. " + Instant.now() ); }
if ( ! executorService.isTerminated() ) { System.out.println( "ERROR - Executor service is not terminated. " + Instant.now() ); }
executorService.close(); // Redundant. Just in case something went wrong with the task object shutting it down.
System.out.println( "INFO - Demo ending. " + Instant.now() );
}
}
运行时,我们期望看到类似这样的输出:
INFO - Demo ending. 2023-02-21T23:49:20.393608Z
INFO - Demo main thread sleeping. 2023-02-21T23:49:20.407143Z
input = 3 | result = 6
input = 1 | result = 2
input = 2 | result = 4
input = 6 | result = 12
input = 4 | result = 8
input = 5 | result = 10
input = 7 | result = 14
input = 8 | result = 16
input = 9 | result = 18
ERROR - Executor service is not terminated. 2023-02-21T23:50:20.414179Z
警告:请注意控制台输出的最后一行。我还在调试。
恕我直言,你在追鬼。
这些任务通常不是孤立存在的:
一些问题是:
从列出的点可以看出所有这些任务实例的内存使用只是几个相关问题之一。