CountDownLatch的簡(jiǎn)單應(yīng)用和實(shí)現(xiàn)原理

CountDownLatch的簡(jiǎn)單實(shí)現(xiàn)

創(chuàng)新互聯(lián)從2013年創(chuàng)立,我們提供高端網(wǎng)站建設(shè)、重慶小程序開發(fā)公司、電商視覺設(shè)計(jì)、重慶APP開發(fā)公司及網(wǎng)絡(luò)營(yíng)銷搜索優(yōu)化服務(wù),在傳統(tǒng)互聯(lián)網(wǎng)與移動(dòng)互聯(lián)網(wǎng)發(fā)展的背景下,我們堅(jiān)守著用標(biāo)準(zhǔn)的設(shè)計(jì)方案與技術(shù)開發(fā)實(shí)力作基礎(chǔ),以企業(yè)及品牌的互聯(lián)網(wǎng)商業(yè)目標(biāo)為核心,為客戶打造具商業(yè)價(jià)值與用戶體驗(yàn)的互聯(lián)網(wǎng)+產(chǎn)品。

業(yè)務(wù)背景假設(shè):現(xiàn)在一個(gè)前端頁面的展示需要調(diào)用3個(gè)外部電商平臺(tái)接口的數(shù)據(jù),所以在我們提供給前端的這個(gè)接口上,我們要調(diào)用3個(gè)外部電商接口,最后需要對(duì)所有的數(shù)據(jù)做一個(gè)整合,方便前端展示。

一般情況我們都是順序調(diào)用3個(gè)電商接口,得到數(shù)據(jù)后調(diào)用整合方法,假設(shè)每個(gè)電商接口調(diào)用時(shí)間為2秒,如下:

public static void main(String[] args) throws Exception{

long startTime = new Date().getTime();

//調(diào)用第一個(gè)電商平臺(tái)的接口取得訂單數(shù),用時(shí)2s

Thread.sleep(2000);

System.out.println("獲取電商平臺(tái)1的數(shù)據(jù)");

//調(diào)用第二個(gè)電商平臺(tái)的接口取得訂單數(shù),用時(shí)2s

Thread.sleep(2000);

System.out.println("獲取電商平臺(tái)2的數(shù)據(jù)");

//調(diào)用第三個(gè)電商平臺(tái)的接口取得訂單數(shù),用時(shí)2s

Thread.sleep(2000);

System.out.println("獲取電商平臺(tái)3的數(shù)據(jù)");

System.out.println("對(duì)三個(gè)電商平臺(tái)的數(shù)據(jù)進(jìn)行合并");

long endTime = new Date().getTime();

long time = endTime - startTime;

System.out.println("總耗時(shí)" + time);

}

調(diào)用后耗時(shí)6s,如下:

CountDownLatch的簡(jiǎn)單應(yīng)用和實(shí)現(xiàn)原理

以上方法耗時(shí)太長(zhǎng)了,需要優(yōu)化,優(yōu)化思路:因?yàn)?個(gè)接口沒有先后關(guān)系,所以完全可以并行執(zhí)行,之后再做數(shù)據(jù)的整合,這樣設(shè)計(jì)接口耗時(shí)肯定會(huì)節(jié)省很多

使用CountDownLatch來實(shí)現(xiàn)以上優(yōu)化思路

CountDownLatch是什么:CountDownLatch是java.util.concurrent包下的類,它在多線程并發(fā)編程里充當(dāng)這計(jì)數(shù)器的功能,通過構(gòu)造函數(shù)維護(hù)一個(gè)int類型的初始值,如果一個(gè)線程調(diào)用await()方法,那么該線程就會(huì)進(jìn)入阻塞狀態(tài),直到初始值變?yōu)?后,調(diào)用await()方法的阻塞線程將會(huì)被喚醒,執(zhí)行后續(xù)操作,而通過countDown()這個(gè)方法,我們就能夠?qū)崿F(xiàn)初始值的減法,每調(diào)用一次,初始值減一。

具體實(shí)現(xiàn)代碼如下:

public static void main(String[] args) throws Exception {

CountDownLatch countDownLatch = new CountDownLatch(3);

long startTime = new Date().getTime();

//調(diào)用第一個(gè)電商平臺(tái)的接口取得訂單數(shù),用時(shí)2s

Thread thread1 = new Thread(() -> {

try {

Thread.sleep(2000);

} catch (InterruptedException e) {

}

System.out.println("獲取電商平臺(tái)一的數(shù)據(jù)");

countDownLatch.countDown();

});

//調(diào)用第二個(gè)電商平臺(tái)的接口取得訂單數(shù),用時(shí)2s

Thread thread2 = new Thread(() -> {

try {

Thread.sleep(2000);

} catch (InterruptedException e) {

}

System.out.println("獲取電商平臺(tái)二的數(shù)據(jù)");

countDownLatch.countDown();

});

//調(diào)用第二個(gè)電商平臺(tái)的接口取得訂單數(shù),用時(shí)2s

Thread thread3 = new Thread(() -> {

try {

Thread.sleep(2000);

} catch (InterruptedException e) {

}

System.out.println("獲取電商平臺(tái)三的數(shù)據(jù)");

countDownLatch.countDown();

});

thread1.start();

thread2.start();

thread3.start();

countDownLatch.await();

System.out.println("對(duì)三個(gè)電商平臺(tái)的數(shù)據(jù)進(jìn)行合并");

long endTime = new Date().getTime();

long time = endTime - startTime;

System.out.println("總耗時(shí)" + time);

}

因?yàn)殡娚探涌谟?個(gè),所以CountDownLatch的初始值設(shè)為3,之后多線程執(zhí)行3個(gè)電商接口,每執(zhí)行完一個(gè),調(diào)用countDown()方法把初始值減一,同時(shí)主線程調(diào)用await()進(jìn)入阻塞狀態(tài),直到初始值減為0,就被重新喚醒,開始執(zhí)行數(shù)據(jù)的合并邏輯。

執(zhí)行效果如下:

CountDownLatch的簡(jiǎn)單應(yīng)用和實(shí)現(xiàn)原理

可以看到總耗時(shí)節(jié)省了約三分之二

CountDownLatch的實(shí)現(xiàn)原理

先看CountDownLatch的構(gòu)造方法:

public CountDownLatch(int count) {

if (count < 0) throw new IllegalArgumentException("count < 0");

this.sync = new Sync(count);

}

可以看到它除了做個(gè)初始值的異常判斷外,實(shí)際上是構(gòu)造了一個(gè)Sync的對(duì)象,賦值給自己的屬性sync,那么看下Sync對(duì)象的源碼:

private static final class Sync extends AbstractQueuedSynchronizer {

private static final long serialVersionUID = 4982264981922014374L;

//Sync對(duì)象的構(gòu)造方法

Sync(int count) {

setState(count);

}

}

從以上源碼可以看出,Sync對(duì)象繼承了AQS,所以調(diào)用CountDownLatch的構(gòu)造方法實(shí)際上就是調(diào)用Sync對(duì)象的構(gòu)造方法,然后通過setState(count)方法設(shè)置AQS的state值。

public abstract class AbstractQueuedSynchronizer

extends AbstractOwnableSynchronizer

implements java.io.Serializable {

private volatile int state;

protected final void setState(int newState) {

state = newState;

}

}

再看countDown()方法的實(shí)現(xiàn):

public void countDown() {

sync.releaseShared(1);

}

實(shí)際上是調(diào)用了Sync對(duì)象的releaseShared()方法,參數(shù)固定為1

public abstract class AbstractQueuedSynchronizer

extends AbstractOwnableSynchronizer

implements java.io.Serializable {

public final boolean releaseShared(int arg) {

if (tryReleaseShared(arg)) {

doReleaseShared();

return true;

}

return false;

}

//嘗試釋放共享模式的鎖

protected boolean tryReleaseShared(int arg) {

throw new UnsupportedOperationException();

}

private void doReleaseShared() {

for (;;) {

Node h = head;

if (h != null && h != tail) {

int ws = h.waitStatus;

if (ws == Node.SIGNAL) {

if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))

continue; // loop to recheck cases

unparkSuccessor(h);

}

else if (ws == 0 &&

!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))

continue; // loop on failed CAS

}

if (h == head) // loop if head changed

break;

}

}

}

其中方法tryReleaseShared()的具體實(shí)現(xiàn)是在CountDownLatch類,如下:

public class CountDownLatch {

protected boolean tryReleaseShared(int releases) {

// Decrement count; signal when transition to zero

for (;;) {

int c = getState();

if (c == 0)

return false;

int nextc = c-1;

if (compareAndSetState(c, nextc))

return nextc == 0;

}

}

}

//獲取計(jì)數(shù)器的值

protected final int getState() {

return state;

}

}

通過循環(huán)和compareAndSetState()方法我們可以看出這是一個(gè)自旋的CAS(Compare And Set)操作,先獲取state的值,為0則返回false,否則執(zhí)行減1操作,失敗就重試,直到減為0,則返回true,之后執(zhí)行doReleaseShared()方法

await()方法的實(shí)現(xiàn)

public class CountDownLatch {

public void await() throws InterruptedException {

sync.acquireSharedInterruptibly(1);

}

}

可以看到await()實(shí)際是調(diào)用Sync對(duì)象的acquireSharedInterruptibly()方法:

public abstract class AbstractQueuedSynchronizer

extends AbstractOwnableSynchronizer

implements java.io.Serializable {

public final void acquireSharedInterruptibly(int arg)

throws InterruptedException {

if (Thread.interrupted())

throw new InterruptedException();

if (tryAcquireShared(arg) < 0)

doAcquireSharedInterruptibly(arg);

}

private void doAcquireSharedInterruptibly(int arg)

throws InterruptedException {

final Node node = addWaiter(Node.SHARED);

boolean failed = true;

try {

for (;;) {

final Node p = node.predecessor();

if (p == head) {

int r = tryAcquireShared(arg);

if (r >= 0) {

setHeadAndPropagate(node, r);

p.next = null; // help GC

failed = false;

return;

}

}

if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

throw new InterruptedException();

}

} finally {

if (failed)

cancelAcquire(node);

}

}

}

其中tryAcquireShared()方法的具體實(shí)現(xiàn)是在CountDownLatch類:

public class CountDownLatch {

protected int tryAcquireShared(int acquires) {

return (getState() == 0) ? 1 : -1;

}

}

通過該方法可以判斷出如果計(jì)數(shù)器值為0則返回1,否則返回-1,然后為0則會(huì)執(zhí)行之后的方法,如果繼續(xù)跟下去,最后會(huì)發(fā)現(xiàn)還是調(diào)用到了AQS的doReleaseShared()方法,所有阻塞的線程會(huì)被放開。

CountDownLatch和.join()的使用區(qū)別

CountDownLatch和.join()方法的作用其實(shí)很像,join()方法的使用可參考Java多線程中join()方法的使用,不過CountDownLatch使用起來會(huì)比join()方法更有靈活性。假設(shè)電商接口調(diào)用其實(shí)有兩個(gè)步驟,在每個(gè)接口的第一步獲取完數(shù)據(jù)后,還要做個(gè)數(shù)據(jù)記錄,耗時(shí)也是2s,下面給出示例代碼:

使用join()方法:鄭州人流多少錢 http://mobile.sgyy029.com/

public static void main(String[] args) throws Exception {

CountDownLatch countDownLatch = new CountDownLatch(3);

long startTime = new Date().getTime();

//調(diào)用第一個(gè)電商平臺(tái)的接口取得訂單數(shù),用時(shí)2s

Thread thread1 = new Thread(() -> {

try {

Thread.sleep(2000);

} catch (InterruptedException e) {

}

System.out.println("獲取電商平臺(tái)一的數(shù)據(jù)");

try {

Thread.sleep(2000);

} catch (InterruptedException e) {

}

System.out.println("獲取電商平臺(tái)一的數(shù)據(jù)后做個(gè)記錄");

});

//調(diào)用第二個(gè)電商平臺(tái)的接口取得訂單數(shù),用時(shí)2s

Thread thread2 = new Thread(() -> {

try {

Thread.sleep(2000);

} catch (InterruptedException e) {

}

System.out.println("獲取電商平臺(tái)二的數(shù)據(jù)");

try {

Thread.sleep(2000);

} catch (InterruptedException e) {

}

System.out.println("獲取電商平臺(tái)二的數(shù)據(jù)后做個(gè)記錄");

});

//調(diào)用第三個(gè)電商平臺(tái)的接口取得訂單數(shù),用時(shí)2s

Thread thread3 = new Thread(() -> {

try {

Thread.sleep(2000);

} catch (InterruptedException e) {

}

System.out.println("獲取電商平臺(tái)三的數(shù)據(jù)");

try {

Thread.sleep(2000);

} catch (InterruptedException e) {

}

System.out.println("獲取電商平臺(tái)三的數(shù)據(jù)后做個(gè)記錄");

});

thread1.start();

thread2.start();

thread3.start();

thread1.join();

thread2.join();

thread3.join();

System.out.println("對(duì)三個(gè)電商平臺(tái)的數(shù)據(jù)進(jìn)行合并");

long endTime = new Date().getTime();

long time = endTime - startTime;

System.out.println("總耗時(shí)" + time);

}

耗時(shí)效果如下:

可以發(fā)現(xiàn),使用join()方法,必須得等到每個(gè)線程都結(jié)束后才會(huì)接著執(zhí)行之后的主線程,這樣總耗時(shí)就會(huì)被數(shù)據(jù)記錄的方法拖慢,達(dá)到4311ms

使用CountDownLatch,在獲取數(shù)據(jù)后就對(duì)初始值減1,而不是等到記錄方法完成才減1,如下:

public static void main(String[] args) throws Exception {

CountDownLatch countDownLatch = new CountDownLatch(3);

long startTime = new Date().getTime();

//調(diào)用第一個(gè)電商平臺(tái)的接口取得訂單數(shù),用時(shí)2s

Thread thread1 = new Thread(() -> {

try {

Thread.sleep(2000);

} catch (InterruptedException e) {

}

System.out.println("獲取電商平臺(tái)一的數(shù)據(jù)");

countDownLatch.countDown();

try {

Thread.sleep(2000);

} catch (InterruptedException e) {

}

System.out.println("獲取電商平臺(tái)一的數(shù)據(jù)后做個(gè)記錄");

});

//調(diào)用第二個(gè)電商平臺(tái)的接口取得訂單數(shù),用時(shí)2s

Thread thread2 = new Thread(() -> {

try {

Thread.sleep(2000);

} catch (InterruptedException e) {

}

System.out.println("獲取電商平臺(tái)二的數(shù)據(jù)");

countDownLatch.countDown();

try {

Thread.sleep(2000);

} catch (InterruptedException e) {

}

System.out.println("獲取電商平臺(tái)二的數(shù)據(jù)后做個(gè)記錄");

});

//調(diào)用第二個(gè)電商平臺(tái)的接口取得訂單數(shù),用時(shí)2s

Thread thread3 = new Thread(() -> {

try {

Thread.sleep(2000);

} catch (InterruptedException e) {

}

System.out.println("獲取電商平臺(tái)三的數(shù)據(jù)");

countDownLatch.countDown();

try {

Thread.sleep(2000);

} catch (InterruptedException e) {

}

System.out.println("獲取電商平臺(tái)三的數(shù)據(jù)后做個(gè)記錄");

});

thread1.start();

thread2.start();

thread3.start();

countDownLatch.await();

System.out.println("對(duì)三個(gè)電商平臺(tái)的數(shù)據(jù)進(jìn)行合并");

long endTime = new Date().getTime();

long time = endTime - startTime;

System.out.println("總耗時(shí)" + time);

}

耗時(shí)效果如下:

可以發(fā)現(xiàn)耗時(shí)才2185ms

以上就是CountDownLatch和join()方法的使用區(qū)別,相比起join()方法要等線程都執(zhí)行完才會(huì)執(zhí)行阻塞的線程,CountDownLatch就能夠靈活控制阻塞線程的執(zhí)行時(shí)機(jī),耗時(shí)可以更少。

本文標(biāo)題:CountDownLatch的簡(jiǎn)單應(yīng)用和實(shí)現(xiàn)原理
當(dāng)前URL:http://muchs.cn/article38/ipiosp.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供小程序開發(fā)定制網(wǎng)站、標(biāo)簽優(yōu)化、Google、全網(wǎng)營(yíng)銷推廣、用戶體驗(yàn)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

小程序開發(fā)