The default implementation invokes SchedulerFactory's getScheduler
* method. Can be overridden for custom Scheduler creation.
*/
protected Scheduler createScheduler(SchedulerFactory schedulerFactory, String schedulerName)
throws SchedulerException {
//...
try {
SchedulerRepository repository = SchedulerRepository.getInstance();
synchronized (repository) {
Scheduler existingScheduler = (schedulerName != null ? repository.lookup(schedulerName) : null);
Scheduler newScheduler = schedulerFactory.getScheduler();
if (newScheduler == existingScheduler) {
throw new IllegalStateException("Active Scheduler of name '" + schedulerName + "' already registered " +
"in Quartz SchedulerRepository. Cannot create a new Spring-managed Scheduler of the same name!");
}
//...
}
复制代码
这个项目走的逻辑是 StdSchedulerFactory.getScheduler()方法,可自行debug。
StdSchedulerFactory.java
复制代码
/**
* Returns a handle to the Scheduler produced by this factory.
* 返回该工厂创造的调度器的句柄
*/
public Scheduler getScheduler() throws SchedulerException {
if (cfg == null) {
initialize();
}
SchedulerRepository schedRep = SchedulerRepository.getInstance();
Scheduler sched = schedRep.lookup(getSchedulerName());
//...
sched = instantiate();
return sched;
}
复制代码
StdSchedulerFactory.java
复制代码
private Scheduler instantiate() throws SchedulerException {
//...
//大量的配置初始化、实例化代码
//...
//第1298行代码
qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);
//...
}
复制代码
QuartzScheduler.java
复制代码
/**
* Create a QuartzScheduler with the given configuration
* 根据给定的配置 创建Quartz调度器
*/
public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
throws SchedulerException {
this.resources = resources;
if (resources.getJobStore() instanceof JobListener) {
addInternalJobListener((JobListener)resources.getJobStore());
}
//private QuartzSchedulerThread schedThread;
this.schedThread = new QuartzSchedulerThread(this, resources);
ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
//通过线程池执行 Quartz调度器线程
schedThreadExecutor.execute(this.schedThread);
//...
}
复制代码
QuartzSchedulerThread.java
复制代码
/**
*
* The main processing loop of the QuartzSchedulerThread.
* Quartz调度器线程的主循环逻辑
*
executeInternal afterwards.
* 这个实现 把传入的map数据作为bean属性值,然后委托给 executeInternal 方法
*/
public final void execute(JobExecutionContext context) throws JobExecutionException {
try {
//执行
executeInternal(context);
}
复制代码
SyncJobFactory.java
复制代码
//回到了我们的业务类SyncJobFactory的executeInternal方法,
//里面执行我们的业务代码
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
try {
LOG.info("SyncJobFactory execute" + IPAddressKowalski.getIpAddressAndPort() + " port:"+IPAddressKowalski.getTomcatPort());
}
//...
System.out.println("jobName:" + scheduleJob.getJobName() + " " + scheduleJob);
//...
}
复制代码
二、E-R图
梳理6张主要的Quartz表:
QRTZ_TRIGGERS 触发器表
SCHED_NAME,调度器名称,集群时为常量值:“ClusterScheduler”。 联合主键,QRTZ_JOB_DETAILS表SCHED_NAME外键
JOB_NAME,任务名。自定义值。 联合主键,QRTZ_JOB_DETAILS表JOB_NAME外键
JOB_GROUP,任务组。 自定义值。联合主键,QRTZ_JOB_DETAILS表JOB_GROUP外键
TRIGGER_STATE,触发器状态: WAITING , ACQUIRED, BLOCKING
NEXT_FIRE_TIME, 下次触发时间:
MISFIRE_INSTR,执行失败后的指令,
非失败策略 MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY = -1;
失败策略 MISFIRE_INSTRUCTION_SMART_POLICY = 0;
TRIGGER_TYPE, 触发器类型,例如CRON,cron表达式类型的触发器
PRIORITY,优先级
QRTZ_CRON_TRIGGERS cron类型触发器表
SCHED_NAME,调度器名称,集群时为常量值:“ClusterScheduler”。 联合主键,QRTZ_TRIGGERS表SCHED_NAME外键
JOB_NAME,任务名。自定义值。 联合主键,QRTZ_TRIGGERS表JOB_NAME外键
JOB_GROUP,任务组。 自定义值。联合主键,QRTZ_TRIGGERS表JOB_GROUP外键
CRON_EXPRESSION, cron表达式, 例如每30秒执行一次, 0/30 * * * * ?
QRTZ_JOB_DETAILS 任务详细表
SCHED_NAME,调度器名称,集群时为常量值:“ClusterScheduler”。联合主键
JOB_NAME,任务名。自定义值。 联合主键
JOB_GROUP,任务组。 自定义值。联合主键
JOB_DATA,blob类型,任务参数
QRTZ_FIRED_TRIGGERS 任务触发表
SCHED_NAME,调度器名称,集群时为常量值:“ClusterScheduler”。联合主键
ENTRY_ID,entry id,联合主键
JOB_NAME,任务名。自定义值。
JOB_GROUP,任务组。 自定义值。
FIRED_TIME, 任务触发时间
STATE,状态
INSTANCE_NAME, 服务器实例名
PRIORITY,优先级
QRTZ_SCHEDULER_STATE
SCHED_NAME,调度器名称,集群时为常量值:“ClusterScheduler”。联合主键
INSTANCE_NAME,服务器实例名。联合主键
LAST_CHECKIN_TIME,上次检查时间
CHECKIN_INTERVAL,检查间隔
QRTZ_LOCKS 全局锁
SCHED_NAME,调度器名称,集群时为常量值:“ClusterScheduler”。联合主键
LOCK_NAME,锁名称,例如,TRIGGER_ACCESS。联合主键
三、循环调度逻辑
主要流程如下:
源码如下:
QuartzSchedulerThread.java
复制代码
public void run() {
//...
while (!halted.get()) {
try {
//合理休眠
//...
//获取接下来的触发器
//1.状态为WAITING
//2.触发时间在30秒内
//3.不是错过执行的或者错过了但是时间不超过两分钟
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
//...
//触发任务
List