常用BlockingQueue有哪些,相信很多沒(méi)有經(jīng)驗(yàn)的人對(duì)此束手無(wú)策,為此本文總結(jié)了問(wèn)題出現(xiàn)的原因和解決方法,通過(guò)這篇文章希望你能解決這個(gè)問(wèn)題。
成都創(chuàng)新互聯(lián)專(zhuān)注于萊州企業(yè)網(wǎng)站建設(shè),響應(yīng)式網(wǎng)站設(shè)計(jì),商城建設(shè)。萊州網(wǎng)站建設(shè)公司,為萊州等地區(qū)提供建站服務(wù)。全流程按需定制制作,專(zhuān)業(yè)設(shè)計(jì),全程項(xiàng)目跟蹤,成都創(chuàng)新互聯(lián)專(zhuān)業(yè)和態(tài)度為您提供的服務(wù)
之前,介紹了一下 ThreadPoolExecutor 的各參數(shù)的含義(并發(fā)編程之線(xiàn)程池ThreadPoolExecutor),其中有一個(gè) BlockingQueue,它是一個(gè)阻塞隊(duì)列。那么,小伙伴們有沒(méi)有想過(guò),為什么此處的線(xiàn)程池要用阻塞隊(duì)列呢?
我們知道隊(duì)列是先進(jìn)先出的。當(dāng)放入一個(gè)元素的時(shí)候,會(huì)放在隊(duì)列的末尾,取出元素的時(shí)候,會(huì)從隊(duì)頭取。那么,當(dāng)隊(duì)列為空或者隊(duì)列滿(mǎn)的時(shí)候怎么辦呢。
這時(shí),阻塞隊(duì)列,會(huì)自動(dòng)幫我們處理這種情況。
當(dāng)阻塞隊(duì)列為空的時(shí)候,從隊(duì)列中取元素的操作就會(huì)被阻塞。當(dāng)阻塞隊(duì)列滿(mǎn)的時(shí)候,往隊(duì)列中放入元素的操作就會(huì)被阻塞。
而后,一旦空隊(duì)列有數(shù)據(jù)了,或者滿(mǎn)隊(duì)列有空余位置時(shí),被阻塞的線(xiàn)程就會(huì)被自動(dòng)喚醒。
這就是阻塞隊(duì)列的好處,你不需要關(guān)心線(xiàn)程何時(shí)被阻塞,也不需要關(guān)心線(xiàn)程何時(shí)被喚醒,一切都由阻塞隊(duì)列自動(dòng)幫我們完成。我們只需要關(guān)注具體的業(yè)務(wù)邏輯就可以了。
而這種阻塞隊(duì)列經(jīng)常用在生產(chǎn)者消費(fèi)者模式中。(可參看:面試官讓我手寫(xiě)一個(gè)生產(chǎn)者消費(fèi)者模式)
那么,一般我們用到的阻塞隊(duì)列有哪些呢。下面,通過(guò)idea的類(lèi)圖,列出來(lái)常用的阻塞隊(duì)列,然后一個(gè)一個(gè)講解(不懂怎么用的,可以參考這篇文章:怎么用IDEA快速查看類(lèi)圖關(guān)系)。
阻塞隊(duì)列中,所有常用的方法都在 BlockingQueue 接口中定義。如
插入元素的方法:put,offer,add。移除元素的方法:remove,poll,take。
它們有四種不同的處理方式,第一種是在失敗時(shí)拋出異常,第二種是在失敗時(shí)返回特殊值,第三種是一直阻塞當(dāng)前線(xiàn)程,最后一種是在指定時(shí)間內(nèi)阻塞,否則返回特殊值。(以上特殊值,是指在插入元素時(shí),失敗返回false,在取出元素時(shí),失敗返回null)
拋異常 | 特殊值 | 阻塞 | 超時(shí) | |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除 | remove() | poll() | take() | poll(time,unit) |
1)ArrayBlockingQueue
這是一個(gè)由數(shù)組結(jié)構(gòu)組成的有界阻塞隊(duì)列。首先看下它的構(gòu)造方法,有三個(gè)。
第一個(gè)可以指定隊(duì)列的大小,第二個(gè)還可以指定隊(duì)列是否公平,不指定的話(huà),默認(rèn)是非公平。它是使用 ReentrantLock 的公平鎖和非公平鎖實(shí)現(xiàn)的(后續(xù)講解AQS時(shí),會(huì)詳細(xì)說(shuō)明)。
簡(jiǎn)單理解就是,ReentrantLock 內(nèi)部會(huì)維護(hù)一個(gè)有先后順序的等待隊(duì)列,假如有五個(gè)任務(wù)一起過(guò)來(lái),都被阻塞了。如果是公平的,則等待隊(duì)列中等待最久的任務(wù)就會(huì)先進(jìn)入阻塞隊(duì)列。如果是非公平的,那么這五個(gè)線(xiàn)程就需要搶鎖,誰(shuí)先搶到,誰(shuí)就先進(jìn)入阻塞隊(duì)列。
第三個(gè)構(gòu)造方法,是把一個(gè)集合的元素初始化到阻塞隊(duì)列中。
另外,ArrayBlockingQueue 沒(méi)有實(shí)現(xiàn)讀寫(xiě)分離,也就是說(shuō),讀和寫(xiě)是不能同時(shí)進(jìn)行的。因?yàn)?,它讀寫(xiě)時(shí)用的是同一把鎖,如下圖所示:
2) LinkedBlockingQueue
這是一個(gè)由鏈表結(jié)構(gòu)組成的有界阻塞隊(duì)列。它的構(gòu)造方法有三個(gè)。
可以看到和 ArrayBlockingQueue 的構(gòu)造方法大同小異,不過(guò)是,LinkedBlockingQueue 可以不指定隊(duì)列的大小,默認(rèn)值是 Integer.MAX_VALUE 。
但是,最好不要這樣做,建議指定一個(gè)固定大小。因?yàn)椋绻a(chǎn)者的速度比消費(fèi)者的速度大的多的情況下,這會(huì)導(dǎo)致阻塞隊(duì)列一直膨脹,直到系統(tǒng)內(nèi)存被耗盡(此時(shí),還沒(méi)達(dá)到隊(duì)列容量的最大值)。
此外,LinkedBlockingQueue 實(shí)現(xiàn)了讀寫(xiě)分離,可以實(shí)現(xiàn)數(shù)據(jù)的讀和寫(xiě)互不影響,這在高并發(fā)的場(chǎng)景下,對(duì)于效率的提高無(wú)疑是非常巨大的。
3) SynchronousQueue
這是一個(gè)沒(méi)有緩沖的無(wú)界隊(duì)列。什么意思,看一下它的 size 方法:
總是返回 0 ,因?yàn)樗且粋€(gè)沒(méi)有容量的隊(duì)列。
當(dāng)執(zhí)行插入元素的操作時(shí),必須等待一個(gè)取出操作。也就是說(shuō),put元素的時(shí)候,必須等待 take 操作。
那么,有的同學(xué)就好奇了,這沒(méi)有容量,還叫什么隊(duì)列啊,這有什么意義呢。
我的理解是,這適用于并發(fā)任務(wù)不大,而且生產(chǎn)者和消費(fèi)者的速度相差不多的場(chǎng)景下,直接把生產(chǎn)者和消費(fèi)者對(duì)接,不用經(jīng)過(guò)隊(duì)列的入隊(duì)出隊(duì)這一系列操作。所以,效率上會(huì)高一些。
可以去查看一下 Excutors.newCachedThreadPool 方法用的就是這種隊(duì)列。
這個(gè)隊(duì)列有兩個(gè)構(gòu)造方法,用于傳入是公平還是非公平,默認(rèn)是非公平。
4)PriorityBlockingQueue
這是一個(gè)支持優(yōu)先級(jí)排序的無(wú)界隊(duì)列。有四個(gè)構(gòu)造方法:
可以指定初始容量大小(注意初始容量并不代表最大容量),或者不指定,默認(rèn)大小為 11。也可以傳入一個(gè)比較器,把元素按一定的規(guī)則排序,不指定比較器的話(huà),默認(rèn)是自然順序。
PriorityBlockingQueue 是基于二叉樹(shù)最小堆實(shí)現(xiàn)的,每當(dāng)取元素的時(shí)候,就會(huì)把優(yōu)先級(jí)最高的元素取出來(lái)。我們測(cè)試一下:
public class Person {
private int id;
private String name;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "Person{" +
"id=" + id +
", name='" + name + '\'' +
'}';
}
public Person(int id, String name) {
this.id = id;
this.name = name;
}
public Person() {
}
}
public class QueueTest {
public static void main(String[] args) throws InterruptedException {
PriorityBlockingQueue<Person> priorityBlockingQueue = new PriorityBlockingQueue<>(1, new Comparator<Person>() {
@Override
public int compare(Person o1, Person o2) {
return o1.getId() - o2.getId();
}
});
Person p2 = new Person(7, "李四");
Person p1 = new Person(9, "張三");
Person p3 = new Person(6, "王五");
Person p4 = new Person(2, "趙六");
priorityBlockingQueue.add(p1);
priorityBlockingQueue.add(p2);
priorityBlockingQueue.add(p3);
priorityBlockingQueue.add(p4);
//由于二叉樹(shù)最小堆實(shí)現(xiàn),用這種方式直接打印元素,不能保證有序
System.out.println(priorityBlockingQueue);
System.out.println(priorityBlockingQueue.take());
System.out.println(priorityBlockingQueue);
System.out.println(priorityBlockingQueue.take());
System.out.println(priorityBlockingQueue);
}
}
打印結(jié)果:
[Person{id=2, name='趙六'}, Person{id=6, name='王五'}, Person{id=7, name='李四'}, Person{id=9, name='張三'}]
Person{id=2, name='趙六'}
[Person{id=6, name='王五'}, Person{id=9, name='張三'}, Person{id=7, name='李四'}]
Person{id=6, name='王五'}
[Person{id=7, name='李四'}, Person{id=9, name='張三'}]
可以看到,第一次取出的是 id 最小值 2, 第二次取出的是 6 。
5)DelayQueue
這是一個(gè)帶有延遲時(shí)間的無(wú)界阻塞隊(duì)列。隊(duì)列中的元素,只有等延時(shí)時(shí)間到了,才能取出來(lái)。此隊(duì)列一般用于過(guò)期數(shù)據(jù)的刪除,或任務(wù)調(diào)度。以下,模擬一下定長(zhǎng)時(shí)間的數(shù)據(jù)刪除。
首先定義數(shù)據(jù)元素,需要實(shí)現(xiàn) Delayed 接口,實(shí)現(xiàn) getDelay 方法用于計(jì)算剩余時(shí)間,和 CompareTo方法用于優(yōu)先級(jí)排序。
public class DelayData implements Delayed {
private int id;
private String name;
//數(shù)據(jù)到期時(shí)間
private long endTime;
private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public long getEndTime() {
return endTime;
}
public void setEndTime(long endTime) {
this.endTime = endTime;
}
public DelayData(int id, String name, long endTime) {
this.id = id;
this.name = name;
//需要把傳入的時(shí)間endTime 加上當(dāng)前系統(tǒng)時(shí)間,作為數(shù)據(jù)的到期時(shí)間
this.endTime = endTime + System.currentTimeMillis();
}
public DelayData() {
}
@Override
public long getDelay(TimeUnit unit) {
return this.endTime - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
return o.getDelay(this.timeUnit) - this.getDelay(this.timeUnit) < 0 ? 1: -1;
}
}
模擬三條數(shù)據(jù),分別設(shè)置不同的過(guò)期時(shí)間:
public class ProcessData {
public static void main(String[] args) throws InterruptedException {
DelayQueue<DelayData> delayQueue = new DelayQueue<>();
DelayData a = new DelayData(5, "A", 5000);
DelayData b = new DelayData(8, "B", 8000);
DelayData c = new DelayData(2, "C", 2000);
delayQueue.add(a);
delayQueue.add(b);
delayQueue.add(c);
System.out.println("開(kāi)始計(jì)時(shí)時(shí)間:" + System.currentTimeMillis());
for (int i = 0; i < 3; i++) {
DelayData data = delayQueue.take();
System.out.println("id:"+data.getId()+",數(shù)據(jù):"+data.getName()+"被移除,當(dāng)前時(shí)間:"+System.currentTimeMillis());
}
}
}
最后結(jié)果:
開(kāi)始計(jì)時(shí)時(shí)間:1583333583216
id:2,數(shù)據(jù):C被移除,當(dāng)前時(shí)間:1583333585216
id:5,數(shù)據(jù):A被移除,當(dāng)前時(shí)間:1583333588216
id:8,數(shù)據(jù):B被移除,當(dāng)前時(shí)間:1583333591216
可以看到,數(shù)據(jù)是按過(guò)期時(shí)間長(zhǎng)短,按順序移除的。C的時(shí)間最短 2 秒,然后過(guò)了 3 秒 A 也過(guò)期,再過(guò) 3 秒,B 過(guò)期。
看完上述內(nèi)容,你們掌握常用BlockingQueue有哪些的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝各位的閱讀!
當(dāng)前名稱(chēng):常用BlockingQueue有哪些
瀏覽地址:http://muchs.cn/article46/jcpchg.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)頁(yè)設(shè)計(jì)公司、網(wǎng)站維護(hù)、品牌網(wǎng)站制作、用戶(hù)體驗(yàn)、做網(wǎng)站、面包屑導(dǎo)航
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶(hù)投稿、用戶(hù)轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話(huà):028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)