如何防止因为很多Runnables而导致内存不足

问题描述 投票:0回答:2

我需要运行 100 个任务。我想并行运行它们,但同时只能运行 10 个。我可以简单地使用它

ExecutorService service = Executors.newFixedThreadPool(10);

然后创建10个Runnables提交给ExecutorService。

但是如果有 100_000 个任务或 100_000_000 个任务呢?在某些时候,由于内存中有太多的 Runnable,它会导致内存不足。

所以我想要这样的东西:我将所有任务的参数存储在数据库中,然后当一个线程空闲时从数据库中取出另一个,并使用这个参数启动新任务。我想出了这样的东西

package com.company;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {

    private static ExecutorService executorService = Executors.newFixedThreadPool(10);

    private static class MyRunnable implements Runnable {

        private int i;  /* there would by actually parameters for task*/

        public MyRunnable(int i) {
            this.i = i;
        }

        public void run() {
            try {
                System.out.println(i); // there would by actually some code
                Thread.sleep(1_000);   // there would by actually some code
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                executorService.execute(new MyRunnable(i + 10 /* there would by actually taking parameters from database for new task*/));
            }
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            executorService.submit(new MyRunnable(i /* there would by actually taking parameters from database for new task*/));
        }
    }
}

这是好的还是坏的解决方案?什么是最好的解决方案?

java out-of-memory executorservice runnable
2个回答
1
投票

您似乎担心太多挂起的

Runnable
任务对象可能占用比部署计算机上可用的内存更多的内存。您希望避免内存不足错误或虚拟内存抖动。

为避免太多任务,等到当前任务完成后再实例化下一个任务。

以提交另一个任务结束每个任务

为您的

Runnable
对象的每个实例添加第二个职责:从数据库中提取下一个待定参数,并向执行程序服务提交任务。

任务可以是您的

Runnable
实现的新实例,也可以是当前正在执行的相同
Runnable
实例。如果第二种方法意味着编写代码来清除“脏”状态,我更喜欢第一种方法。

当没有更多输入参数保留时,

Runnable
的最后一个运行实例在关闭执行程序服务后简单地完成。执行者服务仍然没有进一步的工作要做。

确保最终关闭您的执行程序服务。否则它的后台线程池可能会无限期地继续运行,即使在您的应用程序结束后,就像僵尸🧟u200d♂️一样。

示例代码

任务

让我们定义一个将数字相乘的任务,然后在控制台上报告。

此任务的构造函数采用线程安全的

Queue
整数作为输入。每个任务对象从该队列中提取一个元素,一个
Integer
对象。

此任务的构造函数还引用了现有的

ExecutorService
。当此任务完成其工作时,它会实例化另一个任务。旧任务将新任务提交给已通过的执行程序服务。执行者服务将在未来执行新任务,很可能是在不久的将来。

package work.basil.example.tasking;

import java.time.Duration;
import java.time.Instant;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;

public class Multiplier implements Runnable
{
    // Member fields
    private final Queue < Integer > inputs; // Must be a thread-safe implementation of `Queue` such as `ArrayBlockingQueue`.
    private final ExecutorService executorService;

    // Constructor
    public Multiplier ( final Queue < Integer > inputs , final ExecutorService executorService )
    {
        Objects.requireNonNull( inputs );
        Objects.requireNonNull( executorService );

        this.inputs = inputs;
        this.executorService = executorService;
    }

    // Implement `Runnable`.
    @Override
    public void run ( )
    {
        this.multiply();
        this.submitNextTask();
    }

    private void multiply ( )
    {
        Integer input = this.inputs.poll();
        if ( Objects.nonNull( input ) )
        {
            Integer result = input * 2;
            // Sleep as a way of pretending we are doing much more work.
            try { Thread.sleep( Duration.ofMillis( ThreadLocalRandom.current().nextInt( 500 , 3000 ) ) ); } catch ( InterruptedException e ) { throw new RuntimeException( e ); }
            System.out.println( "input = " + input + " | result = " + result );
        }
    }

    private void submitNextTask ( )
    {
        if ( Objects.isNull( this.inputs.peek() ) )
        {
            // No further tasks to be submitted. Shutdown the executor service.
            this.executorService.close();
            System.out.println( "DEBUG - Multiplier called `close` on its executor service. " + Instant.now() );
        }
        else
        {
            // We have remaining inputs. Submit the next one to be worked.
            Multiplier task = new Multiplier( this.inputs , this.executorService );
            try
            {
                this.executorService.submit( task );
            }
            catch ( RejectedExecutionException e ) { e.printStackTrace(); }
            catch ( NullPointerException e ) { e.printStackTrace(); }
        }
    }
}

应用程序

让我们编写一个应用程序来运行其中的一些任务。

这个应用程序实例化了一个由一定大小的线程池支持的执行器服务。该应用程序实例化和完成与线程一样多的任务,以使球滚动。

从那里,每个正在执行的任务实例化另一个任务,然后提交它以供以后执行。

这个滚动的工作-实例化-提交-工作-实例化-提交循环一直持续到我们耗尽输入队列。

package work.basil.example.tasking;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

public class App
{
    public static void main ( String[] args )
    {
        App app = new App();
        app.demo();
    }

    private void demo ( )
    {
        System.out.println( "INFO - Demo ending. " + Instant.now() );

        final List < Integer > integers = IntStream.rangeClosed( 1 , 9 ).boxed().toList();
        final Queue < Integer > queueOfInputs = new ArrayBlockingQueue <>( integers.size() , false , integers );
        final int countThreads = 3;
        final ExecutorService executorService = Executors.newFixedThreadPool( countThreads );
        IntStream  // Using `IntStream` as an alternative to a `for` loop.
                .rangeClosed( 1 , countThreads )  // To get things started, take as many elements from queueOfInputs as threads in our executor service.
                .mapToObj( ( int input ) -> new Multiplier( queueOfInputs , executorService ) )  // Instantiate a task.
                .forEach( executorService :: submit );  // Submit that task.

        // Let the background threads do their work. Sleep this thread a while, then wake up and end this app’s execution.
        System.out.println( "INFO - Demo main thread sleeping. " + Instant.now() );
        try { Thread.sleep( Duration.ofMinutes( 1 ) ); } catch ( InterruptedException e ) { throw new RuntimeException( e ); }
        if ( ! queueOfInputs.isEmpty() ) { System.out.println( "ERROR - Queue of queueOfInputs is not empty. " + Instant.now() ); }
        if ( ! executorService.isTerminated() ) { System.out.println( "ERROR - Executor service is not terminated. " + Instant.now() ); }
        executorService.close();  // Redundant. Just in case something went wrong with the task object shutting it down.

        System.out.println( "INFO - Demo ending. " + Instant.now() );
    }
}

执行

运行时,我们期望看到类似这样的输出:

INFO - Demo ending. 2023-02-21T23:49:20.393608Z
INFO - Demo main thread sleeping. 2023-02-21T23:49:20.407143Z
input = 3 | result = 6
input = 1 | result = 2
input = 2 | result = 4
input = 6 | result = 12
input = 4 | result = 8
input = 5 | result = 10
input = 7 | result = 14
input = 8 | result = 16
input = 9 | result = 18
ERROR - Executor service is not terminated. 2023-02-21T23:50:20.414179Z

警告:请注意控制台输出的最后一行。我还在调试。


0
投票

恕我直言,你在追鬼。

这些任务通常不是孤立存在的:

  • 有一些东西(另一个进程,一个用户,一个网络连接)导致了新任务的创建
  • 任务需要一些有限的时间来执行
  • 通常还有一些对任务结果感兴趣的东西(如果没人关心:你为什么执行任务?)

一些问题是:

  • 如果导致任务创建的速度比您处理任务的速度快,那么您的系统迟早会被任务淹没——即使数据库的容量也不是无限的
  • 如果每个任务只需要一秒钟,并且您可以并行处理 10 个任务,则意味着您需要超过 100 天的时间来处理 100_000_000 个任务——如果同时没有其他任务到达。即使有 100_000 个任务,你也需要大约 3 个小时
  • 如果您需要 100 天来处理任务:您如何确保您的应用程序可以不间断地运行这么长时间?
  • 如果每个任务只需要几毫秒,那么往返数据库可能会成为你的瓶颈
    • 如果创建了一个新任务,则需要将其存储到数据库中
    • 然后您需要从数据库中读取该任务
    • 如果任务完成,您需要将其从数据库中删除
  • 你如何处理任务的结果?
    • 将它们打印到控制台?谁会读那本书?
    • 将它们存储在某个文件中?这个文件很快就会变大
    • 将它们存储在数据库中?
    • 通过网络将它们发送回请求者?

从列出的点可以看出所有这些任务实例的内存使用只是几个相关问题之一。

© www.soinside.com 2019 - 2024. All rights reserved.