[TOC]
創(chuàng)新互聯(lián)公司于2013年開始,先為欽北等服務(wù)建站,欽北等地企業(yè),進(jìn)行企業(yè)商務(wù)咨詢服務(wù)。為欽北企業(yè)網(wǎng)站制作PC+手機(jī)+微官網(wǎng)三網(wǎng)同步一站式服務(wù)解決您的所有建站問題。
CountDownLatch 允許一個(gè)或者多個(gè)線程等待其他線程完成操作。
CyclicBarrier 的字面意思是可循環(huán)使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程達(dá)到一個(gè)屏障(也可以叫同步點(diǎn))時(shí)被阻塞,直到最后一個(gè)線程到達(dá)屏障時(shí),屏障才會(huì)開門,所有被屏障攔截的線程才會(huì)繼續(xù)運(yùn)行。
public class CountDownLatchDemo {
public static final CountDownLatch count = new CountDownLatch(10);
private static int j = 0;
public static void main(String[] args) throws Exception {
for (int i = 0; i < 10; i++) {
new Thread(
()-> {
System.out.println("我是"+(++j));
count.countDown();
}
).start();
}
count.await();
System.out.println("我是總數(shù)"+j+"?。?!");
}
}
運(yùn)行結(jié)果:
我是1
我是2
我是3
我是4
我是5
我是6
我是7
我是8
我是9
我是10
我是總數(shù)10?。?!
public class CyclicBarrierDemo {
private static final CyclicBarrier c = new CyclicBarrier(6,new Thread(() ->
System.out.println("我是最后一個(gè)")
));
private static AtomicInteger index = new AtomicInteger(1);
public static void main(String[] args) throws Exception, BrokenBarrierException {
for (int i = 1; i <= 6; i ++) {
new Thread(() -> {
try {
c.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("我是:"+(index.getAndIncrement()));
}) .start();
}
}
}
運(yùn)行結(jié)果:
我是最后一個(gè)
我是:1
我是:2
我是:3
我是:4
我是:5
我是:6
原理:
CountDownLatch 又叫做閉鎖,CountDownLatch 的構(gòu)造函數(shù)接受一個(gè)int類型的參數(shù)作為計(jì)數(shù)器,如果你想等待n個(gè)節(jié)點(diǎn)完成,那就傳入N;當(dāng)我們調(diào)用CountDownLatch 的countDown方法時(shí),N就會(huì)減1,CountDownLatch的await會(huì)阻塞當(dāng)前方法,直到N變成0;由于countDown方法可以用在任何地方,這里說的N個(gè)點(diǎn),可以是N個(gè)線程,也可以是1一個(gè)線程里面的N個(gè)步驟。
源碼:
// 構(gòu)造方法
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
// 內(nèi)部類 Sync 繼承AQS
private static final class Sync extends AbstractQueuedSynchronizer {
}
public void countDown() {
// 調(diào)用了AQS的releaseShared方法
sync.releaseShared(1);
}
// 這是Sync的tryReleaseShared
// AQS的releaseShared會(huì)調(diào)用子類的tryReleaseShared 用來控制count
// tryReleaseShared 共享式的釋放狀態(tài) 具體參考AQS
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
// 獲取的其實(shí)就是我們構(gòu)造函數(shù)的count
int c = getState();
// count == 0 證明整個(gè)記錄流程已經(jīng)完畢了
if (c == 0)
return false;
// 減1
int nextc = c-1;
if (compareAndSetState(c, nextc)) cas 更新
return nextc == 0; // 等于0,返回ture 證明計(jì)數(shù)結(jié)束了,可以去喚醒同步隊(duì)列的線程了
// 喚醒是AQS的releaseShared方法
// 結(jié)合CountDownLatch的await方法理解整這里
}
}
public void await() throws InterruptedException {
// 共享式獲取同步轉(zhuǎn)態(tài)
sync.acquireSharedInterruptibly(1);
}
// 這是Sync的方法
// await 其實(shí)是調(diào)用的AQS的acquireSharedInterruptibly 但是aqs會(huì)調(diào)用子類tryAcquireShared
// 我們看到值有state等于0 才會(huì)返回true 成功 -1 表示失敗 失敗就要加入同步隊(duì)列
// 所以在countDown方法里面等于0 為什么要去喚醒 ,應(yīng)為這里會(huì)進(jìn)入同步隊(duì)列
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
通過源碼我們可以發(fā)現(xiàn)只有當(dāng)countDown 這個(gè)方法計(jì)數(shù)遞減完畢,別的線程才能執(zhí)行,因?yàn)檎{(diào)用await的線程會(huì)進(jìn)入AQS的同步隊(duì)列,然后阻塞。
原理:
CyclicBarrier 默認(rèn)構(gòu)造方法是CyclicBarrier (int parties),器參數(shù)表示屏障攔截的線程數(shù)量,每個(gè)線程調(diào)用await告訴CyclicBarrier 我已經(jīng)到達(dá)屏障了,然后當(dāng)前線程被阻塞;CyclicBarrier 海提供了一個(gè)高級(jí)的構(gòu)造函數(shù),CyclicBarrier (int parties,Runnable barrierAction),用于在線程到達(dá)屏障時(shí),優(yōu)先執(zhí)行barrierAction線程,方便處理更復(fù)雜的業(yè)務(wù)邏輯。
源碼:
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
/** The number of parties */
private final int parties;
/* The command to run when tripped */
private final Runnable barrierCommand;
/** The current generation */
private Generation generation = new Generation();
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// 獲取鎖
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 這一代的狀態(tài)
final Generation g = generation;
// 默認(rèn)為false Barrier被Broken 就會(huì)為true
if (g.broken)
throw new BrokenBarrierException();
// 線程被中斷了,標(biāo)記為breakBarrier,喚醒所有線程
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// 計(jì)數(shù)器減減
int index = --count;
// 到達(dá) trip
if (index == 0) { // tripped
boolean ranAction = false;
try {
// 執(zhí)行構(gòu)造函數(shù)里面的線程
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 喚醒所有等待線程 然后重置
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
// 一直自旋直到發(fā)生:tripped, broken, interrupted, timed out
for (;;) {
try {
// 帶時(shí)間
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
// 自旋過程中發(fā)生中斷
} catch (InterruptedException ie) {
// 等于說明當(dāng)前被重點(diǎn)的這個(gè)線程沒有被broken
// 拋異常
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else { // 不等于說明后來的線程已經(jīng)broken了
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
// 中斷線程 breakBarrier已經(jīng)沒有意義了
Thread.currentThread().interrupt();
}
}
if (g.broken) // 屏蔽Broken
throw new BrokenBarrierException();
// 別的線程更新了generation 不屬于當(dāng)前代
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
我們發(fā)現(xiàn)CyclicBarrier是所有線程一起阻塞,直到到達(dá)屏障點(diǎn),然后全部喚醒一起執(zhí)行。
CountDownLatch和CyclicBarrier都可以實(shí)現(xiàn)一個(gè)線程等待一個(gè)或者多個(gè)線程到達(dá)一個(gè)點(diǎn)之后才執(zhí)行,但是這一個(gè)或者多個(gè)線程的狀態(tài)卻是不一樣的,CountDownLatch是來一個(gè)執(zhí)行一個(gè)不會(huì)阻塞,直到大家執(zhí)行完了,在執(zhí)行調(diào)用await方法的線程,CyclicBarrier是來一個(gè)阻塞一個(gè),直到大家都阻塞完畢,然后在優(yōu)先執(zhí)行構(gòu)造函數(shù)里面的線程,在喚醒所有阻塞的線程;CountDownLatch的計(jì)數(shù)器只能執(zhí)行一次,CyclicBarrier可以執(zhí)行多次,所以CyclicBarrier可以執(zhí)行復(fù)雜的業(yè)務(wù)場(chǎng)景。
參考 《Java 并發(fā)編程的藝術(shù)》
當(dāng)前文章:Java并發(fā)工具CountDownLatch和CyclicBarrier原理解析
網(wǎng)站網(wǎng)址:http://muchs.cn/article18/jooogp.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供定制網(wǎng)站、微信公眾號(hào)、小程序開發(fā)、網(wǎng)站設(shè)計(jì)公司、服務(wù)器托管、App開發(fā)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)