如何在一定次数的执行后停止计划重复执行的Runnable

问题描述 投票:56回答:6

情况

我有一个Runnable。我有一个类,使用带有scheduleWithFixedDelay的ScheduledExecutorService来安排此Runnable执行。

目标

我想改变这个类来无限期地安排Runnable进行固定延迟执行,或者直到它运行了一定次数,具体取决于传递给构造函数的一些参数。

如果可能的话,我想使用相同的Runnable,因为它在概念上应该是“运行”相同的东西。

可能的方法

Approach #1

有两个Runnables,一个在多次执行后(它保持计数)取消计划,另一个不执行:

public class MyClass{
    private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public enum Mode{
        INDEFINITE, FIXED_NO_OF_TIMES
    }

    public MyClass(Mode mode){
        if(mode == Mode.INDEFINITE){
            scheduler.scheduleWithFixedDelay(new DoSomethingTask(), 0, 100, TimeUnit.MILLISECONDS);
        }else if(mode == Mode.FIXED_NO_OF_TIMES){
            scheduler.scheduleWithFixedDelay(new DoSomethingNTimesTask(), 0, 100, TimeUnit.MILLISECONDS);
        }
    }

    private class DoSomethingTask implements Runnable{
        @Override
        public void run(){
            doSomething();
        }
    }

    private class DoSomethingNTimesTask implements Runnable{
        private int count = 0;

        @Override
        public void run(){
            doSomething();
            count++;
            if(count > 42){
                // Cancel the scheduling.
                // Can you do this inside the run method, presumably using
                // the Future returned by the schedule method? Is it a good idea?
            }
        }
    }

    private void doSomething(){
        // do something
    }
}

我宁愿只有一个Runnable来执行doSomething方法。将调度绑定到Runnable感觉不对。你怎么看待这件事?

Approach #2

有一个Runnable用于执行我们想要定期运行的代码。有一个单独的调度runnable,用于检查第一个Runnable运行的次数,并在达到一定量时取消。这可能不准确,因为它是异步的。感觉有点麻烦。你怎么看待这件事?

Approach #3

扩展ScheduledExecutorService并添加方法“scheduleWithFixedDelayNTimes”。也许这样的课程已经存在?目前,我正在使用Executors.newSingleThreadScheduledExecutor();来获取我的ScheduledExecutorService实例。我可能必须实现类似的功能来实例化扩展的ScheduledExecutorService。这可能很棘手。你怎么看待这件事?

No scheduler approach [Edit]

我无法使用调度程序。我可以改为:

for(int i = 0; i < numTimesToRun; i++){
    doSomething();
    Thread.sleep(delay);
}

并在某个线程中运行它。你对那个怎么想的?您可能仍然可以使用runnable并直接调用run方法。


欢迎任何建议。我正在寻找辩论,找到实现目标的“最佳实践”方式。

java scheduling scheduled-tasks
6个回答
61
投票

您可以在Future上使用cancel()方法。来自scheduleAtFixedRate的javadocs

Otherwise, the task will only terminate via cancellation or termination of the executor

下面是一些示例代码,它将Runnable包装在另一个中,跟踪原始运行的次数,并在运行N次后取消。

public void runNTimes(Runnable task, int maxRunCount, long period, TimeUnit unit, ScheduledExecutorService executor) {
    new FixedExecutionRunnable(task, maxRunCount).runNTimes(executor, period, unit);
}

class FixedExecutionRunnable implements Runnable {
    private final AtomicInteger runCount = new AtomicInteger();
    private final Runnable delegate;
    private volatile ScheduledFuture<?> self;
    private final int maxRunCount;

    public FixedExecutionRunnable(Runnable delegate, int maxRunCount) {
        this.delegate = delegate;
        this.maxRunCount = maxRunCount;
    }

    @Override
    public void run() {
        delegate.run();
        if(runCount.incrementAndGet() == maxRunCount) {
            boolean interrupted = false;
            try {
                while(self == null) {
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        interrupted = true;
                    }
                }
                self.cancel(false);
            } finally {
                if(interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    public void runNTimes(ScheduledExecutorService executor, long period, TimeUnit unit) {
        self = executor.scheduleAtFixedRate(this, 0, period, unit);
    }
}

8
投票

来自API描述的引用(qazxsw poi):

创建并执行一个周期性动作,该动作在给定的初始延迟之后首先被启用,并且随后在一次执行的终止和下一次执行的开始之间给定延迟。如果任务的任何执行遇到异常,则后续执行被禁止。否则,任务将仅通过取消或终止执行者来终止。

所以,最简单的事情是“只是抛出异常”(即使这被认为是不好的做法):

ScheduledExecutorService.scheduleWithFixedDelay

5
投票

到目前为止,sbridges解决方案似乎是最干净的解决方案,除了你提到的,它还有责任处理static class MyTask implements Runnable { private int runs = 0; @Override public void run() { System.out.println(runs); if (++runs >= 20) throw new RuntimeException(); } } public static void main(String[] args) { ScheduledExecutorService s = Executors.newSingleThreadScheduledExecutor(); s.scheduleWithFixedDelay(new MyTask(), 0, 100, TimeUnit.MILLISECONDS); } 本身的执行次数。它不应该与此有关,而是重复应该是处理调度的类的参数。为实现这一目标,我建议采用以下设计,为Runnable引入一个新的执行器类。该类提供了两种用于调度任务的公共方法,它们是标准的Runnables,具有有限或无限重复。如果需要,可以传递相同的Runnables用于有限和无限调度(这对于扩展Runnable类以提供有限重复的所有提出的解决方案是不可能的)。取消有限重复的处理完全封装在调度程序类中:

Runnable

公平地说,管理重复的逻辑仍然是class MaxNScheduler { public enum ScheduleType { FixedRate, FixedDelay } private ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); public ScheduledFuture<?> scheduleInfinitely(Runnable task, ScheduleType type, long initialDelay, long period, TimeUnit unit) { return scheduleNTimes(task, -1, type, initialDelay, period, unit); } /** schedule with count repetitions */ public ScheduledFuture<?> scheduleNTimes(Runnable task, int repetitions, ScheduleType type, long initialDelay, long period, TimeUnit unit) { RunnableWrapper wrapper = new RunnableWrapper(task, repetitions); ScheduledFuture<?> future; if(type == ScheduleType.FixedDelay) future = executorService.scheduleWithFixedDelay(wrapper, initialDelay, period, TimeUnit.MILLISECONDS); else future = executorService.scheduleAtFixedRate(wrapper, initialDelay, period, TimeUnit.MILLISECONDS); synchronized(wrapper) { wrapper.self = future; wrapper.notify(); // notify wrapper that it nows about it's future (pun intended) } return future; } private static class RunnableWrapper implements Runnable { private final Runnable realRunnable; private int repetitions = -1; ScheduledFuture<?> self = null; RunnableWrapper(Runnable realRunnable, int repetitions) { this.realRunnable = realRunnable; this.repetitions = repetitions; } private boolean isInfinite() { return repetitions < 0; } private boolean isFinished() { return repetitions == 0; } @Override public void run() { if(!isFinished()) // guard for calls to run when it should be cancelled already { realRunnable.run(); if(!isInfinite()) { repetitions--; if(isFinished()) { synchronized(this) // need to wait until self is actually set { if(self == null) { try { wait(); } catch(Exception e) { /* should not happen... */ } } self.cancel(false); // cancel gracefully (not throwing InterruptedException) } } } } } } } ,但它是一个Runnable完全内部的Runnable,而MaxNScheduler任务通过调度不必关心调度的性质。此外,如果需要,可以通过每次执行Runnable时提供一些回调,将此问题轻松移出到调度程序中。这会使代码稍微复杂化,并且需要保留一些RunnableWrapper.runs的映射和相应的重复,这就是为什么我选择将计数器保留在RunnableWrapper类中的原因。

设置self时,我还在包装器上添加了一些同步。理论上需要这样做,当执行结束时,可能还没有分配自我(一个相当理论的场景,但只能重复一次)。

取消是优雅地处理,没有抛出RunnableWrapper,如果在取消执行之前,另一轮被安排,InterruptedException将不会调用底层的RunnableWrapper


1
投票

你的第一种方法似乎没问题您可以通过将Runnable对象传递给其构造函数来组合两种类型的runnable(或者将-1作为必须运行的最大次数传递-1),并使用此模式来确定是否必须取消runnable:

mode

您必须将预定的未来传递给您的任务才能使其自行取消,否则您可能会抛出异常。


1
投票

这是我的建议(我认为它处理了问题中提到的所有案例):

private class DoSomethingNTimesTask implements Runnable{
    private int count = 0;
    private final int limit;

    /**
     * Constructor for no limit
     */
    private DoSomethingNTimesTask() {
        this(-1);
    }

    /**
     * Constructor allowing to set a limit
     * @param limit the limit (negative number for no limit)
     */
    private DoSomethingNTimesTask(int limit) {
        this.limit = limit;
    }

    @Override
    public void run(){
        doSomething();
        count++;
        if(limit >= 0 && count > limit){
            // Cancel the scheduling
        }
    }
}

此外,它允许外部方停止public class RepeatedScheduled implements Runnable { private int repeatCounter = -1; private boolean infinite; private ScheduledExecutorService ses; private long initialDelay; private long delay; private TimeUnit unit; private final Runnable command; private Future<?> control; public RepeatedScheduled(ScheduledExecutorService ses, Runnable command, long initialDelay, long delay, TimeUnit unit) { this.ses = ses; this.initialDelay = initialDelay; this.delay = delay; this.unit = unit; this.command = command; this.infinite = true; } public RepeatedScheduled(ScheduledExecutorService ses, Runnable command, long initialDelay, long delay, TimeUnit unit, int maxExecutions) { this(ses, command, initialDelay, delay, unit); this.repeatCounter = maxExecutions; this.infinite = false; } public Future<?> submit() { // We submit this, not the received command this.control = this.ses.scheduleWithFixedDelay(this, this.initialDelay, this.delay, this.unit); return this.control; } @Override public synchronized void run() { if ( !this.infinite ) { if ( this.repeatCounter > 0 ) { this.command.run(); this.repeatCounter--; } else { this.control.cancel(false); } } else { this.command.run(); } } } 方法返回的Future中的所有内容。

用法:

submit()

1
投票

对于像轮询到某个超时之类的用例,我们可以使用Runnable MyRunnable = ...; // Repeat 20 times RepeatedScheduled rs = new RepeatedScheduled( MySes, MyRunnable, 33, 44, TimeUnit.SECONDS, 20); Future<?> MyControl = rs.submit(); ... 更简单的解决方案。

Future.get()

0
投票

我一直在寻找完全相同的功能,并选择了/* Define task */ public class Poll implements Runnable { @Override public void run() { // Polling logic } } /* Create executor service */ ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5); /* Schedule task - poll every 500ms */ ScheduledFuture<?> future = executorService.scheduleAtFixedRate(new Poll(), 0, 500, TimeUnit.MILLISECONDS); /* Wait till 60 sec timeout */ try { future.get(60, TimeUnit.SECONDS); } catch (TimeoutException e) { scheduledFuture.cancel(false); // Take action on timeout }

下面是完整测试工作示例(如果代码中存在太多洪水,则抱歉)applicationContext.xml

org.springframework.scheduling.Trigger

JAVA

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns:task="http://www.springframework.org/schema/task"
 xmlns:util="http://www.springframework.org/schema/util"
 xmlns:context="http://www.springframework.org/schema/context"
 xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/context/ http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/util/ http://www.springframework.org/schema/util/spring-util.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd">

    <bean id="blockingTasksScheduler" class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler">
        <property name="poolSize" value="10" />
    </bean>

    <task:scheduler id="deftaskScheduler" pool-size="10" />

</beans>
© www.soinside.com 2019 - 2024. All rights reserved.