LinkedBlockingQueue1.8源碼詳解

[TOC]

創(chuàng)新互聯(lián)公司是一家專業(yè)提供尚志企業(yè)網(wǎng)站建設(shè),專注與成都網(wǎng)站制作、成都網(wǎng)站設(shè)計(jì)、H5場(chǎng)景定制、小程序制作等業(yè)務(wù)。10年已為尚志眾多企業(yè)、政府機(jī)構(gòu)等服務(wù)。創(chuàng)新互聯(lián)專業(yè)的建站公司優(yōu)惠進(jìn)行中。

LinkedBlockingQueue 1.8 源碼詳解

一,簡介

LinkedBlockingQueue 是一個(gè)用鏈表實(shí)現(xiàn)的有界阻塞隊(duì)列;此隊(duì)列的默認(rèn)和最大長度為Integer.MAX_VALUE;此隊(duì)列按照先進(jìn)先出的原則對(duì)元素就行排序;隊(duì)列有兩個(gè)鎖,生成和消費(fèi)各一把鎖,都是默認(rèn)的非公平鎖。

二,類UML圖

LinkedBlockingQueue 1.8 源碼詳解

三,基本成員
    static class Node<E> {
        // 我們插入的值
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        // 下一個(gè)node
        Node<E> next;

        Node(E x) { item = x; }
    }

    /** 隊(duì)列容量 */
    private final int capacity;

    /** 兩個(gè)鎖,需要使用AtomicInteger保證原子性 */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    // 頭結(jié)點(diǎn)
    transient Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    // 尾節(jié)點(diǎn)
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    /** take, poll, etc 的鎖 */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    /** 等待在隊(duì)列空 */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    /** put, offer, etc的鎖 */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    /** 等待在隊(duì)列滿 */
    private final Condition notFull = putLock.newCondition();
四,常用方法
構(gòu)造方法
    // 無參構(gòu)造
    public LinkedBlockingQueue() {
        // 默認(rèn)Integer.MAX_VALUE
        this(Integer.MAX_VALUE);
    }
    // 有參構(gòu)造
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        // 創(chuàng)建一個(gè)item為null的節(jié)點(diǎn)
        last = head = new Node<E>(null);
    }
offer 方法
public boolean offer(E e) {
        // e不能為null
        if (e == null) throw new NullPointerException();
        // 總數(shù)
        final AtomicInteger count = this.count;
        // 總數(shù)等于了容量 返回false
        if (count.get() == capacity)
            return false;
        int c = -1;
        // 創(chuàng)建一個(gè)node
        Node<E> node = new Node<E>(e);
        // 獲取鎖
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                // 插入鏈表
                enqueue(node);
                // 加1返回舊值
                c = count.getAndIncrement();
                // c是增加之前的值,然后加1,再判斷有沒有可以存儲(chǔ)的容量
                if (c + 1 < capacity)
                    // 有喚醒下一個(gè)線程
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        // 隊(duì)列有一個(gè)元素了,證明之前隊(duì)列為空,可能已經(jīng)有元素來消費(fèi)了,所以就需要喚醒一個(gè)等待消費(fèi)的線程
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }

    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }

注意:offer 還有一個(gè)重載方法,支持中斷,帶有超時(shí)時(shí)間的限制offer(E e, long timeout, TimeUnit unit)。

put 方法
    public void put(E e) throws InterruptedException {
        // 不可以為null
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        // 構(gòu)建一個(gè)節(jié)點(diǎn)
        Node<E> node = new Node<E>(e);
        // 獲取put鎖
        final ReentrantLock putLock = this.putLock;
        // 獲取count
        final AtomicInteger count = this.count;
        // 調(diào)用獲取鎖的方法,支持中斷
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            // 等于了隊(duì)列的容量
            while (count.get() == capacity) {
                // 進(jìn)入阻塞隊(duì)列
                notFull.await();
            }
            // 入隊(duì)
            enqueue(node);
            // 返回的是自增前的值
            c = count.getAndIncrement();
            // 如果這個(gè)元素入隊(duì)以后,還有多于的空間,喚醒等待隊(duì)列的線程
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        // c==0,證明之前隊(duì)列是空的,喚醒一個(gè)獲取線程
        if (c == 0)
            signalNotEmpty();
    }
poll 方法

這次我們看個(gè)帶超時(shí)時(shí)間的poll方法。

    // 帶超時(shí)時(shí)間的消費(fèi)一個(gè)元素
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        // 支持中斷的獲取鎖
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            // count-- 返回舊值
            c = count.getAndDecrement();
            // 還有元素,喚醒一個(gè)等待獲取的線程
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        // 隊(duì)列還有一個(gè)位置,喚醒一個(gè)入隊(duì)線程
        if (c == capacity)
            signalNotFull();
        return x;
    }

    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC // 自引用
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }
take 方法
    // 獲取元素
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            // 隊(duì)列為null 就阻塞
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        // 隊(duì)列消費(fèi)一個(gè)元素,可以喚醒一個(gè)生產(chǎn)線程了
        if (c == capacity)
            signalNotFull();
        return x;
    }
peek 方法
    // 獲取第一個(gè)元素
    public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }
size 方法
    public int size() {
        return count.get();
    }
五,總結(jié)

LinkedBlockingQueue 可以看做是一個(gè)×××隊(duì)列,因?yàn)樽畲笕萘渴荌nteger.MAX_VALUE,這已經(jīng)很大了,所以使用時(shí)一定注意容量問題,避免內(nèi)存溢出,但是好處就是可以不用我們?nèi)コ跏既萘浚魂?duì)列在入隊(duì)和出隊(duì)使用了兩把鎖,提高了并發(fā)性,相對(duì)于一把鎖來說;我們可以發(fā)現(xiàn)隊(duì)列的底層數(shù)據(jù)結(jié)構(gòu)采用的是鏈表,對(duì)比ArrayBlockingQueue的數(shù)組數(shù)據(jù)結(jié)構(gòu),在處理數(shù)據(jù)的同時(shí),節(jié)點(diǎn)本身也需要處理垃圾回收,所以相對(duì)于數(shù)組來的數(shù)據(jù)來說增加了垃圾回收,可能影響性能;LinkedBlockingQueue 和ArrayBlockingQueue 兩個(gè)可以對(duì)比學(xué)習(xí),追求系統(tǒng)穩(wěn)定性,性能就使用ArrayBlockingQueue ,追求并發(fā)性,可能發(fā)生大量請(qǐng)求時(shí)(系統(tǒng)不是很穩(wěn)定)要注意內(nèi)存溢出就使用LinkedBlockingQueue ,使用場(chǎng)景屬于個(gè)人理解,歡迎指正。

《參考 Java 并發(fā)編程的藝術(shù)》

文章標(biāo)題:LinkedBlockingQueue1.8源碼詳解
標(biāo)題路徑:http://muchs.cn/article16/ghscdg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供搜索引擎優(yōu)化網(wǎng)站設(shè)計(jì)、品牌網(wǎng)站建設(shè)移動(dòng)網(wǎng)站建設(shè)、網(wǎng)站建設(shè)、品牌網(wǎng)站制作

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

小程序開發(fā)