分析Java延迟与周期任务的实现原理

Java并发编程源码分析系列:

ScheduledFutureTask继承了我们熟悉的FutureTask,这个不用多说。图1是它实现的接口,比较陌生的是Delayed,而Delayed又继承了Comparable。

public long getDelay(TimeUnit unit) {         return unit.convert(time - now(), NANOSECONDS); }
public int compareTo(Delayed other) {     if (other == this) // compare zero if same object         return 0;     if (other instanceof ScheduledFutureTask) {         ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;         long diff = time - x.time;         if (diff < 0)             return -1;         else if (diff > 0)             return 1;         else if (sequenceNumber < x.sequenceNumber)             return -1;         else             return 1;     }     long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);     return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; }

这两个接口的存在很容易理解,ScheduledFutureTask在等待队列里调度不再按照FIFO,而是按照执行时间,谁即将执行,谁就排在前面。在这里也可以看到sequenceNumber的作用,当执行时间相同时,按照序号排序。

添加延迟任务

对ScheduledThreadPoolExecutor使用通用的execute或者submit提交任务,最终调用schedule方法,默认马上执行。如果需要延迟执行,需要直接使用schedule,传递时间参数。

public <V> ScheduledFuture<V> schedule(Callable<V> callable,                                        long delay,                                        TimeUnit unit) {     if (callable == null || unit == null)         throw new NullPointerException();     RunnableScheduledFuture<V> t = decorateTask(callable,         new ScheduledFutureTask<V>(callable,                                    triggerTime(delay, unit)));     delayedExecute(t);     return t; }

Runnable和Callable包装成ScheduledFutureTask实例,保存了延迟信息,然后执行delayedExecute。

private void delayedExecute(RunnableScheduledFuture<?> task) {     if (isShutdown())         reject(task);     else {         super.getQueue().add(task);         if (isShutdown() &&             !canRunInCurrentRunState(task.isPeriodic()) &&             remove(task))             task.cancel(false);         else             ensurePrestart();     } }  boolean canRunInCurrentRunState(boolean periodic) {     return isRunningOrShutdown(periodic ?                                continueExistingPeriodicTasksAfterShutdown :                                executeExistingDelayedTasksAfterShutdown); }

如果线程池已经关闭,直接调用饱和策略,否则将任务加入等待队列。加入之后,需要再判断线程池的状态,和当前任务是否能运行。如果不能继续执行,将任务移出队列并取消任务。

canRunInCurrentRunState处理任务加入等待队列后,又未执行就发生线程池关闭的情况,它通过预设的两个变量判断任务到底能不能执行。

void ensurePrestart() {     int wc = workerCountOf(ctl.get());     if (wc < corePoolSize)         addWorker(null, true);     else if (wc == 0)         addWorker(null, false); }

最后调用到ensurePrestart,使用addWorkder增加工作线程,这在ThreadPoolExecutor解释过了

添加周期任务

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,                                              long initialDelay,long period,TimeUnit unit) {    if (command == null || unit == null)        throw new NullPointerException();    if (period <= 0)        throw new IllegalArgumentException();    ScheduledFutureTask<Void> sft =        new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));    RunnableScheduledFuture<Void> t = decorateTask(command, sft);    sft.outerTask = t;    delayedExecute(t);    return t; }  public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,                                                 long delay,TimeUnit unit) {    if (command == null || unit == null)        throw new NullPointerException();    if (delay <= 0)        throw new IllegalArgumentException();    ScheduledFutureTask<Void> sft =        new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),                                      unit.toNanos(-delay));    RunnableScheduledFuture<Void> t = decorateTask(command, sft);    sft.outerTask = t;    delayedExecute(t);    return t; }

执行周期任务有上面两个方法,具体作用方法名写得很清楚:

两个方法几乎一样,不同的是构建ScheduledFutureTask时,period一个传正数,另一个传负数。不用怀疑,区分两种情况就是用正负。

等待队列

线程池的等待队列使用了内部类DelayedWorkQueue,和普通线程池等待队列最大的不同是它的任务是按照目标执行时间进行排序。

入队的offer被重写了,add和put方法也是调用offer,具体BlockingQueue的实现逻辑不在这里讨论,重点是看offer里的siftUp方法。

private void siftUp(int k, RunnableScheduledFuture<?> key) {     while (k > 0) {         int parent = (k - 1) >>> 1;         RunnableScheduledFuture<?> e = queue[parent];         if (key.compareTo(e) >= 0)             break;         queue[k] = e;         setIndex(e, k);         k = parent;     }     queue[k] = key;     setIndex(key, k); }

siftUp根据任务的compareTo,将任务移动到队列中指定的位置,就是这样。

对应地,出队take方法,根据任务的delay时间,小于等于0时将任务出队,否则等待。

任务执行

当线程池从等待队列取出一个任务时,会执行它的run方法。

public void run() {     boolean periodic = isPeriodic();     if (!canRunInCurrentRunState(periodic))         cancel(false);     else if (!periodic)         ScheduledFutureTask.super.run();     else if (ScheduledFutureTask.super.runAndReset()) {         setNextRunTime();         reExecutePeriodic(outerTask);     } }

方法有三个分支,第一个if判断任务在当前线程池状态下是否能执行,canRunInCurrentRunState已经讲解过。第二个if是判断是否周期任务,不是的话直接执行,不需要多余的操作。重点来看第三个if,也就是周期执行任务。

  1. runAndReset:任务执行完重置为初始状态,等待下一次执行;
  2. setNextRunTime:计算下次执行时间;
  3. reExecutePeriodic:再调度任务。
private void setNextRunTime() {     long p = period;     if (p > 0)         time += p;     else         time = triggerTime(-p); }

计算下次执行时间,period根据正负有不同的计算逻辑,负的时间也会先改正,很明显对应上文的scheduleAtFixedRate和scheduleWithFixedDelay两个方法。

void reExecutePeriodic(RunnableScheduledFuture<?> task) {     if (canRunInCurrentRunState(true)) {         super.getQueue().add(task);         if (!canRunInCurrentRunState(true) && remove(task))             task.cancel(false);         else             ensurePrestart();     } }

将任务重新加入等待队列,中间几个方法都解释过了。

Timer的缺陷

自从知道ScheduledThreadPoolExecutor,再没有使用Timer,因为它有几个缺陷:

private final TaskQueue queue = new TaskQueue(); private final TimerThread thread = new TimerThread(queue);

Timer的代码很简单,主要数据结构是一个任务队列和一个执行线程。新增的任务会加入任务队列,到达时间后,由执行线程执行。只有一个线程,很容易理解上面讲的缺陷。

ScheduledThreadPoolExecutor每个任务都有对应的执行线程,时间使用相对时间计算,也就没有上面的缺陷,所以没有理由使用Timer了。

50000+
5万行代码练就真实本领
17年
创办于2008年老牌培训机构
1000+
合作企业
98%
就业率

联系我们

电话咨询

0532-85025005

扫码添加微信