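I'm fairly new to Quartz. Is there a way to update the execution interval of a job that has already been submitted to Quartz? Does the new interval take effect immediately? Do I have to start the job again after rescheduling it?
I found the following link, but I don't know which libraries the code refers to, because my Quartz jar does not contain some of the classes used there. Also, where does the triggerKey method come from? Is it some kind of static import?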
http://www.quartz-scheduler.org/documentation/2.4.0-SNAPSHOT/cookbook/UpdateTrigger.html
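In one of my JUnit test cases I want to set the job execution interval to a very large number, because I don't want the job to interfere with the state of the class under test. Once the test case has finished, I want to reset the interval to the actual value that will be used in production.
You have to reschedule the job by creating a new trigger.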
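public void execute(JobExecutionContext context) throws JobExecutionException {
    Trigger newTrigger = what_ever_you_want; // placeholder: build the replacement trigger here
    Trigger oldTrigger = context.getTrigger();
    try {
        Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
        // Swap the old trigger for the new one; the job itself stays registered
        scheduler.rescheduleJob(oldTrigger.getKey(), newTrigger);
    } catch (SchedulerException e) {
        throw new JobExecutionException(e);
    }
}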
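This keeps the same job but replaces its trigger, so the job fires according to the new trigger's schedule.
There may be a static triggerKey() method somewhere in the Quartz library. However, I managed to reschedule an existing Quartz job (using Quartz 2.3.2) without that (potential) method, using the TriggerKey class instead, as follows: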
boolean updateExisting = true; // try it also with false
int aveId = 1234; // change it as you wish
java.util.Date closeDate = new java.util.Date(); // change it as you wish
SchedulerFactory sf = new StdSchedulerFactory("... /quartz_priority.properties");
Scheduler scheduler = sf.getScheduler();
TriggerKey triggerKey = new TriggerKey("trigger" + aveId, "group1");
if (updateExisting) {
    // Rebuild the existing trigger with a new start time and swap it in
    Trigger oldTrigger = scheduler.getTrigger(triggerKey);
    TriggerBuilder oldTriggerBuilder = oldTrigger.getTriggerBuilder();
    Trigger newTrigger = oldTriggerBuilder.startAt(closeDate).build();
    scheduler.rescheduleJob(triggerKey, newTrigger);
} else {
    Trigger trigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).startAt(closeDate).build();
    // Define job instance
    JobDetail job1 = JobBuilder.newJob(<YOUR_JOB_CLASS_NAME>.class).withIdentity("job" + aveId, "group1").build();
    JobDataMap map = job1.getJobDataMap();
    map.put(<PARAMETER_NAME>, aveId);
    // Schedule the job with the trigger
    scheduler.scheduleJob(job1, trigger);
}
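Below is an example of a multi-instance Spring Boot application that launches a cron job.
The job must run on only one of the instances.
The configuration must be the same on every instance.
If the job crashes, it should be restarted up to 3 times, with a delay of 5 minutes * the number of restart attempts.
If the job still crashes after 3 restarts, the job's trigger should be reset to its default cron.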
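The retry rules above can be sketched without Quartz as a small decision function. This is only an illustration under some assumptions: `RetryPolicy`, `Action`, `decide` and `delay` are hypothetical names, and the attempt counter starts at 1 and retries while it is below 3, which mirrors the listener shown later in this answer.

```java
import java.time.Duration;

// Hypothetical helper, not part of Quartz: it only models the retry rules.
final class RetryPolicy {
    static final int MAX_ATTEMPTS = 3;
    static final Duration BASE_DELAY = Duration.ofMinutes(5);

    enum Action { RETRY, RESET_TO_DEFAULT_CRON, NONE }

    // attempt is 1 for a run started by the regular cron trigger.
    static Action decide(boolean failed, int attempt) {
        if (failed) {
            return attempt < MAX_ATTEMPTS ? Action.RETRY : Action.RESET_TO_DEFAULT_CRON;
        }
        // A successful retry run also goes back to the default cron trigger.
        return attempt > 1 ? Action.RESET_TO_DEFAULT_CRON : Action.NONE;
    }

    // Delay before the next retry: 5 minutes multiplied by the attempt number.
    static Duration delay(int attempt) {
        return BASE_DELAY.multipliedBy(attempt);
    }

    public static void main(String[] args) {
        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
            System.out.println(attempt + " -> " + decide(true, attempt)
                    + ", delay " + delay(attempt).toMinutes() + " min");
        }
    }
}
```

Keeping the decision separate from the scheduler calls makes the policy easy to unit-test before wiring it into a listener.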
We will use Quartz in cluster mode:
Dependencies:
implementation("org.springframework.boot:spring-boot-starter-quartz")
The job:
@Component
@Profile("quartz")
class SomeJob(
    private val someService: SomeService
) : QuartzJobBean() {
    private val log: Logger = LoggerFactory.getLogger(SomeJob::class.java)

    override fun executeInternal(jobExecutionContext: JobExecutionContext) {
        try {
            log.info("Doing awesome work...")
            someService.work()
            // Simulate a random failure so the retry logic can be observed
            if ((1..10).random() >= 5) throw RuntimeException("Something went wrong...")
        } catch (e: Exception) {
            throw JobExecutionException(e)
        }
    }
}
Here is the Quartz configuration:
@Configuration
@Profile("quartz")
class JobConfig {
    // JobDetail for our job
    @Bean
    fun someJobDetail(): JobDetail {
        return JobBuilder
            .newJob(SomeJob::class.java).withIdentity("SomeJob")
            .withDescription("Some job")
            // Re-run the job on the next start if the application instance crashed while executing it
            .requestRecovery(true)
            .storeDurably().build()
    }

    // Trigger
    @Bean
    fun someJobTrigger(someJobDetail: JobDetail): Trigger {
        return TriggerBuilder.newTrigger().forJob(someJobDetail)
            .withIdentity("SomeJobTrigger")
            .withSchedule(CronScheduleBuilder.cronSchedule("0 0 4 L-1 * ? *"))
            .build()
    }

    // Triggers are rescheduled explicitly; otherwise changing the cron of an existing trigger
    // will not work (the old cron value stays stored in the database)
    @Bean
    fun scheduler(triggers: List<Trigger>, jobDetails: List<JobDetail>, factory: SchedulerFactoryBean): Scheduler {
        factory.setWaitForJobsToCompleteOnShutdown(true)
        val scheduler = factory.scheduler
        factory.setOverwriteExistingJobs(true)
        // https://stackoverflow.com/questions/39673572/spring-quartz-scheduler-race-condition
        factory.setTransactionManager(JdbcTransactionManager())
        rescheduleTriggers(triggers, scheduler)
        scheduler.start()
        return scheduler
    }

    private fun rescheduleTriggers(triggers: List<Trigger>, scheduler: Scheduler) {
        triggers.forEach {
            if (!scheduler.checkExists(it.key)) {
                scheduler.scheduleJob(it)
            } else {
                scheduler.rescheduleJob(it.key, it)
            }
        }
    }
}
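A note on the cron expression used for the trigger, "0 0 4 L-1 * ? *": as I read Quartz's cron syntax, `L-1` in the day-of-month field is an offset from the last day of the month, so the job should fire at 04:00 on the day before the last day of each month. The sketch below only previews that date with java.time. It does not call Quartz, and `CronPreview` and `fireTime` are hypothetical names, so treat it as a sanity check on that reading and not as a statement of what Quartz will do.

```java
import java.time.LocalDateTime;
import java.time.YearMonth;

// Hypothetical helper: previews the assumed fire time of "0 0 4 L-1 * ? *".
final class CronPreview {
    // 04:00 on the day before the last day of the given month.
    static LocalDateTime fireTime(YearMonth month) {
        return month.atEndOfMonth().minusDays(1).atTime(4, 0);
    }

    public static void main(String[] args) {
        System.out.println(fireTime(YearMonth.of(2024, 2)));
        System.out.println(fireTime(YearMonth.of(2023, 12)));
    }
}
```

To confirm against the real scheduler, log the trigger's next fire time after scheduling and compare it with this preview.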
Add a listener to the scheduler:
@Component
@Profile("quartz")
class JobListenerConfig(
    private val schedulerFactory: SchedulerFactoryBean,
    private val jobListener: JobListener
) {
    @PostConstruct
    fun addListener() {
        // Only react to executions of the job with the key "SomeJob"
        schedulerFactory.scheduler.listenerManager.addJobListener(jobListener, KeyMatcher.keyEquals(jobKey("SomeJob")))
    }
}
Now the most important part: the logic that handles our job's execution in the listener:
@Component
@Profile("quartz")
class JobListener(
    // The scheduler can also be obtained from the execution context instead of being injected
    private val scheduler: Scheduler,
    private val triggers: List<Trigger>
) : JobListenerSupport() {
    private lateinit var triggerCronMap: Map<String, String>

    @PostConstruct
    fun post() {
        // Only our own cron triggers are in this list; recovery triggers never are
        triggerCronMap = triggers.associate {
            it.key.name to (it as CronTrigger).cronExpression
        }
    }

    override fun getName(): String {
        return "myJobListener"
    }

    override fun jobToBeExecuted(context: JobExecutionContext) {
        log.info("Job: ${context.jobDetail.key.name} ready to start by trigger: ${context.trigger.key.name}")
    }
    override fun jobWasExecuted(context: JobExecutionContext, jobException: JobExecutionException?) {
        // context.mergedJobDataMap can be used here as well
        val dataMap = context.trigger.jobDataMap
        val count = if (dataMap["count"] != null) dataMap.getIntValue("count") else {
            dataMap.putAsString("count", 1)
            1
        }
        // Add `&& !context.trigger.key.name.startsWith("recover_")` to the condition below
        // if recovery triggers that fail during execution should not be restarted
        if (jobException != null) {
            if (count < 3) {
                log.warn("Job: ${context.jobDetail.key.name} failed during execution. Restart attempts count: $count")
                val oldTrigger = context.trigger
                var newTriggerName = context.trigger.key.name + "_retry"
                // In case such a trigger already exists
                context.scheduler.getTriggersOfJob(context.jobDetail.key)
                    .map { it.key.name }
                    .takeIf { it.contains(newTriggerName) }
                    ?.apply { newTriggerName += "_retry" }
                val newTrigger = TriggerBuilder.newTrigger()
                    .forJob(context.jobDetail)
                    .withIdentity(newTriggerName, context.trigger.key.group)
                    // Simple trigger that fires in 5 minutes * restart attempts
                    .startAt(Date.from(Instant.now().plus((5 * count).toLong(), ChronoUnit.MINUTES)))
                    .usingJobData("count", count + 1)
                    .build()
                scheduler.rescheduleJob(oldTrigger.key, newTrigger)
                log.info("Rescheduling trigger: ${oldTrigger.key} to trigger: ${newTrigger.key}")
            } else {
                log.warn("Reached max restart count: $count. Remove all triggers for job: ${context.trigger.jobKey.name} and schedule default trigger for it: ${context.trigger.key.name.split("_")[0]}")
                recheduleWithDefaultTrigger(context)
            }
        } else if (count > 1) {
            // A retry run succeeded, so go back to the default cron trigger
            recheduleWithDefaultTrigger(context)
        } else {
            log.info("Job: ${context.jobDetail.key.name} completed successfully")
        }
        context.scheduler.getTriggersOfJob(context.trigger.jobKey).forEach {
            log.info("Trigger with key: ${it.key} for job: ${context.trigger.jobKey.name} will start at ${it.nextFireTime ?: it.startTime}")
        }
    }
    private fun recheduleWithDefaultTrigger(context: JobExecutionContext) {
        val clone = context.jobDetail.clone() as JobDetail
        val defaultTriggerName = context.trigger.key.name.split("_")[0]
        // Recovery triggers should not be rescheduled
        if (!triggerCronMap.contains(defaultTriggerName)) return
        // Deleting the job also removes all of its triggers
        scheduler.deleteJob(clone.key)
        scheduler.addJob(clone, true)
        scheduler.scheduleJob(
            TriggerBuilder.newTrigger()
                .forJob(clone)
                .withIdentity(defaultTriggerName)
                .withSchedule(CronScheduleBuilder.cronSchedule(triggerCronMap[defaultTriggerName]))
                .usingJobData("count", 1)
                .startAt(Date.from(Instant.now().plusSeconds(5)))
                .build()
        )
    }
}
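The listener relies on a naming convention: retry triggers get a "_retry" suffix, and the default trigger name is recovered with `split("_")[0]`. Here is a minimal Quartz-free sketch of that convention, assuming the same rules as the listener above. `TriggerNames`, `retryName` and `defaultName` are hypothetical names used only for illustration.

```java
import java.util.Collection;
import java.util.List;

// Hypothetical helper that mirrors the trigger naming used by the listener.
final class TriggerNames {
    // Appends "_retry", and once more if that name is already taken.
    static String retryName(String current, Collection<String> existing) {
        String candidate = current + "_retry";
        return existing.contains(candidate) ? candidate + "_retry" : candidate;
    }

    // Everything before the first underscore is treated as the default trigger name.
    static String defaultName(String triggerName) {
        return triggerName.split("_")[0];
    }

    public static void main(String[] args) {
        String first = retryName("SomeJobTrigger", List.of("SomeJobTrigger"));
        System.out.println(first);
        System.out.println(defaultName(first));
        // A base name containing an underscore is cut short by this convention.
        System.out.println(defaultName("some_job_trigger_retry"));
    }
}
```

Two consequences follow from this convention. A trigger whose name starts with "recover_" resolves to "recover", which is not in the cron map, so it is skipped. And the default trigger name itself must not contain an underscore, otherwise it cannot be found in the map either.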
Last but not least: application.yaml
spring:
  quartz:
    job-store-type: jdbc # Database mode
    jdbc:
      initialize-schema: never # Do not initialize the table structure
    properties:
      org:
        quartz:
          scheduler:
            instanceId: AUTO # By default generated from the hostname and a timestamp. It can be any string, but it must be unique for every scheduler in the cluster (it is stored in the INSTANCE_NAME column of qrtz_scheduler_state)
            #instanceName: clusteredScheduler #quartzScheduler
          jobStore:
            # a few problems with the two properties below: https://github.com/spring-projects/spring-boot/issues/28758#issuecomment-974628989 & https://github.com/quartz-scheduler/quartz/issues/284
            # class: org.springframework.scheduling.quartz.LocalDataSourceJobStore # Persistence configuration
            driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate # Database-specific delegate (PostgreSQL here)
            # useProperties: true # JDBC JobStore stores all JobDataMap values as strings, so data is kept as name-value pairs instead of being serialized into BLOB columns. This is safer in the long run because it avoids class-versioning problems when serializing non-String classes into BLOBs.
            tablePrefix: scam_quartz.QRTZ_ # Database table prefix
            misfireThreshold: 60000 # Number of milliseconds the scheduler tolerates a trigger passing its next fire time before it is considered misfired. The default (if the property is not set) is 60000 (60 seconds).
            clusterCheckinInterval: 5000 # How often (in milliseconds) this instance checks in with the other instances of the cluster. Affects how quickly failed instances are detected.
            isClustered: true # Turn on clustering
          threadPool: # Thread pool
            class: org.quartz.simpl.SimpleThreadPool
            threadCount: 3
            threadPriority: 1
            threadsInheritContextClassLoaderOfInitializingThread: true
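To make the misfireThreshold setting more concrete, here is a simplified sketch of the check it describes: a trigger counts as misfired once its scheduled fire time lies further in the past than the threshold. `MisfireCheck` and `isMisfired` are hypothetical names, and the real Quartz logic involves more than this comparison, so take it as an approximation of the idea only.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper: a simplified model of the misfire threshold.
final class MisfireCheck {
    // True when the scheduled fire time is older than now minus the threshold.
    static boolean isMisfired(Instant scheduledFireTime, Instant now, Duration threshold) {
        return scheduledFireTime.isBefore(now.minus(threshold));
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2024-01-01T04:02:00Z");
        Duration threshold = Duration.ofMillis(60000);
        // Two minutes late: beyond the 60-second tolerance.
        System.out.println(isMisfired(Instant.parse("2024-01-01T04:00:00Z"), now, threshold));
        // Thirty seconds late: still within the tolerance.
        System.out.println(isMisfired(Instant.parse("2024-01-01T04:01:30Z"), now, threshold));
    }
}
```

With all 3 worker threads busy, a trigger can easily be delayed past this tolerance, so the threshold and threadCount are worth tuning together.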
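The official database scripts are here (apply them with Liquibase or Flyway).
More information:
About Quartz
Spring Boot using Quartz in cluster mode
One more article
Cluster effectively Quartz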