分析Java延迟与周期任务的实现原理
Java并发编程源码分析系列:
ScheduledFutureTask继承了我们熟悉的FutureTask,这个不用多说。图1是它实现的接口,比较陌生的是Delayed,而Delayed又继承了Comparable。
public long getDelay(TimeUnit unit) { return unit.convert(time - now(), NANOSECONDS); }
public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; }
添加延迟任务
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { if (callable == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<V> t = decorateTask(callable, new ScheduledFutureTask<V>(callable, triggerTime(delay, unit))); delayedExecute(t); return t; }
Runnable和Callable包装成ScheduledFutureTask实例,保存了延迟信息,然后执行delayedExecute。
private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); else { super.getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else ensurePrestart(); } } boolean canRunInCurrentRunState(boolean periodic) { return isRunningOrShutdown(periodic ? continueExistingPeriodicTasksAfterShutdown : executeExistingDelayedTasksAfterShutdown); }
如果线程池已经关闭,直接调用饱和策略,否则将任务加入等待队列。加入之后,需要再判断线程池的状态,和当前任务是否能运行。如果不能继续执行,将任务移出队列并取消任务。
canRunInCurrentRunState处理任务加入等待队列后,又未执行就发生线程池关闭的情况,它通过预设的两个变量判断任务到底能不能执行。
void ensurePrestart() { int wc = workerCountOf(ctl.get()); if (wc < corePoolSize) addWorker(null, true); else if (wc == 0) addWorker(null, false); }
最后调用到ensurePrestart,使用addWorkder增加工作线程,这在ThreadPoolExecutor解释过了
添加周期任务
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,long period,TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; } public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay, long delay,TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit), unit.toNanos(-delay)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
两个方法几乎一样,不同的是构建ScheduledFutureTask时,period一个传正数,另一个传负数。不用怀疑,区分两种情况就是用正负。
等待队列
线程池的等待队列使用了内部类DelayedWorkQueue,和普通线程池等待队列最大的不同是它的任务是按照目标执行时间进行排序。
入队的offer被重写了,add和put方法也是调用offer,具体BlockingQueue的实现逻辑不在这里讨论,重点是看offer里的siftUp方法。
private void siftUp(int k, RunnableScheduledFuture<?> key) { while (k > 0) { int parent = (k - 1) >>> 1; RunnableScheduledFuture<?> e = queue[parent]; if (key.compareTo(e) >= 0) break; queue[k] = e; setIndex(e, k); k = parent; } queue[k] = key; setIndex(key, k); }
siftUp根据任务的compareTo,将任务移动到队列中指定的位置,就是这样。
对应地,出队take方法,根据任务的delay时间,小于等于0时将任务出队,否则等待。
任务执行
public void run() { boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic)) cancel(false); else if (!periodic) ScheduledFutureTask.super.run(); else if (ScheduledFutureTask.super.runAndReset()) { setNextRunTime(); reExecutePeriodic(outerTask); } }
private void setNextRunTime() { long p = period; if (p > 0) time += p; else time = triggerTime(-p); }
计算下次执行时间,period根据正负有不同的计算逻辑,负的时间也会先改正,很明显对应上文的scheduleAtFixedRate和scheduleWithFixedDelay两个方法。
void reExecutePeriodic(RunnableScheduledFuture<?> task) { if (canRunInCurrentRunState(true)) { super.getQueue().add(task); if (!canRunInCurrentRunState(true) && remove(task)) task.cancel(false); else ensurePrestart(); } }
Timer的缺陷
自从知道ScheduledThreadPoolExecutor,再没有使用Timer,因为它有几个缺陷:
private final TaskQueue queue = new TaskQueue(); private final TimerThread thread = new TimerThread(queue);
Timer的代码很简单,主要数据结构是一个任务队列和一个执行线程。新增的任务会加入任务队列,到达时间后,由执行线程执行。只有一个线程,很容易理解上面讲的缺陷。
ScheduledThreadPoolExecutor每个任务都有对应的执行线程,时间使用相对时间计算,也就没有上面的缺陷,所以没有理由使用Timer了。