如何在 Quartz 中重新安排作业执行间隔?

问题描述 投票:0回答:3

我对 Quartz 有点陌生。有没有办法更新已提交的 Quartz 作业的作业执行间隔?这个间隔会立即更新吗?改期后还要重新开始工作吗?

我找到了以下链接,但我不知道代码引用了哪些库,因为我的石英罐不包含链接中使用的一些类。另外,triggerKey方法是从哪里来的?这是某种静态导入吗?

http://www.quartz-scheduler.org/documentation/2.4.0-SNAPSHOT/cookbook/UpdateTrigger.html

我想在我的一个 JUnit 测试用例中将作业执行间隔更新为一个非常大的数字,因为我不希望该作业干扰被测类的状态。测试用例完成后,我想将作业执行间隔重置为将在生产中使用的实际值

java quartz-scheduler
3个回答
22
投票

您必须通过创建新触发器来重新安排作业。

public void execute(JobExecutionContext context) throws JobExecutionException {
    Trigger newTrigger = what_ever_you_want;
    Trigger oldTrigger = context.getTrigger();
    Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
    scheduler.rescheduleJob(oldTrigger.getKey(), newTrigger);
}

这将用新的触发器触发时间替换相同的作业。


0
投票

也许在 Quartz 库的某处有一个静态方法 triggerKey()。但是,我设法重新安排了现有的 Quartz 作业(使用 Quartz 2.3.2)而不使用这种(潜在的)方法,而是使用 TriggerKey 类,如下所示:

    boolean updateExisting = true; // try it also with false
    int aveId = 1234; // change it as you wish
    java.util.Date closeDate = new java.util.Date(); // change it as you wish

    SchedulerFactory sf = new StdSchedulerFactory("... /quartz_priority.properties");
    Scheduler scheduler = sf.getScheduler();

    TriggerKey triggerKey = new TriggerKey("trigger" + aveId, "group1");

    if (updateExisting) {
        Trigger oldTrigger = scheduler.getTrigger(triggerKey);
        TriggerBuilder oldTriggerBuilder = oldTrigger.getTriggerBuilder();

        Trigger newTrigger = oldTriggerBuilder.startAt(closeDate).build();
        scheduler.rescheduleJob(triggerKey, newTrigger);

    } else {
        Trigger trigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).startAt(closeDate).build();

        // Define job instance
        JobDetail job1 = JobBuilder.newJob(<YOUR_JOB_CLASS_NAME>.class).withIdentity("job" + aveId, "group1").build();

        JobDataMap map = job1.getJobDataMap();
        map.put(<PARAMETER_NAME>, aveId);

        // Schedule the job with the trigger
        scheduler.scheduleJob(job1, trigger);
    }

0
投票

让我扩展一下 KrisAlex Mi 的答案。

下面是启动 cron 作业的多实例 Spring Boot 应用程序示例。
作业必须仅在其中一个实例上运行。
每个实例的配置必须相同。
如果作业崩溃,它应该尝试重新启动 3 次,延迟 5 分钟 * 重新启动尝试次数。
如果作业在 3 次重启后仍然崩溃,则应设置作业触发器的默认 cron。

我们将在集群模式下使用 Quartz:

部门:

implementation("org.springframework.boot:spring-boot-starter-quartz")

外出工作:

@Component
@Profile("quartz")
class SomeJob(
    private val someService: SomeService
) : QuartzJobBean() {
    private val log: Logger = LoggerFactory.getLogger(SomeJob::class.java)
    
    override fun executeInternal(jobExecutionContext: JobExecutionContext) {
        try {
            log.info("Doing awesome work...")
            someService.work()
            if ((1..10).random() >= 5) throw RuntimeException("Something went wrong...")
        } catch (e: Exception) {
            throw JobExecutionException(e)
        }
    }
}

这是 Quartz 配置(更多信息这里):

@Configuration
@Profile("quartz")
class JobConfig {
    //JobDetail for our job
    @Bean
    fun someJobDetail(): JobDetail {
        return JobBuilder
            .newJob(SomeJob::class.java).withIdentity("SomeJob")
            .withDescription("Some job")
            //If we want the job to be launched after the application instance crashes at the 
            //next launch
            .requestRecovery(true)
            .storeDurably().build()
    }

    //Trigger
    @Bean
    fun someJobTrigger(someJobDetail: JobDetail): Trigger {
        return TriggerBuilder.newTrigger().forJob(someJobDetail)
            .withIdentity("SomeJobTrigger")
            .withSchedule(CronScheduleBuilder.cronSchedule("0 0 4 L-1 * ? *"))
            .build()

    }

    //Otherwise, changing cron for an existing trigger will not work. (the old cron value will be stored in the database)
    @Bean
    fun scheduler(triggers: List<Trigger>, jobDetails: List<JobDetail>, factory: SchedulerFactoryBean): Scheduler {
        factory.setWaitForJobsToCompleteOnShutdown(true)
        val scheduler = factory.scheduler
        factory.setOverwriteExistingJobs(true)
        //https://stackoverflow.com/questions/39673572/spring-quartz-scheduler-race-condition
        factory.setTransactionManager(JdbcTransactionManager())
        rescheduleTriggers(triggers, scheduler)
        scheduler.start()
        return scheduler
    }

    private fun rescheduleTriggers(triggers: List<Trigger>, scheduler: Scheduler) {
        triggers.forEach {
            if (!scheduler.checkExists(it.key)) {
                scheduler.scheduleJob(it)
            } else {
                scheduler.rescheduleJob(it.key, it)
            }
        }
    }
}
    

为调度器添加一个监听器:

@Component
@Profile("quartz")
class JobListenerConfig(
    private val schedulerFactory: SchedulerFactoryBean,
    private val jobListener: JobListener
) {
    @PostConstruct
    fun addListener() {
        schedulerFactory.scheduler.listenerManager.addJobListener(jobListener, KeyMatcher.keyEquals(jobKey("SomeJob")))
    }
}

现在最重要的 - 处理我们的工作执行与监听器的逻辑:

@Profile("quartz")
class JobListener(
    //can be obtained from the execution context, but it can also be injected
    private val scheduler: Scheduler,
    private val triggers: List<Trigger>
): JobListenerSupport() {

    private lateinit var triggerCronMap: Map<String, String>

    @PostConstruct
    fun post(){
        //there will be no recovery triggers , only our self-written ones
        triggerCronMap = triggers.associate {
            it.key.name to (it as CronTrigger).cronExpression
        }
    }

    override fun getName(): String {
        return "myJobListener"
    }


    override fun jobToBeExecuted(context: JobExecutionContext) {
        log.info("Job: ${context.jobDetail.key.name} ready to start by trigger: ${context.trigger.key.name}")
    }


    override fun jobWasExecuted(context: JobExecutionContext, jobException: JobExecutionException?) {
        //you can use context.mergedJobDataMap
        val dataMap = context.trigger.jobDataMap
        val count = if (dataMap["count"] != null) dataMap.getIntValue("count") else {
            dataMap.putAsString("count", 1)
            1
        }
        //in the if block, you can add the condition && !context.trigger.key.name.startsWith("recover_") - in this case, the scheduler will not restart recover triggers if they fall during execution
        if (jobException != null ){
            if (count < 3) {
                log.warn("Job: ${context.jobDetail.key.name} filed while execution. Restart attempts count: $count ")
                val oldTrigger = context.trigger
                var newTriggerName = context.trigger.key.name + "_retry"
                //in case such a trigger already exists
                context.scheduler.getTriggersOfJob(context.jobDetail.key)
                    .map { it.key.name }
                    .takeIf { it.contains(newTriggerName) }
                    ?.apply { newTriggerName += "_retry" }
                val newTrigger = TriggerBuilder.newTrigger()
                    .forJob(context.jobDetail)
                    .withIdentity(newTriggerName, context.trigger.key.group)
                    //create a simple trigger that should be fired in 5 minutes * restart attempts
                    .startAt(Date.from(Instant.now().plus((5 * count).toLong(), ChronoUnit.MINUTES)))
                    .usingJobData("count", count + 1 )
                    .build()
                val date = scheduler.rescheduleJob(oldTrigger.key, newTrigger)
                log.info("Rescheduling trigger: ${oldTrigger.key} to trigger: ${newTrigger.key}")
            } else {
                log.warn("Reach max count of restarts: $count. Remove all triggers for job: ${context.trigger.jobKey.name} and schedule default trigger for it: ${context.trigger.key.name.split("_")[0]}")
                recheduleWithDefaultTrigger(context)
            }
        } else if (count > 1) {
            recheduleWithDefaultTrigger(context)
        }
        else {
            log.info("Job: ${context.jobDetail.key.name} completed successfully")
        }
        context.scheduler.getTriggersOfJob(context.trigger.jobKey).forEach {
            log.info("Trigger with key: ${it.key} for job: ${context.trigger.jobKey.name} will start at ${it.nextFireTime ?: it.startTime}")
        }
    }

    private fun recheduleWithDefaultTrigger(context: JobExecutionContext) {
        val clone = context.jobDetail.clone() as JobDetail
        val defaultTriggerName = context.trigger.key.name.split("_")[0]
        //Recovery triggers should not be rescheduled
        if (!triggerCronMap.contains(defaultTriggerName)) return
        scheduler.deleteJob(clone.key)
        scheduler.addJob(clone, true)
        scheduler.scheduleJob(
            TriggerBuilder.newTrigger()
                .forJob(clone)
                .withIdentity(defaultTriggerName)
                .withSchedule(CronScheduleBuilder.cronSchedule(triggerCronMap[defaultTriggerName]))
                .usingJobData("count", 1)
                .startAt(Date.from(Instant.now().plusSeconds(5)))
                .build()
        )
    }
}

最后但同样重要的是:application.yaml

spring:
  quartz:
    job-store-type: jdbc #Database Mode
    jdbc:
      initialize-schema: never #Do not initialize table structure
    properties:
      org:
        quartz:
          scheduler:
            instanceId: AUTO #Default hostname and timestamp generate instance ID, which can be any string, but must be the only corresponding qrtz_scheduler_state INSTANCE_NAME field for all dispatchers
            #instanceName: clusteredScheduler #quartzScheduler
          jobStore:
#            a few problems with the two properties below: https://github.com/spring-projects/spring-boot/issues/28758#issuecomment-974628989 & https://github.com/quartz-scheduler/quartz/issues/284
#            class: org.springframework.scheduling.quartz.LocalDataSourceJobStore #Persistence Configuration
            driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate #We only make database-specific proxies for databases
#            useProperties: true #Indicates that JDBC JobStore stores all values in JobDataMaps as strings, so more complex objects can be stored as name-value pairs rather than serialized in BLOB columns.In the long run, this is safer because you avoid serializing non-String classes to BLOB class versions.
            tablePrefix: scam_quartz.QRTZ_  #Database Table Prefix
            misfireThreshold: 60000 #The number of milliseconds the dispatcher will "tolerate" a Trigger to pass its next startup time before being considered a "fire".The default value (if you do not enter this property in the configuration) is 60000 (60 seconds).
            clusterCheckinInterval: 5000 #Set the frequency (in milliseconds) of this instance'checkin'* with other instances of the cluster.Affects the speed of detecting failed instances.
            isClustered: true #Turn on Clustering
          threadPool: #Connection Pool
            class: org.quartz.simpl.SimpleThreadPool
            threadCount: 3
            threadPriority: 1
            threadsInheritContextClassLoaderOfInitializingThread: true

这里数据库的官方脚本(使用liquibase或flyway)
更多信息:
关于石英
在集群模式下使用 quartz 的 spring boot
再来一篇
集群有效石英

© www.soinside.com 2019 - 2024. All rights reserved.