通过源码分析Java开源任务调度框架Quartz的主要流程

通过源码分析Java开源任务调度框架Quartz的主要流程 从使用效果、调用链路跟踪、E-R图、循环调度逻辑几个方面分析Quartz。 github项目地址: https://github.com/tanliwei/spring-quartz-cluster-sample , 补充了SQL输出 系统说明: IDE: IntelliJ JDK:1.8 Quartz:2.2.1 使用效果 1.从github项目https://github.com/tanliwei/spring-quartz-cluster-sample中,拉取项目到本地,导入IDEA。 相信读者都有一定工作经验,这些细节不赘述。 2.本文采用Mysql数据库。 请执行 resources/scripts/tables_mysql_innodb.sql 3.修改jdbc.properties中数据库配置 4.通过IDEA, Edit Configurations -> Add Tomcat Server, 部署到Tomcat 暴露的Restful 接口 /say-hello.do 以及添加好任务后的调用效果: 添加任务 在tomcat启动成功后,在首页点击“添加任务”,添加如下任务: 代码执行逻辑在SyncJobFactory类中,从Output中可以看到执行的输出信息, 调用链跟踪的最后会回到这个类来。 现在开始跟踪调用链路。 IDEA 快捷键: 进入方法: Ctrl + 鼠标左键 光标前进/后退: Ctrl + Shirt + 右方向键/左方向键 一、 调用链路跟踪 从配置文件applicationContext.xml配置中找到任务调度核心类SchedulerFactoryBean resources/applicationContext.xml 复制代码 ... 复制代码 使用IDEA快捷键,点击进入SchedulerFactoryBean类,它实现了InitializingBean接口, 在Spring中凡是实现了InitializingBean接口的Bean,都会在Bean属性都设置完成后调用afterPropertiesSet()方法. SchedulerFactoryBean.java 复制代码 //--------------------------------------------------------------------- // Implementation of InitializingBean interface // 实现 InitializingBean 接口 //--------------------------------------------------------------------- public void afterPropertiesSet() throws Exception { //... // Create SchedulerFactory instance. // 创建 SchedulerFactory 调度器工厂实例 SchedulerFactory schedulerFactory = (SchedulerFactory) BeanUtils.instantiateClass(this.schedulerFactoryClass); initSchedulerFactory(schedulerFactory); //... // Get Scheduler instance from SchedulerFactory. // 通过调度器工厂 获取 调度器实例 try { this.scheduler = createScheduler(schedulerFactory, this.schedulerName); //... } 复制代码 SchedulerFactoryBean.java 复制代码 /** * Create the Scheduler instance for the given factory and scheduler name. * 通过制定工厂和调度器名称创建调度器实例 * Called by {@link #afterPropertiesSet}. *

The default implementation invokes SchedulerFactory's getScheduler * method. Can be overridden for custom Scheduler creation. */ protected Scheduler createScheduler(SchedulerFactory schedulerFactory, String schedulerName) throws SchedulerException { //... try { SchedulerRepository repository = SchedulerRepository.getInstance(); synchronized (repository) { Scheduler existingScheduler = (schedulerName != null ? repository.lookup(schedulerName) : null); Scheduler newScheduler = schedulerFactory.getScheduler(); if (newScheduler == existingScheduler) { throw new IllegalStateException("Active Scheduler of name '" + schedulerName + "' already registered " + "in Quartz SchedulerRepository. Cannot create a new Spring-managed Scheduler of the same name!"); } //... } 复制代码 这个项目走的逻辑是 StdSchedulerFactory.getScheduler()方法,可自行debug。 StdSchedulerFactory.java 复制代码 /** * Returns a handle to the Scheduler produced by this factory. * 返回该工厂创造的调度器的句柄 */ public Scheduler getScheduler() throws SchedulerException { if (cfg == null) { initialize(); } SchedulerRepository schedRep = SchedulerRepository.getInstance(); Scheduler sched = schedRep.lookup(getSchedulerName()); //... sched = instantiate(); return sched; } 复制代码 StdSchedulerFactory.java 复制代码 private Scheduler instantiate() throws SchedulerException { //... //大量的配置初始化、实例化代码 //... //第1298行代码 qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry); //... } 复制代码 QuartzScheduler.java 复制代码 /** * Create a QuartzScheduler with the given configuration * 根据给定的配置 创建Quartz调度器 */ public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval) throws SchedulerException { this.resources = resources; if (resources.getJobStore() instanceof JobListener) { addInternalJobListener((JobListener)resources.getJobStore()); } //private QuartzSchedulerThread schedThread; this.schedThread = new QuartzSchedulerThread(this, resources); ThreadExecutor schedThreadExecutor = resources.getThreadExecutor(); //通过线程池执行 Quartz调度器线程 schedThreadExecutor.execute(this.schedThread); //... } 复制代码 QuartzSchedulerThread.java 复制代码 /** *

* The main processing loop of the QuartzSchedulerThread. * Quartz调度器线程的主循环逻辑 *

*/ @Override public void run() { //while循环执行,只要调度器为被暂停 while(!halted.get()){ JobRunShell shell = null; try { shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle); shell.initialize(qs); } if (qsRsrcs.getThreadPool().runInThread(shell) == false){} } } 复制代码 JobRunShell.java 复制代码 public void run() { //... Job job = jec.getJobInstance(); //... try { log.debug("Calling execute on job " + jobDetail.getKey()); //执行 job.execute(jec); endTime = System.currentTimeMillis(); } //... //更新Trigger触发器状态,删除FIRED_TRIGGERS触发记录 instCode = trigger.executionComplete(jec, jobExEx); //... } 复制代码 QuartzJobBean.java 复制代码 /** * This implementation applies the passed-in job data map as bean property * values, and delegates to executeInternal afterwards. * 这个实现 把传入的map数据作为bean属性值,然后委托给 executeInternal 方法 */ public final void execute(JobExecutionContext context) throws JobExecutionException { try { //执行 executeInternal(context); } 复制代码 SyncJobFactory.java 复制代码 //回到了我们的业务类SyncJobFactory的executeInternal方法, //里面执行我们的业务代码 protected void executeInternal(JobExecutionContext context) throws JobExecutionException { try { LOG.info("SyncJobFactory execute" + IPAddressKowalski.getIpAddressAndPort() + " port:"+IPAddressKowalski.getTomcatPort()); } //... System.out.println("jobName:" + scheduleJob.getJobName() + " " + scheduleJob); //... } 复制代码 二、E-R图 梳理6张主要的Quartz表: QRTZ_TRIGGERS 触发器表 SCHED_NAME,调度器名称,集群时为常量值:“ClusterScheduler”。 联合主键,QRTZ_JOB_DETAILS表SCHED_NAME外键 JOB_NAME,任务名。自定义值。 联合主键,QRTZ_JOB_DETAILS表JOB_NAME外键 JOB_GROUP,任务组。 自定义值。联合主键,QRTZ_JOB_DETAILS表JOB_GROUP外键 TRIGGER_STATE,触发器状态: WAITING , ACQUIRED, BLOCKING NEXT_FIRE_TIME, 下次触发时间: MISFIRE_INSTR,执行失败后的指令, 非失败策略 MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY = -1; 失败策略 MISFIRE_INSTRUCTION_SMART_POLICY = 0; TRIGGER_TYPE, 触发器类型,例如CRON,cron表达式类型的触发器 PRIORITY,优先级 QRTZ_CRON_TRIGGERS cron类型触发器表 SCHED_NAME,调度器名称,集群时为常量值:“ClusterScheduler”。 联合主键,QRTZ_TRIGGERS表SCHED_NAME外键 JOB_NAME,任务名。自定义值。 联合主键,QRTZ_TRIGGERS表JOB_NAME外键 JOB_GROUP,任务组。 自定义值。联合主键,QRTZ_TRIGGERS表JOB_GROUP外键 CRON_EXPRESSION, cron表达式, 例如每30秒执行一次, 0/30 * * * * ? QRTZ_JOB_DETAILS 任务详细表 SCHED_NAME,调度器名称,集群时为常量值:“ClusterScheduler”。联合主键 JOB_NAME,任务名。自定义值。 联合主键 JOB_GROUP,任务组。 自定义值。联合主键 JOB_DATA,blob类型,任务参数 QRTZ_FIRED_TRIGGERS 任务触发表 SCHED_NAME,调度器名称,集群时为常量值:“ClusterScheduler”。联合主键 ENTRY_ID,entry id,联合主键 JOB_NAME,任务名。自定义值。 JOB_GROUP,任务组。 自定义值。 FIRED_TIME, 任务触发时间 STATE,状态 INSTANCE_NAME, 服务器实例名 PRIORITY,优先级 QRTZ_SCHEDULER_STATE SCHED_NAME,调度器名称,集群时为常量值:“ClusterScheduler”。联合主键 INSTANCE_NAME,服务器实例名。联合主键 LAST_CHECKIN_TIME,上次检查时间 CHECKIN_INTERVAL,检查间隔 QRTZ_LOCKS 全局锁 SCHED_NAME,调度器名称,集群时为常量值:“ClusterScheduler”。联合主键 LOCK_NAME,锁名称,例如,TRIGGER_ACCESS。联合主键 三、循环调度逻辑 主要流程如下: 源码如下: QuartzSchedulerThread.java 复制代码 public void run() { //... while (!halted.get()) { try { //合理休眠 //... //获取接下来的触发器 //1.状态为WAITING //2.触发时间在30秒内 //3.不是错过执行的或者错过了但是时间不超过两分钟 triggers = qsRsrcs.getJobStore().acquireNextTriggers( now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow()); //... //触发任务 List res = qsRsrcs.getJobStore().triggersFired(triggers); //... JobRunShell shell = null; //... //执行代码 if (qsRsrcs.getThreadPool().runInThread(shell) == false) { //... } // while (!halted) //.. } 复制代码 JobRunShell.java 复制代码 protected QuartzScheduler qs = null; public void run() { qs.addInternalSchedulerListener(this); try { //... do { Job job = jec.getJobInstance(); // execute the job try { //执行任务代码 job.execute(jec); //更新触发器,删除触发记录 qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode); break; } while (true); } //... } 复制代码 四、扩展 除了对主线程 QuartzSchedulerThread 的分析 继续分析JobStoreSupport类的两个线程 ClusterManager 和 MisfireHandler 的分析, 它们维护触发器的MISFIRE_INSTR状态,和调度器状态https://www.cnblogs.com/tanliwei/p/10020787.html
50000+
5万行代码练就真实本领
17年
创办于2008年老牌培训机构
1000+
合作企业
98%
就业率

联系我们

电话咨询

0532-85025005

扫码添加微信