(手機橫屏看源碼更方便)
注:java源碼分析部分如無特殊說明均基于 java8 版本。
注:本文基于ScheduledThreadPoolExecutor定時線程池類。
前面我們一起學(xué)習(xí)了普通任務(wù)、未來任務(wù)的執(zhí)行流程,今天我們再來學(xué)習(xí)一種新的任務(wù)——定時任務(wù)。
定時任務(wù)是我們經(jīng)常會用到的一種任務(wù),它表示在未來某個時刻執(zhí)行,或者未來按照某種規(guī)則重復(fù)執(zhí)行的任務(wù)。
(1)如何保證任務(wù)是在未來某個時刻才被執(zhí)行?
(2)如何保證任務(wù)按照某種規(guī)則重復(fù)執(zhí)行?
創(chuàng)建一個定時線程池,用它來跑四種不同的定時任務(wù)。
public class ThreadPoolTest03 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 創(chuàng)建一個定時線程池
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(5);
System.out.println("start: " + System.currentTimeMillis());
// 執(zhí)行一個無返回值任務(wù),5秒后執(zhí)行,只執(zhí)行一次
scheduledThreadPoolExecutor.schedule(() -> {
System.out.println("spring: " + System.currentTimeMillis());
}, 5, TimeUnit.SECONDS);
// 執(zhí)行一個有返回值任務(wù),5秒后執(zhí)行,只執(zhí)行一次
ScheduledFuture<String> future = scheduledThreadPoolExecutor.schedule(() -> {
System.out.println("inner summer: " + System.currentTimeMillis());
return "outer summer: ";
}, 5, TimeUnit.SECONDS);
// 獲取返回值
System.out.println(future.get() + System.currentTimeMillis());
// 按固定頻率執(zhí)行一個任務(wù),每2秒執(zhí)行一次,1秒后執(zhí)行
// 任務(wù)開始時的2秒后
scheduledThreadPoolExecutor.scheduleAtFixedRate(() -> {
System.out.println("autumn: " + System.currentTimeMillis());
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
}, 1, 2, TimeUnit.SECONDS);
// 按固定延時執(zhí)行一個任務(wù),每延時2秒執(zhí)行一次,1秒執(zhí)行
// 任務(wù)結(jié)束時的2秒后,本文由公從號“彤哥讀源碼”原創(chuàng)
scheduledThreadPoolExecutor.scheduleWithFixedDelay(() -> {
System.out.println("winter: " + System.currentTimeMillis());
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
}, 1, 2, TimeUnit.SECONDS);
}
}
定時任務(wù)總體分為四種:
(1)未來執(zhí)行一次的任務(wù),無返回值;
(2)未來執(zhí)行一次的任務(wù),有返回值;
(3)未來按固定頻率重復(fù)執(zhí)行的任務(wù);
(4)未來按固定延時重復(fù)執(zhí)行的任務(wù);
本文主要以第三種為例進行源碼解析。
提交一個按固定頻率執(zhí)行的任務(wù)。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
// 參數(shù)判斷
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
// 將普通任務(wù)裝飾成ScheduledFutureTask
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
// 鉤子方法,給子類用來替換裝飾task,這里認(rèn)為t==sft
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
// 延時執(zhí)行
delayedExecute(t);
return t;
}
可以看到,這里的處理跟未來任務(wù)類似,都是裝飾成另一個任務(wù),再拿去執(zhí)行,不同的是這里交給了delayedExecute()方法去執(zhí)行,這個方法是干嘛的呢?
延時執(zhí)行。
private void delayedExecute(RunnableScheduledFuture<?> task) {
// 如果線程池關(guān)閉了,執(zhí)行拒絕策略
if (isShutdown())
reject(task);
else {
// 先把任務(wù)扔到隊列中去
super.getQueue().add(task);
// 再次檢查線程池狀態(tài)
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
// 保證有足夠有線程執(zhí)行任務(wù)
ensurePrestart();
}
}
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
// 創(chuàng)建工作線程
// 注意,這里沒有傳入firstTask參數(shù),因為上面先把任務(wù)扔到隊列中去了
// 另外,沒用上maxPoolSize參數(shù),所以大線程數(shù)量在定時線程池中實際是沒有用的
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
到這里就結(jié)束了?!
實際上,這里只是控制任務(wù)能不能被執(zhí)行,真正執(zhí)行任務(wù)的地方在任務(wù)的run()方法中。
還記得上面的任務(wù)被裝飾成了ScheduledFutureTask類的實例嗎?所以,我們只要看ScheduledFutureTask的run()方法就可以了。
定時任務(wù)執(zhí)行的地方。
public void run() {
// 是否重復(fù)執(zhí)行
boolean periodic = isPeriodic();
// 線程池狀態(tài)判斷
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 一次性任務(wù),直接調(diào)用父類的run()方法,這個父類實際上是FutureTask
// 這里我們不再講解,有興趣的同學(xué)看看上一章的內(nèi)容
else if (!periodic)
ScheduledFutureTask.super.run();
// 重復(fù)性任務(wù),先調(diào)用父類的runAndReset()方法,這個父類也是FutureTask
// 本文主要分析下面的部分
else if (ScheduledFutureTask.super.runAndReset()) {
// 設(shè)置下次執(zhí)行的時間
setNextRunTime();
// 重復(fù)執(zhí)行,本文由公從號“彤哥讀源碼”原創(chuàng)
reExecutePeriodic(outerTask);
}
}
可以看到,對于重復(fù)性任務(wù),先調(diào)用FutureTask的runAndReset()方法,再設(shè)置下次執(zhí)行的時間,最后再調(diào)用reExecutePeriodic()方法。
FutureTask的runAndReset()方法與run()方法類似,只是其任務(wù)運行完畢后不會把狀態(tài)修改為NORMAL,有興趣的同學(xué)點進源碼看看。
再來看看reExecutePeriodic()方法。
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
// 線程池狀態(tài)檢查
if (canRunInCurrentRunState(true)) {
// 再次把任務(wù)扔到任務(wù)隊列中
super.getQueue().add(task);
// 再次檢查線程池狀態(tài)
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
// 保證工作線程足夠
ensurePrestart();
}
}
到這里是不是豁然開朗了,原來定時線程池執(zhí)行重復(fù)任務(wù)是在任務(wù)執(zhí)行完畢后,又把任務(wù)扔回了任務(wù)隊列中。
重復(fù)性的問題解決了,那么,它是怎么控制任務(wù)在某個時刻執(zhí)行的呢?
OK,這就輪到我們的延時隊列登場了。
我們知道,線程池執(zhí)行任務(wù)時需要從任務(wù)隊列中拿任務(wù),而普通的任務(wù)隊列,如果里面有任務(wù)就直接拿出來了,但是延時隊列不一樣,它里面的任務(wù),如果沒有到時間也是拿不出來的,這也是前面分析中一上來就把任務(wù)扔進隊列且創(chuàng)建Worker沒有傳入firstTask的原因。
說了這么多,它到底是怎么實現(xiàn)的呢?
其實,延時隊列我們在前面都詳細(xì)分析過,想看完整源碼分析的可以看看之前的《死磕 java集合之DelayQueue源碼分析》。
延時隊列內(nèi)部是使用“堆”這種數(shù)據(jù)結(jié)構(gòu)來實現(xiàn)的,有興趣的同學(xué)可以看看之前的《拜托,面試別再問我堆(排序)了!》。
我們這里只拿一個take()方法出來分析。
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 加鎖
lock.lockInterruptibly();
try {
for (;;) {
// 堆頂任務(wù)
RunnableScheduledFuture<?> first = queue[0];
// 如果隊列為空,則等待
if (first == null)
available.await();
else {
// 還有多久到時間
long delay = first.getDelay(NANOSECONDS);
// 如果小于等于0,說明這個任務(wù)到時間了,可以從隊列中出隊了
if (delay <= 0)
// 出隊,然后堆化
return finishPoll(first);
// 還沒到時間
first = null;
// 如果前面有線程在等待,直接進入等待
if (leader != null)
available.await();
else {
// 當(dāng)前線程作為leader
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 等待上面計算的延時時間,再自動喚醒
available.awaitNanos(delay);
} finally {
// 喚醒后再次獲得鎖后把leader再置空
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
// 相當(dāng)于喚醒下一個等待的任務(wù)
available.signal();
// 解鎖,本文由公從號“彤哥讀源碼”原創(chuàng)
lock.unlock();
}
}
大致的原理是,利用堆的特性獲取最快到時間的任務(wù),即堆頂?shù)娜蝿?wù):
(1)如果堆頂?shù)娜蝿?wù)到時間了,就讓它從隊列中了隊;
(2)如果堆頂?shù)娜蝿?wù)還沒到時間,就看它還有多久到時間,利用條件鎖等待這段時間,待時間到了后重新走(1)的判斷;
這樣就解決了可以在指定時間后執(zhí)行任務(wù)。
其實,ScheduledThreadPoolExecutor也是可以使用execute()或者submit()提交任務(wù)的,只不過它們會被當(dāng)成0延時的任務(wù)來執(zhí)行一次。
public void execute(Runnable command) {
schedule(command, 0, NANOSECONDS);
}
public <T> Future<T> submit(Callable<T> task) {
return schedule(task, 0, NANOSECONDS);
}
實現(xiàn)定時任務(wù)有兩個問題要解決,分別是指定未來某個時刻執(zhí)行任務(wù)、重復(fù)執(zhí)行。
(1)指定某個時刻執(zhí)行任務(wù),是通過延時隊列的特性來解決的;
(2)重復(fù)執(zhí)行,是通過在任務(wù)執(zhí)行后再次把任務(wù)加入到隊列中來解決的。
到這里基本上普通的線程池的源碼解析就結(jié)束了,這種線程池是比較經(jīng)典的實現(xiàn)方式,整體上來說,效率相對不是特別高,因為所有的工作線程共用同一個隊列,每次從隊列中取任務(wù)都要加鎖解鎖操作。
那么,能不能給每個工作線程配備一個任務(wù)隊列呢,在提交任務(wù)的時候就把任務(wù)分配給指定的工作線程,這樣在取任務(wù)的時候就不需要頻繁的加鎖解鎖了。
答案是肯定的,下一章我們一起來看看這種基于“工作竊取”理論的線程池——ForkJoinPool。
另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機、免備案服務(wù)器”等云主機租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價比高”等特點與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場景需求。
新聞標(biāo)題:死磕java線程系列之線程池深入解析——定時任務(wù)執(zhí)行流程-創(chuàng)新互聯(lián)
轉(zhuǎn)載來源:http://muchs.cn/article14/coegge.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供面包屑導(dǎo)航、品牌網(wǎng)站設(shè)計、域名注冊、做網(wǎng)站、軟件開發(fā)、建站公司
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容