JDK源碼分析(11)之BlockingQueue相關(guān)

本文將主要結(jié)合源碼對(duì) JDK 中的阻塞隊(duì)列進(jìn)行分析,并比較其各自的特點(diǎn);

在順昌等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強(qiáng)發(fā)展的系統(tǒng)性、市場(chǎng)前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供成都網(wǎng)站建設(shè)、成都做網(wǎng)站 網(wǎng)站設(shè)計(jì)制作定制網(wǎng)站,公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),品牌網(wǎng)站制作,成都全網(wǎng)營(yíng)銷推廣,外貿(mào)營(yíng)銷網(wǎng)站建設(shè),順昌網(wǎng)站建設(shè)費(fèi)用合理。

一、BlockingQueue 概述

說到阻塞隊(duì)列想到的第一個(gè)應(yīng)用場(chǎng)景可能就是生產(chǎn)者消費(fèi)者模式了,如圖所示;

JDK源碼分析(11)之 BlockingQueue 相關(guān)

根據(jù)上圖所示,明顯在入隊(duì)和出隊(duì)的時(shí)候,會(huì)發(fā)生競(jìng)爭(zhēng);所以一種很自然的想法就是使用鎖,而在 JDK 中也的確是通過鎖來實(shí)現(xiàn)的;所以?BlockingQueue?的源碼其實(shí)可以當(dāng)成鎖的應(yīng)用示例來查看;同時(shí) JDK 也為我們提供了多種不同功能的隊(duì)列:

  • ArrayBlockingQueue?:基于數(shù)組的有界隊(duì)列;

  • LinkedBlockingQueue?:基于鏈表的×××隊(duì)列(可以設(shè)置容量);

  • PriorityBlockingQueue?:基于二叉堆的×××優(yōu)先級(jí)隊(duì)列;

  • DelayQueue?:基于 PriorityBlockingQueue 的×××延遲隊(duì)列;

  • SynchronousQueue?:無容量的阻塞隊(duì)列(Executors.newCachedThreadPool() 中使用的隊(duì)列);

  • LinkedTransferQueue?:基于鏈表的×××隊(duì)列;

接下來我們就對(duì)最常用的?ArrayBlockingQueue?和?LinkedBlockingQueue?進(jìn)行分析;

二、 ArrayBlockingQueue 源碼分析

1. 結(jié)構(gòu)概述

public?class?ArrayBlockingQueue<E>?extends?AbstractQueue<E>?implements?BlockingQueue<E>,?java.io.Serializable?{????final?Object[]?items;???????????????//?容器數(shù)組
????int?takeIndex;??????????????????????//?出隊(duì)索引
????int?putIndex;???????????????????????//?入隊(duì)索引
????int?count;??????????????????????????//?排隊(duì)個(gè)數(shù)
????final?ReentrantLock?lock;???????????//?全局鎖
????private?final?Condition?notEmpty;???//?出隊(duì)條件隊(duì)列
????private?final?Condition?notFull;????//?入隊(duì)條件隊(duì)列
????...
}

ArrayBlockingQueue?的結(jié)構(gòu)如圖所示:

JDK源碼分析(11)之 BlockingQueue 相關(guān)

如圖所示,

  • ArrayBlockingQueue?的數(shù)組其實(shí)是一個(gè)邏輯上的環(huán)狀結(jié)構(gòu),在添加、取出數(shù)據(jù)的時(shí)候,并沒有像?ArrayList?一樣發(fā)生數(shù)組元素的移動(dòng)(當(dāng)然除了?removeAt(final int removeIndex));

  • 并且由?takeIndex?和?putIndex?指示讀寫位置;

  • 在讀寫的時(shí)候還有兩個(gè)讀寫條件隊(duì)列;

下面我們就讀寫操作,對(duì)源碼簡(jiǎn)單分析:

2. 入隊(duì)

public?void?put(E?e)?throws?InterruptedException?{
??checkNotNull(e);??final?ReentrantLock?lock?=?this.lock;
??lock.lockInterruptibly();??try?{????while?(count?==?items.length)??//?當(dāng)隊(duì)列已滿的時(shí)候放入?putCondition?條件隊(duì)列
??????notFull.await();???
????enqueue(e);??//?入隊(duì)
??}?finally?{
????lock.unlock();
??}
}
private?void?enqueue(E?x)?{??//?assert?lock.getHoldCount()?==?1;
??//?assert?items[putIndex]?==?null;
??final?Object[]?items?=?this.items;
??items[putIndex]?=?x;??//?插入隊(duì)列
??if?(++putIndex?==?items.length)?putIndex?=?0;??//?指針走一圈的時(shí)候復(fù)位
??count++;
??notEmpty.signal();??//?喚醒?takeCondition?條件隊(duì)列中等待的線程}

3. 出隊(duì)

public?E?take()?throws?InterruptedException?{??final?ReentrantLock?lock?=?this.lock;
??lock.lockInterruptibly();??try?{????while?(count?==?0)??//?當(dāng)隊(duì)列為空的時(shí)候,放入?takeCondition?條件
??????notEmpty.await();??
????return?dequeue();???//?出隊(duì)
??}?finally?{
????lock.unlock();
??}
}
private?E?dequeue()?{??//?assert?lock.getHoldCount()?==?1;
??//?assert?items[takeIndex]?!=?null;
??final?Object[]?items?=?this.items;??@SuppressWarnings("unchecked")
??E?x?=?(E)?items[takeIndex];??//?取出元素
??items[takeIndex]?=?null;??if?(++takeIndex?==?items.length)
????takeIndex?=?0;
??count--;??if?(itrs?!=?null)
????itrs.elementDequeued();
??notFull.signal();??//?取出元素后,隊(duì)列空出一位,所以喚醒?putCondition?中的線程
??return?x;
}

三、LinkedBlockingQueue 源碼分析

1. 結(jié)構(gòu)概述

public?class?LinkedBlockingQueue<E>?extends?AbstractQueue<E>?implements?BlockingQueue<E>,?java.io.Serializable?{??
??private?final?int?capacity;?//?默認(rèn)?Integer.MAX_VALUE
??private?final?AtomicInteger?count?=?new?AtomicInteger();?//?容量
??transient?Node<E>?head;??????????//?頭結(jié)點(diǎn)?head.item?==?null
??private?transient?Node<E>?last;??//?尾節(jié)點(diǎn)?last.next?==?null
??private?final?ReentrantLock?takeLock?=?new?ReentrantLock();??//?出隊(duì)鎖
??private?final?Condition?notEmpty?=?takeLock.newCondition();??//?出隊(duì)條件
??private?final?ReentrantLock?putLock?=?new?ReentrantLock();???//?入隊(duì)鎖
??private?final?Condition?notFull?=?putLock.newCondition();????//?入隊(duì)條件
??
??static?class?Node<E>?{
????E?item;
????Node<E>?next;
????Node(E?x)?{?item?=?x;?}
??}
}

LinkedBlockingQueue?的結(jié)構(gòu)如圖所示:

JDK源碼分析(11)之 BlockingQueue 相關(guān)

如圖所示,

  • LinkedBlockingQueue?其實(shí)就是一個(gè)簡(jiǎn)單的單向鏈表,其中頭部元素的數(shù)據(jù)為空,尾部元素的 next 為空;

  • 因?yàn)樽x寫都有競(jìng)爭(zhēng),所以在頭部和尾部分別有一把鎖;同時(shí)還有對(duì)應(yīng)的兩個(gè)條件隊(duì)列;

下面我們就讀寫操作,對(duì)源碼簡(jiǎn)單分析:

2. 入隊(duì)

public?boolean?offer(E?e)?{??if?(e?==?null)?throw?new?NullPointerException();??final?AtomicInteger?count?=?this.count;??if?(count.get()?==?capacity)?return?false;??//?如果隊(duì)列已滿,直接返回失敗
??int?c?=?-1;
??Node<E>?node?=?new?Node<E>(e);??????????????//?將數(shù)據(jù)封裝為節(jié)點(diǎn)
??final?ReentrantLock?putLock?=?this.putLock;
??putLock.lock();??try?{????if?(count.get()?<?capacity)?{
??????enqueue(node);??????????????????????????//?入隊(duì)
??????c?=?count.getAndIncrement();??????if?(c?+?1?<?capacity)???????????????????//?如果隊(duì)列未滿,則繼續(xù)喚醒?putCondition?條件隊(duì)列
????????notFull.signal();
????}
??}?finally?{
????putLock.unlock();
??}??if?(c?==?0)???????????//?如果添加之前的容量為0,說明在出隊(duì)的時(shí)候有競(jìng)爭(zhēng),則喚醒?takeCondition
????signalNotEmpty();???//?因?yàn)槭莾砂焰i,所以在喚醒?takeCondition的時(shí)候,還需要獲取?takeLock
??return?c?>=?0;
}
private?void?enqueue(Node<E>?node)?{??//?assert?putLock.isHeldByCurrentThread();
??//?assert?last.next?==?null;
??last?=?last.next?=?node;??//?連接節(jié)點(diǎn),并設(shè)置尾節(jié)點(diǎn)}

3. 出隊(duì)

public?E?take()?throws?InterruptedException?{
??E?x;??int?c?=?-1;??final?AtomicInteger?count?=?this.count;??final?ReentrantLock?takeLock?=?this.takeLock;
??takeLock.lockInterruptibly();??try?{????while?(count.get()?==?0)?{???//?如果隊(duì)列為空,則加入?takeCondition?條件隊(duì)列
??????notEmpty.await();
????}
????x?=?dequeue();???????????????//?出隊(duì)
????c?=?count.getAndDecrement();????if?(c?>?1)
??????notEmpty.signal();?????????//?如果隊(duì)列還有剩余,則繼續(xù)喚醒?takeCondition?條件隊(duì)列
??}?finally?{
????takeLock.unlock();
??}??if?(c?==?capacity)?????????????//?如果取之前隊(duì)列是滿的,說明入隊(duì)的時(shí)候有競(jìng)爭(zhēng),則喚醒?putCondition
????signalNotFull();?????????????//?同樣注意是兩把鎖
??return?x;
}
private?E?dequeue()?{??//?assert?takeLock.isHeldByCurrentThread();
??//?assert?head.item?==?null;
??Node<E>?h?=?head;
??Node<E>?first?=?h.next;
??h.next?=?h;?//?help?GC???//?將next引用指向自己,則該節(jié)點(diǎn)不可達(dá),在下一次GC的時(shí)候回收
??head?=?first;
??E?x?=?first.item;
??first.item?=?null;??return?x;
}

四、ABQ、LBQ 對(duì)比

根據(jù)以上的講解,我們可以逐步分析出一些不同,以及在不同場(chǎng)景隊(duì)列的選擇:

  1. 結(jié)構(gòu)不同

  • ABQ:基于數(shù)組,有界,一把鎖;

  • LBQ:基于鏈表,×××,兩把鎖;

內(nèi)存分配

  • ABQ:隊(duì)列空間預(yù)先初始化,受堆空間影響小,穩(wěn)定性高;

  • LBQ:隊(duì)列空間動(dòng)態(tài)變化,受對(duì)空間影響大,穩(wěn)定性差;

入隊(duì)、出隊(duì)效率

  • ABQ:數(shù)據(jù)直接賦值,移除;隊(duì)列空間重復(fù)使用,效率高;

  • LBQ:數(shù)據(jù)需要包裝為節(jié)點(diǎn);需開辟新空間,效率低;

競(jìng)爭(zhēng)方面

  • ABQ:出入隊(duì)共用一把鎖,相互影響;競(jìng)爭(zhēng)嚴(yán)重時(shí)效率低;

  • LBQ:出入隊(duì)分用兩把鎖,互不影響;競(jìng)爭(zhēng)嚴(yán)重時(shí)效率影響??;

所以在這里并不能簡(jiǎn)單的給出詳細(xì)的數(shù)據(jù),證明哪個(gè)隊(duì)列更適合什么場(chǎng)景,最好是結(jié)合實(shí)際使用場(chǎng)景分析。

網(wǎng)站名稱:JDK源碼分析(11)之BlockingQueue相關(guān)
網(wǎng)頁鏈接:http://www.muchs.cn/article6/pdhgig.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站排名、Google、虛擬主機(jī)網(wǎng)站內(nèi)鏈、小程序開發(fā)、軟件開發(fā)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

網(wǎng)站優(yōu)化排名