一次聊天引發(fā)的思考--java并發(fā)包實戰(zhàn)

一次聊天,談到了死鎖的解決、可重入鎖等等,突然發(fā)現(xiàn)這些離自己很遠,只有一些讀書時的概念涌入腦海,但各自的應用場景怎么都無法想出。痛定思痛,決定看看concurrent包里涉及并發(fā)的類及各自的應用場景。

創(chuàng)新互聯(lián)憑借專業(yè)的設計團隊扎實的技術支持、優(yōu)質高效的服務意識和豐厚的資源優(yōu)勢,提供專業(yè)的網(wǎng)站策劃、成都做網(wǎng)站、成都網(wǎng)站設計、成都外貿網(wǎng)站建設、網(wǎng)站優(yōu)化、軟件開發(fā)、網(wǎng)站改版等服務,在成都10余年的網(wǎng)站建設設計經(jīng)驗,為成都近千家中小型企業(yè)策劃設計了網(wǎng)站。

第一類:原子操作類的atomic包,里面包含了

1)布爾類型的AtomicBoolean

2)整型AtomicInteger、AtomicIntegerArray、AtomicIntegerFieldUpdater

3)長整型AtomicLong、AtomicLongArray、AtomicLongFieldUpdater

4)引用型AtomicMarkableReference、AtomicReference、AtomicReferenceArray、AtomicReferenceFieldUpdater、AtomicStampedReference

5)累加器DoubleAccumulator、DoubleAdder、LongAccumulator、LongAdder、Striped64

java.util.concurrent.atomic原子操作類包

這個包里面提供了一組原子變量類。其基本的特性就是在多線程環(huán)境下,當有多個線程同時執(zhí)行這些類的實例包含的方法時,具有排他性,即當某個線程進入方法,執(zhí)行其中的指令時,不會被其他線程打斷,而別的線程就像自旋鎖一樣,一直等到該方法執(zhí)行完成,才由JVM從等待隊列中選擇一個另一個線程進入,這只是一種邏輯上的理解。實際上是借助硬件的相關指令來實現(xiàn)的,不會阻塞線程(或者說只是在硬件級別上阻塞了)。可以對基本數(shù)據(jù)、數(shù)組中的基本數(shù)據(jù)、對類中的基本數(shù)據(jù)進行操作。原子變量類相當于一種泛化的volatile變量,能夠支持原子的和有條件的讀-改-寫操作。

java.util.concurrent.atomic中的類可以分成4組:

標量類(Scalar):AtomicBoolean,AtomicInteger,AtomicLong,AtomicReference
數(shù)組類:AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray
更新器類:AtomicLongFieldUpdater,AtomicIntegerFieldUpdater,AtomicReferenceFieldUpdater
復合變量類:AtomicMarkableReference,AtomicStampedReference
第一組AtomicBoolean,AtomicInteger,AtomicLong,AtomicReference這四種基本類型用來處理布爾,整數(shù),長整數(shù),對象四種數(shù)據(jù),其內部實現(xiàn)不是簡單的使用synchronized,而是一個更為高效的方式CAS (compare and swap) + volatile和native方法,從而避免了synchronized的高開銷,執(zhí)行效率大為提升。如AtomicInteger的實現(xiàn)片斷為:

private static final Unsafe unsafe = Unsafe.getUnsafe();
private volatile int value;
public final int get() {
        return value;
}
public final void set(int newValue) {
        value = newValue;
}
public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

構造函數(shù)(兩個構造函數(shù))
默認的構造函數(shù):初始化的數(shù)據(jù)分別是false,0,0,null
帶參構造函數(shù):參數(shù)為初始化的數(shù)據(jù)
set( )和get( )方法:可以原子地設定和獲取atomic的數(shù)據(jù)。類似于volatile,保證數(shù)據(jù)會在主存中設置或讀取
void set()和void lazySet():set設置為給定值,直接修改原始值;lazySet延時設置變量值,這個等價于set()方法,但是由于字段是volatile類型的,因此次字段的修改會比普通字段(非volatile字段)有稍微的性能延時(盡管可以忽略),所以如果不是想立即讀取設置的新值,允許在“后臺”修改值,那么此方法就很有用。
getAndSet( )方法
原子的將變量設定為新數(shù)據(jù),同時返回先前的舊數(shù)據(jù)
其本質是get( )操作,然后做set( )操作。盡管這2個操作都是atomic,但是他們合并在一起的時候,就不是atomic。在Java的源程序的級別上,如果不依賴synchronized的機制來完成這個工作,是不可能的。只有依靠native方法才可以。

    public final int getAndSet(int newValue) {
        for (;;) {
            int current = get();
            if (compareAndSet(current, newValue))
                return current;
        }
    }

compareAndSet( ) 和weakCompareAndSet( )方法
這 兩個方法都是conditional modifier方法。這2個方法接受2個參數(shù),一個是期望數(shù)據(jù)(expected),一個是新數(shù)據(jù)(new);如果atomic里面的數(shù)據(jù)和期望數(shù)據(jù)一 致,則將新數(shù)據(jù)設定給atomic的數(shù)據(jù),返回true,表明成功;否則就不設定,并返回false。JSR規(guī)范中說:以原子方式讀取和有條件地寫入變量但不 創(chuàng)建任何 happen-before 排序,因此不提供與除 weakCompareAndSet 目標外任何變量以前或后續(xù)讀取或寫入操作有關的任何保證。大意就是說調用weakCompareAndSet時并不能保證不存在happen- before的發(fā)生(也就是可能存在指令重排序導致此操作失?。5菑腏ava源碼來看,其實此方法并沒有實現(xiàn)JSR規(guī)范的要求,最后效果和 compareAndSet是等效的,都調用了unsafe.compareAndSwapInt()完成操作。

public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
public final boolean weakCompareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

對于 AtomicInteger、AtomicLong還提供了一些特別的方法。
getAndIncrement( ):以原子方式將當前值加 1,相當于線程安全的i++操作。
incrementAndGet( ):以原子方式將當前值加 1, 相當于線程安全的++i操作。
getAndDecrement( ):以原子方式將當前值減 1, 相當于線程安全的i--操作。
decrementAndGet ( ):以原子方式將當前值減 1,相當于線程安全的--i操作。
addAndGet( ): 以原子方式將給定值與當前值相加, 實際上就是等于線程安全的i =i+delta操作。
getAndAdd( ):以原子方式將給定值與當前值相加, 相當于線程安全的t=i;i+=delta;return t;操作。
以實現(xiàn)一些加法,減法原子操作。(注意 --i、++i不是原子操作,其中包含有3個操作步驟:第一步,讀取i;第二步,加1或減1;第三步:寫回內存)

使用AtomicReference創(chuàng)建線程安全的堆棧

package thread;
import java.util.concurrent.atomic.AtomicReference;
public class ConcurrentStack<T> {
    private AtomicReference<Node<T>>    stacks    = new AtomicReference<Node<T>>();
    public T push(T e) {
        Node<T> oldNode, newNode;
        for (;;) { // 這里的處理非常的特別,也是必須如此的。
            oldNode = stacks.get();
            newNode = new Node<T>(e, oldNode);
            if (stacks.compareAndSet(oldNode, newNode)) {
                return e;
            }
        }
    }    
    public T pop() {
        Node<T> oldNode, newNode;
        for (;;) {
            oldNode = stacks.get();
            newNode = oldNode.next;
            if (stacks.compareAndSet(oldNode, newNode)) {
                return oldNode.object;
            }
        }
    }    
    private static final class Node<T> {
        private T        object;        
        private Node<T>    next;        
        private Node(T object, Node<T> next) {
            this.object = object;
            this.next = next;
        }
    }    
}

雖然原子的標量類擴展了Number類,但并沒有擴展一些基本類型的包裝類,如Integer或Long,事實上他們也不能擴展:基本類型的包裝類是不可以修改的,而原子變量類是可以修改的。在原子變量類中沒有重新定義hashCode或equals方法,每個實例都是不同的,他們也不宜用做基于散列容器中的鍵值。

第二組AtomicIntegerArray,AtomicLongArray還有AtomicReferenceArray類進一步擴展了原子操作,對這些類型的數(shù)組提供了支持。這些類在為其數(shù)組元素提供 volatile 訪問語義方面也引人注目,這對于普通數(shù)組來說是不受支持的。

他們內部并不是像AtomicInteger一樣維持一個valatile變量,而是全部由native方法實現(xiàn),如下
AtomicIntegerArray的實現(xiàn)片斷:

private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final int base = unsafe.arrayBaseOffset(int[].class);
private static final int scale = unsafe.arrayIndexScale(int[].class);
private final int[] array;
public final int get(int i) {
        return unsafe.getIntVolatile(array, rawIndex(i));
}
public final void set(int i, int newValue) {
        unsafe.putIntVolatile(array, rawIndex(i), newValue);
}

對Java技術,架構技術感興趣的同學,歡迎加群,一起學習,相互討論。可以獲取免費的學習資料,群號:614478470 點擊加入

第三組AtomicLongFieldUpdater,AtomicIntegerFieldUpdater,AtomicReferenceFieldUpdater基于反射的實用工具,可以對指定類的指定 volatile 字段進行原子更新。API非常簡單,但是也是有一些約束:

(1)字段必須是volatile類型的

(2)字段的描述類型(修飾符public/protected/default/private)是與調用者與操作對象字段的關系一致。也就是說 調用者能夠直接操作對象字段,那么就可以反射進行原子操作。但是對于父類的字段,子類是不能直接操作的,盡管子類可以訪問父類的字段。

(3)只能是實例變量,不能是類變量,也就是說不能加static關鍵字。

(4)只能是可修改變量,不能使final變量,因為final的語義就是不可修改。實際上final的語義和volatile是有沖突的,這兩個關鍵字不能同時存在。

(5)對于AtomicIntegerFieldUpdater 和AtomicLongFieldUpdater 只能修改int/long類型的字段,不能修改其包裝類型(Integer/Long)。如果要修改包裝類型就需要使用AtomicReferenceFieldUpdater 。

netty5.0中類ChannelOutboundBuffer統(tǒng)計發(fā)送的字節(jié)總數(shù),由于使用volatile變量已經(jīng)不能滿足,所以使用AtomicIntegerFieldUpdater 來實現(xiàn)的,看下面代碼:

//定義
    private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER =
            AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize");

    private volatile long totalPendingSize;

//使用
        long oldValue = totalPendingSize;
        long newWriteBufferSize = oldValue + size;
        while (!TOTAL_PENDING_SIZE_UPDATER.compareAndSet(this, oldValue, newWriteBufferSize)) {
            oldValue = totalPendingSize;
            newWriteBufferSize = oldValue + size;
        }

第二類:鎖的類包,里面包含了

排他鎖:AbstractOwnableSynchronizer、AbstractQueuedLongSynchronizer、AbstractQueuedSynchronizer

讀寫鎖、可重入鎖:ReadWriteLock、ReentrantLock、Lock、ReentrantReadWriteLock(隱式包含讀鎖和寫鎖)、Condition、LockSupport

混合鎖:StampedLock

condition相似于對象的監(jiān)控方法object#wait()、object#notify、object#notifyAll,但不同之處在于:通過和任意Lock的實現(xiàn)類聯(lián)合使用,Condition對每個對象提供了多個等待-設置功能。

此時Lock代替了synchronized方法和模塊,condition代替了對象的監(jiān)控方法。

Condition通常也稱作Condition隊列或者condition變量,它提供了一種方法,使一個線程能夠暫停執(zhí)行(wait方法),當別的線程的狀態(tài)condition為true時可以激活此線程。由于不同線程共享的狀態(tài)信息必須受到保護,因此Condition具有一些鎖的形式。等待一個condition的關鍵屬性是自動釋放關聯(lián)的鎖并且暫停當前線程,類似于object.wait。

Conditon示例內部綁定了一個鎖,獲取一個特定鎖的實例的Condition實例可以通過lock#newCondition方法得到。

舉個condition的示例,先看一下生產者和消費者模式常規(guī)代碼:

/** 
         * 生產指定數(shù)量的產品 
         * 
         * @param neednum 
         */ 
        public synchronized void produce(int neednum) { 
                //測試是否需要生產 
                while (neednum + curnum > max_size) { 
                        System.out.println("要生產的產品數(shù)量" + neednum + "超過剩余庫存量" + (max_size - curnum) + ",暫時不能執(zhí)行生產任務!"); 
                        try { 
                                //當前的生產線程等待 
                                wait(); 
                        } catch (InterruptedException e) { 
                                e.printStackTrace(); 
                        } 
                } 
                //滿足生產條件,則進行生產,這里簡單的更改當前庫存量 
                curnum += neednum; 
                System.out.println("已經(jīng)生產了" + neednum + "個產品,現(xiàn)倉儲量為" + curnum); 
                //喚醒在此對象監(jiān)視器上等待的所有線程 
                notifyAll(); 
        } 

        /** 
         * 消費指定數(shù)量的產品 
         * 
         * @param neednum 
         */ 
        public synchronized void consume(int neednum) { 
                //測試是否可消費 
                while (curnum < neednum) { 
                        try { 
                                //當前的生產線程等待 
                                wait(); 
                        } catch (InterruptedException e) { 
                                e.printStackTrace(); 
                        } 
                } 
                //滿足消費條件,則進行消費,這里簡單的更改當前庫存量 
                curnum -= neednum; 
                System.out.println("已經(jīng)消費了" + neednum + "個產品,現(xiàn)倉儲量為" + curnum); 
                //喚醒在此對象監(jiān)視器上等待的所有線程 
                notifyAll(); 
        } 

我們希望保證生產者的produce線程和消費者的consume線程有不同的等待--設置,這樣,當buffer中的項目或者空間可用時,我們只需要每次只通知一個單線程就可以了。優(yōu)化只要使用兩個Condition實例即可(略有改動):

class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    final Condition notFull  = lock.newCondition(); 
    final Condition notEmpty = lock.newCondition(); 

    final Object[] items = new Object[100];
    int putptr, takeptr, count;

    public void produce(Object x) throws InterruptedException {
      lock.lock();
      try {
        while (count == items.length)
          notFull.await();
        items[putptr] = x;
        if (++putptr == items.length) putptr = 0;
        ++count;
        notEmpty.signal();
      } finally {
        lock.unlock();
      }
    }

    public Object consume() throws InterruptedException {
      lock.lock();
      try {
        while (count == 0)
          notEmpty.await();
        Object x = items[takeptr];
        if (++takeptr == items.length) takeptr = 0;
        --count;
        notFull.signal();
        return x;
      } finally {
        lock.unlock();
      }
    }
  }

java.util.concurrent.ArrayBlockingQueue提供了上述功能,因此沒有必要實現(xiàn)這個示例的類。

Condition實現(xiàn)類可以提供和對象監(jiān)控方法不同的行為和語義,例如保證通知的順序,或者當執(zhí)行通知時不要求保持一個鎖。若condition實現(xiàn)類提供了上述特定的語義,那么實現(xiàn)類必須以文檔的形式聲明這些語義。

ReentrantReadWriteLock的讀鎖與寫鎖

讀鎖是排寫鎖操作的,讀鎖不排讀鎖操作,多個讀鎖可以并發(fā)不阻塞。在讀鎖獲取和讀鎖釋放之前,寫鎖并不能被任何線程獲取。多個讀鎖同時作用期間,試圖獲取寫鎖的線程都處于等待狀態(tài),當最后一個讀鎖釋放后,試圖獲取寫鎖的線程才有機會獲取寫鎖。
寫鎖是排寫鎖,排讀鎖操作的。當一個線程獲取到寫鎖之后,其他試圖獲取寫鎖和試圖獲取讀鎖的線程都處于等待狀態(tài),直到寫鎖被釋放。
同時,寫鎖中是可以獲取讀鎖,但是讀鎖中是無法獲取寫鎖的。

下面的是java的ReentrantReadWriteLock官方示例,來解讀一下吧。

class CachedData {
    Object data;
volatile boolean cacheValid;
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

void processCachedData() {
  rwl.readLock().lock();// @1
  if (!cacheValid) {
    // Must release read lock before acquiring write lock
    rwl.readLock().unlock(); // @3
    rwl.writeLock().lock(); // @2
    try {
      // Recheck state because another thread might have
      // acquired write lock and changed state before we did.
      if (!cacheValid) {
        data = ...
        cacheValid = true;
      }
      // Downgrade by acquiring read lock before releasing write lock
      rwl.readLock().lock(); //@4
    } finally {
      rwl.writeLock().unlock(); // Unlock write, still hold read @5
    }
  }

  try {
    use(data);
  } finally {
    rwl.readLock().unlock(); // 6
  }
}

當ABC三個線程同時進入到processcachedData()方法,同時都會得到讀鎖,然后獲取cachevalid,然后走到3位置釋放讀鎖,同時,假設A線程獲取到寫鎖,所以BC線程就無法獲取到寫鎖,這個時候進來的D線程就會停留在1位置而無法獲取讀鎖。A線程繼續(xù)往下走,判斷到cachevalid還是false,就會繼續(xù)走下去。為什么這個地方會還有一次判斷,上面注釋很清楚,A線程寫完之后,BC線程獲取到寫鎖,如果不再次進行判斷,就會寫入新的數(shù)據(jù)了,就不再是同步鎖了。所以這個地方有一個新的判斷?;氐紸線程,A線程繼續(xù)進行操作,到達4之后,獲取到讀鎖,這個地方api官方解釋就是,寫鎖要釋放的時候,必須先降級成讀鎖,這樣其他在等待寫鎖的比如BC,就不會獲取到寫鎖了。然后釋放寫鎖,這就是寫鎖的降級,釋放寫鎖之后,因為還持有讀鎖,所以BC線程無法獲取到寫鎖,只有在A線程執(zhí)行到6的時候,BC線程才會拿到寫鎖,進行判斷,就會發(fā)現(xiàn)數(shù)據(jù)已經(jīng)有了,釋放寫鎖,釋放讀鎖。

讀寫鎖能夠有效的在讀操作明顯大于寫操作的需求中完成高效率的運轉。

第三類:并發(fā)數(shù)據(jù)結構,包含了array、linkedList、set、map、list、queue等并發(fā)數(shù)據(jù)結構,包含如下:

阻塞數(shù)據(jù)結構:ArrayBlockingQueue、BlockingDeque、BlockingQueue、LinkedBlockingDeque、LinkedBlockingQueue、PriorityBlockingQueue、

并發(fā)數(shù)據(jù)結構:ConcurrentHashMap、ConcurrentLinkedDeque、ConcurrentLinkedQueue、ConcurrentMap、ConcurrentNavigableMap、ConcurrentSkipListMap、ConcurrentSkipListSet

第四類:同步器 ,這部分主要是對線程集合的管理的實現(xiàn),有Semaphore,CyclicBarrier, CountDownLatch,Exchanger等一些類。

對Java技術,架構技術感興趣的同學,歡迎加群,一起學習,相互討論??梢垣@取免費的學習資料,群號:614478470 點擊加入

Semaphore
類 java.util.concurrent.Semaphore 提供了一個計數(shù)信號量,從概念上講,信號量維護了一個許可集。如有必要,在許可可用前會阻塞每一個 acquire(),然后再獲取該許可。每個 release()添加一個許可,從而可能釋放一個正在阻塞的獲取者。但是,不使用實際的許可對象,Semaphore只對可用許可的號碼進行計數(shù),并采取相應的行動。
Semaphore 通常用于限制可以訪問某些資源(物理或邏輯的)的線程數(shù)目。 示例如下:

import java.util.*;import java.util.concurrent.*;

public class SemApp
{
    public static void main(String[] args)
{
        Runnable limitedCall = new Runnable() {
            final Random rand = new Random();
            final Semaphore available = new Semaphore(3);
            int count = 0;
            public void run()
{
                int time = rand.nextInt(15);
                int num = count++;

                try
                {
                    available.acquire();

                    System.out.println("Executing " + 
                        "long-running action for " + 
                        time + " seconds... #" + num);

                    Thread.sleep(time * 1000);

                    System.out.println("Done with #" + 
                        num + "!");

                    available.release();
                }
                catch (InterruptedException intEx)
                {
                    intEx.printStackTrace();
                }
            }
        };

        for (int i=0; i<10; i++)
            new Thread(limitedCall).start();
    }
}

即使本例中的 10 個線程都在運行(您可以對運行 SemApp 的 Java 進程執(zhí)行 jstack 來驗證),但只有 3 個線程是活躍的。在一個信號計數(shù)器釋放之前,其他 7 個線程都處于空閑狀態(tài)。(實際上,Semaphore 類支持一次獲取和釋放多個 permit,但這不適用于本場景。)

CyclicBarrier
java.util.concurrent.CyclicBarrier 一個同步輔助類,它允許 (common barrier point),在在涉及一組固定大小的線程的程序中,這些線程必須不時地互相等待,此時 CyclicBarrier 很有用。一組線程互相等待,直到到達某個公共屏障點。因為該 barrier 在釋放等待線程后可以重用,所以稱它為循環(huán)的 barrier。
需要所有的子任務都完成時,才執(zhí)行主任務,這個時候就可以選擇使用CyclicBarrier。賽跑時,等待所有人都準備好時,才起跑:

public class CyclicBarrierTest {

    public static void main(String[] args) throws IOException, InterruptedException {
        //如果將參數(shù)改為4,但是下面只加入了3個選手,這永遠等待下去
        //Waits until all parties have invoked await on this barrier. 
        CyclicBarrier barrier = new CyclicBarrier(3);

        ExecutorService executor = Executors.newFixedThreadPool(3);
        executor.submit(new Thread(new Runner(barrier, "1號選手")));
        executor.submit(new Thread(new Runner(barrier, "2號選手")));
        executor.submit(new Thread(new Runner(barrier, "3號選手")));

        executor.shutdown();
    }
}

class Runner implements Runnable {
    // 一個同步輔助類,它允許一組線程互相等待,直到到達某個公共屏障點 (common barrier point)
    private CyclicBarrier barrier;

    private String name;

    public Runner(CyclicBarrier barrier, String name) {
        super();
        this.barrier = barrier;
        this.name = name;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(1000 * (new Random()).nextInt(8));
            System.out.println(name + " 準備好了...");
            // barrier的await方法,在所有參與者都已經(jīng)在此 barrier 上調用 await 方法之前,將一直等待。
            barrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println(name + " 起跑!");
    }
}

輸出結果:
3號選手 準備好了...
2號選手 準備好了...
1號選手 準備好了...
1號選手 起跑!
2號選手 起跑!
3號選手 起跑!

CountDownLatch

類 java.util.concurrent.CountDownLatch 是一個同步輔助類,在完成一組正在其他線程中執(zhí)行的操作之前,它允許一個或多個線程一直等待。 用給定的數(shù)字作為計數(shù)器初始化 CountDownLatch。一個線程調用 await()方法后,在當前計數(shù)到達零之前,會一直受阻塞。其他線程調用 countDown() 方法,會使計數(shù)器遞減,所以,計數(shù)器的值為 0 后,會釋放所有等待的線程。其他后續(xù)的 await 調用都將立即返回。
這種現(xiàn)象只出現(xiàn)一次,因為計數(shù)無法被重置。如果需要重置計數(shù),請考慮使用 CyclicBarrier。

CountDownLatch 作為一個通用同步工具,有很多用途。使用“ 1 ”初始化的
CountDownLatch 用作一個簡單的開/關鎖存器,或入口:在通過調用 countDown() 的線程
打開入口前,所有調用 await 的線程都一直在入口處等待。用 N 初始化的 CountDownLatch
可以使一個線程在 N 個線程完成某項操作之前一直等待,或者使其在某項操作完成 N 次之前一直等待。

此類持有所有空閑線程,直到滿足特定條件,這時它將會一次釋放所有這些線程。

清單 2. CountDownLatch:讓我們去賽馬吧!

import java.util.*;
import java.util.concurrent.*;

class Race
{
    private Random rand = new Random();

    private int distance = rand.nextInt(250);
    private CountDownLatch start;
    private CountDownLatch finish;

    private List<String> horses = new ArrayList<String>();

    public Race(String... names)
    {
        this.horses.addAll(Arrays.asList(names));
    }

    public void run()
        throws InterruptedException
    {
        System.out.println("And the horses are stepping up to the gate...");
        final CountDownLatch start = new CountDownLatch(1);
        final CountDownLatch finish = new CountDownLatch(horses.size());
        final List<String> places = 
            Collections.synchronizedList(new ArrayList<String>());

        for (final String h : horses)
        {
            new Thread(new Runnable() {
                public void run() {
                    try
                    {
                        System.out.println(h + 
                            " stepping up to the gate...");
                        start.await();

                        int traveled = 0;
                        while (traveled < distance)
                        {
                            // In a 0-2 second period of time....
                            Thread.sleep(rand.nextInt(3) * 1000);

                            // ... a horse travels 0-14 lengths
                            traveled += rand.nextInt(15);
                            System.out.println(h + 
                                " advanced to " + traveled + "!");
                        }
                        finish.countDown();
                        System.out.println(h + 
                            " crossed the finish!");
                        places.add(h);
                    }
                    catch (InterruptedException intEx)
                    {
                        System.out.println("ABORTING RACE!!!");
                        intEx.printStackTrace();
                    }
                }
            }).start();
        }

        System.out.println("And... they're off!");
        start.countDown();        

        finish.await();
        System.out.println("And we have our winners!");
        System.out.println(places.get(0) + " took the gold...");
        System.out.println(places.get(1) + " got the silver...");
        System.out.println("and " + places.get(2) + " took home the bronze.");
    }
}

public class CDLApp
{
    public static void main(String[] args)
        throws InterruptedException, java.io.IOException
    {
        System.out.println("Prepping...");

        Race r = new Race(
            "Beverly Takes a Bath",
            "RockerHorse",
            "Phineas",
            "Ferb",
            "Tin Cup",
            "I'm Faster Than a Monkey",
            "Glue Factory Reject"
            );

        System.out.println("It's a race of " + r.getDistance() + " lengths");

        System.out.println("Press Enter to run the race....");
        System.in.read();

        r.run();
    }
}

注意,CountDownLatch 有兩個用途:首先,它同時釋放所有線程,模擬馬賽的起點,但隨后會設置一個門閂模擬馬賽的終點。這樣,“主” 線程就可以輸出結果。 為了讓馬賽有更多的輸出注釋,可以在賽場的 “轉彎處” 和 “半程” 點,比如賽馬跨過跑道的四分之一、二分之一和四分之三線時,添加 CountDownLatch。

Exchanger
類 java.util.concurrent.Exchanger 提供了一個同步點,在這個同步點,一對線程可以交換
數(shù)據(jù)。每個線程通過 exchange()方法的入口提供數(shù)據(jù)給他的伙伴線程,并接收他的伙伴線程
提供的數(shù)據(jù),并返回。

線程間可以用 Exchanger 來交換數(shù)據(jù)。當兩個線程通過 Exchanger 交互了對象,這個交換對于兩個線程來說都是安全的。

Future 和 FutureTask
接口 public interface Future<V> 表示異步計算的結果。它提供了檢查計算是否完成的方法,
以等待計算的完成,并調用get()獲取計算的結果。
FutureTask 類是 Future 的一個實現(xiàn), 并實現(xiàn)了Runnable ,所以可通過 Executor(線程池) 來執(zhí)行。
也可傳遞給Thread對象執(zhí)行。

如果在主線程中需要執(zhí)行比較耗時的操作時,但又不想阻塞主線程時,可以把這些作業(yè)交給
Future 對象在后臺完成,當主線程將來需要時,就可以通過 Future 對象獲得后臺作業(yè)的計算結果或者執(zhí)行狀態(tài)。

第五類:線程管理,

Callable 被執(zhí)行的任務
Executor 執(zhí)行任務
Future 異步提交任務的返回數(shù)據(jù)
一次聊天引發(fā)的思考--java并發(fā)包實戰(zhàn)
Executor是總的接口,用來執(zhí)行Runnable任務;
ExecutorService是Executor的擴展接口,主要擴展了執(zhí)行Runnable或Callable任務的方式,及shutdown的方法;
ScheduledExecutorService是ExecutorService的擴展接口,主要擴展了可以用任務調度的形式(延遲或定期)執(zhí)行Runnable或Callable任務;
AbstractExecutorService是ExecutorService接口的實現(xiàn)類,是抽象類,提供一些默認的執(zhí)行Runnable或Callable任務的方法;
ThreadPoolExecutor是AbstractExecutorService的子類,是線程池的實現(xiàn);
ScheduledThreadPoolExecutor是ThreadPoolExecutor的子類,實現(xiàn)ScheduledExecutorService接口,基于線程池模式的多任務調度,是Timer工具類的高性能版;
Callable與Future是Runnable的另外的形式,用來異步獲取任務執(zhí)行結果;
最后,Executors是工具類,用于創(chuàng)建上述各種實例。

Q&A
Synchronization vs volatile

Synchronization supports mutual exclusion and visibility. In contrast, the volatile keyword only supports visibility.

對Java技術,架構技術感興趣的同學,歡迎加群,一起學習,相互討論。可以獲取免費的學習資料,群號:614478470 點擊加入

網(wǎng)站名稱:一次聊天引發(fā)的思考--java并發(fā)包實戰(zhàn)
當前路徑:http://muchs.cn/article44/pjjcee.html

成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供Google、App設計、商城網(wǎng)站移動網(wǎng)站建設、ChatGPT、響應式網(wǎng)站

廣告

聲明:本網(wǎng)站發(fā)布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經(jīng)允許不得轉載,或轉載時需注明來源: 創(chuàng)新互聯(lián)

網(wǎng)站建設網(wǎng)站維護公司