Java并發(fā)系列之AbstractQueuedSynchronizer源碼分析(共享模式)

通過上一篇的分析,我們知道了獨(dú)占模式獲取鎖有三種方式,分別是不響應(yīng)線程中斷獲取,響應(yīng)線程中斷獲取,設(shè)置超時(shí)時(shí)間獲取。在共享模式下獲取鎖的方式也是這三種,而且基本上都是大同小異,我們搞清楚了一種就能很快的理解其他的方式。雖然說AbstractQueuedSynchronizer源碼有一千多行,但是重復(fù)的也比較多,所以讀者不要?jiǎng)傞_始的時(shí)候被嚇到,只要耐著性子去看慢慢的自然能夠漸漸領(lǐng)悟。就我個(gè)人經(jīng)驗(yàn)來說,閱讀AbstractQueuedSynchronizer源碼有幾個(gè)比較關(guān)鍵的地方需要弄明白,分別是獨(dú)占模式和共享模式的區(qū)別,結(jié)點(diǎn)的等待狀態(tài),以及對(duì)條件隊(duì)列的理解。理解了這些要點(diǎn)那么后續(xù)源碼的閱讀將會(huì)輕松很多。當(dāng)然這些在我的《Java并發(fā)系列[1]----AbstractQueuedSynchronizer源碼分析之概要分析》這篇文章里都有詳細(xì)的介紹,讀者可以先去查閱。本篇對(duì)于共享模式的分析也是分為三種獲取鎖的方式和一種釋放鎖的方式。

站在用戶的角度思考問題,與客戶深入溝通,找到南溪網(wǎng)站設(shè)計(jì)與南溪網(wǎng)站推廣的解決方案,憑借多年的經(jīng)驗(yàn),讓設(shè)計(jì)與互聯(lián)網(wǎng)技術(shù)結(jié)合,創(chuàng)造個(gè)性化、用戶體驗(yàn)好的作品,建站類型包括:網(wǎng)站制作、網(wǎng)站建設(shè)、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣、國際域名空間、雅安服務(wù)器托管、企業(yè)郵箱。業(yè)務(wù)覆蓋南溪地區(qū)。

1. 不響應(yīng)線程中斷的獲取

//以不可中斷模式獲取鎖(共享模式)
public final void acquireShared(int arg) {
  //1.嘗試去獲取鎖
  if (tryAcquireShared(arg) < 0) {
    //2.如果獲取失敗就進(jìn)入這個(gè)方法
    doAcquireShared(arg);
  }
}

//嘗試去獲取鎖(共享模式)
//負(fù)數(shù):表示獲取失敗
//零值:表示當(dāng)前結(jié)點(diǎn)獲取成功, 但是后繼結(jié)點(diǎn)不能再獲取了
//正數(shù):表示當(dāng)前結(jié)點(diǎn)獲取成功, 并且后繼結(jié)點(diǎn)同樣可以獲取成功
protected int tryAcquireShared(int arg) {
  throw new UnsupportedOperationException();
}

調(diào)用acquireShared方法是不響應(yīng)線程中斷獲取鎖的方式。在該方法中,首先調(diào)用tryAcquireShared去嘗試獲取鎖,tryAcquireShared方法返回一個(gè)獲取鎖的狀態(tài),這里AQS規(guī)定了返回狀態(tài)若是負(fù)數(shù)代表當(dāng)前結(jié)點(diǎn)獲取鎖失敗,若是0代表當(dāng)前結(jié)點(diǎn)獲取鎖成功,但后繼結(jié)點(diǎn)不能再獲取了,若是正數(shù)則代表當(dāng)前結(jié)點(diǎn)獲取鎖成功,并且這個(gè)鎖后續(xù)結(jié)點(diǎn)也同樣可以獲取成功。子類在實(shí)現(xiàn)tryAcquireShared方法獲取鎖的邏輯時(shí),返回值需要遵守這個(gè)約定。如果調(diào)用tryAcquireShared的返回值小于0,就代表這次嘗試獲取鎖失敗了,接下來就調(diào)用doAcquireShared方法將當(dāng)前線程添加進(jìn)同步隊(duì)列。我們看到doAcquireShared方法。

//在同步隊(duì)列中獲取(共享模式)
private void doAcquireShared(int arg) {
  //添加到同步隊(duì)列中
  final Node node = addWaiter(Node.SHARED);
  boolean failed = true;
  try {
    boolean interrupted = false;
    for (;;) {
      //獲取當(dāng)前結(jié)點(diǎn)的前繼結(jié)點(diǎn)
      final Node p = node.predecessor();
      //如果前繼結(jié)點(diǎn)為head結(jié)點(diǎn)就再次嘗試去獲取鎖
      if (p == head) {
        //再次嘗試去獲取鎖并返回獲取狀態(tài)
        //r < 0, 表示獲取失敗
        //r = 0, 表示當(dāng)前結(jié)點(diǎn)獲取成功, 但是后繼結(jié)點(diǎn)不能再獲取了
        //r > 0, 表示當(dāng)前結(jié)點(diǎn)獲取成功, 并且后繼結(jié)點(diǎn)同樣可以獲取成功
        int r = tryAcquireShared(arg);
        if (r >= 0) {
          //到這里說明當(dāng)前結(jié)點(diǎn)已經(jīng)獲取鎖成功了, 此時(shí)它會(huì)將鎖的狀態(tài)信息傳播給后繼結(jié)點(diǎn)
          setHeadAndPropagate(node, r);
          p.next = null;
          //如果在線程阻塞期間收到中斷請(qǐng)求, 就在這一步響應(yīng)該請(qǐng)求
          if (interrupted) {
            selfInterrupt();
          }
          failed = false;
          return;
        }
      }
      //每次獲取鎖失敗后都會(huì)判斷是否可以將線程掛起, 如果可以的話就會(huì)在parkAndCheckInterrupt方法里將線程掛起
      if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
        interrupted = true;
      }
    }
  } finally {
    if (failed) {
      cancelAcquire(node);
    }
  }
}

進(jìn)入doAcquireShared方法首先是調(diào)用addWaiter方法將當(dāng)前線程包裝成結(jié)點(diǎn)放到同步隊(duì)列尾部。這個(gè)添加結(jié)點(diǎn)的過程我們?cè)谥v獨(dú)占模式時(shí)講過,這里就不再講了。結(jié)點(diǎn)進(jìn)入同步隊(duì)列后,如果它發(fā)現(xiàn)在它前面的結(jié)點(diǎn)就是head結(jié)點(diǎn),因?yàn)閔ead結(jié)點(diǎn)的線程已經(jīng)獲取鎖進(jìn)入房間里面了,那么下一個(gè)獲取鎖的結(jié)點(diǎn)就輪到自己了,所以當(dāng)前結(jié)點(diǎn)先不會(huì)將自己掛起,而是再一次去嘗試獲取鎖,如果前面那人剛好釋放鎖離開了,那么當(dāng)前結(jié)點(diǎn)就能成功獲得鎖,如果前面那人還沒有釋放鎖,那么就會(huì)調(diào)用shouldParkAfterFailedAcquire方法,在這個(gè)方法里面會(huì)將head結(jié)點(diǎn)的狀態(tài)改為SIGNAL,只有保證前面結(jié)點(diǎn)的狀態(tài)為SIGNAL,當(dāng)前結(jié)點(diǎn)才能放心的將自己掛起,所有線程都會(huì)在parkAndCheckInterrupt方法里面被掛起。如果當(dāng)前結(jié)點(diǎn)恰巧成功的獲取了鎖,那么接下來就會(huì)調(diào)用setHeadAndPropagate方法將自己設(shè)置為head結(jié)點(diǎn),并且喚醒后面同樣是共享模式的結(jié)點(diǎn)。下面我們看下setHeadAndPropagate方法具體的操作。

//設(shè)置head結(jié)點(diǎn)并傳播鎖的狀態(tài)(共享模式)
private void setHeadAndPropagate(Node node, int propagate) {
  Node h = head;
  //將給定結(jié)點(diǎn)設(shè)置為head結(jié)點(diǎn)
  setHead(node);
  //如果propagate大于0表明鎖可以獲取了
  if (propagate > 0 || h == null || h.waitStatus < 0) {
    //獲取給定結(jié)點(diǎn)的后繼結(jié)點(diǎn)
    Node s = node.next;
    //如果給定結(jié)點(diǎn)的后繼結(jié)點(diǎn)為空, 或者它的狀態(tài)是共享狀態(tài)
    if (s == null || s.isShared()) {
      //喚醒后繼結(jié)點(diǎn)
      doReleaseShared();
    }
  }
}

//釋放鎖的操作(共享模式)
private void doReleaseShared() {
  for (;;) {
    //獲取同步隊(duì)列的head結(jié)點(diǎn)
    Node h = head;
    if (h != null && h != tail) {
      //獲取head結(jié)點(diǎn)的等待狀態(tài)
      int ws = h.waitStatus;
      //如果head結(jié)點(diǎn)的狀態(tài)為SIGNAL, 表明后面有人在排隊(duì)
      if (ws == Node.SIGNAL) {
        //先把head結(jié)點(diǎn)的等待狀態(tài)更新為0
        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {
          continue;
        }
        //再去喚醒后繼結(jié)點(diǎn)
        unparkSuccessor(h);
       //如果head結(jié)點(diǎn)的狀態(tài)為0, 表明此時(shí)后面沒人在排隊(duì), 就只是將head狀態(tài)修改為PROPAGATE
      }else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
        continue;
      }
    }
    //只有保證期間head結(jié)點(diǎn)沒被修改過才能跳出循環(huán)
    if (h == head) {
      break;
    }
  }
}

調(diào)用setHeadAndPropagate方法首先將自己設(shè)置成head結(jié)點(diǎn),然后再根據(jù)傳入的tryAcquireShared方法的返回值來決定是否要去喚醒后繼結(jié)點(diǎn)。前面已經(jīng)講到當(dāng)返回值大于0就表明當(dāng)前結(jié)點(diǎn)成功獲取了鎖,并且后面的結(jié)點(diǎn)也可以成功獲取鎖。這時(shí)當(dāng)前結(jié)點(diǎn)就需要去喚醒后面同樣是共享模式的結(jié)點(diǎn),注意,每次喚醒僅僅只是喚醒后一個(gè)結(jié)點(diǎn),如果后一個(gè)結(jié)點(diǎn)不是共享模式的話,當(dāng)前結(jié)點(diǎn)就直接進(jìn)入房間而不會(huì)再去喚醒更后面的結(jié)點(diǎn)了。共享模式下喚醒后繼結(jié)點(diǎn)的操作是在doReleaseShared方法進(jìn)行的,共享模式和獨(dú)占模式的喚醒操作基本也是相同的,都是去找到自己座位上的牌子(等待狀態(tài)),如果牌子上為SIGNAL表明后面有人需要讓它幫忙喚醒,如果牌子上為0則表明隊(duì)列此時(shí)并沒有人在排隊(duì)。在獨(dú)占模式下是如果發(fā)現(xiàn)沒人在排隊(duì)就直接離開隊(duì)列了,而在共享模式下如果發(fā)現(xiàn)隊(duì)列后面沒人在排隊(duì),當(dāng)前結(jié)點(diǎn)在離開前仍然會(huì)留個(gè)小紙條(將等待狀態(tài)設(shè)置為PROPAGATE)告訴后來的人這個(gè)鎖的可獲取狀態(tài)。那么后面來的人在嘗試獲取鎖的時(shí)候可以根據(jù)這個(gè)狀態(tài)來判斷是否直接獲取鎖。

2. 響應(yīng)線程中斷的獲取

//以可中斷模式獲取鎖(共享模式)
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
  //首先判斷線程是否中斷, 如果是則拋出異常
  if (Thread.interrupted()) {
    throw new InterruptedException();
  }
  //1.嘗試去獲取鎖
  if (tryAcquireShared(arg) < 0) {
    //2. 如果獲取失敗則進(jìn)人該方法
    doAcquireSharedInterruptibly(arg);
  }
}

//以可中斷模式獲取(共享模式)
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
  //將當(dāng)前結(jié)點(diǎn)插入同步隊(duì)列尾部
  final Node node = addWaiter(Node.SHARED);
  boolean failed = true;
  try {
    for (;;) {
      //獲取當(dāng)前結(jié)點(diǎn)的前繼結(jié)點(diǎn)
      final Node p = node.predecessor();
      if (p == head) {
        int r = tryAcquireShared(arg);
        if (r >= 0) {
          setHeadAndPropagate(node, r);
          p.next = null;
          failed = false;
          return;
        }
      }
      if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
        //如果線程在阻塞過程中收到過中斷請(qǐng)求, 那么就會(huì)立馬在這里拋出異常
        throw new InterruptedException();
      }
    }
  } finally {
    if (failed) {
      cancelAcquire(node);
    }
  }
}

響應(yīng)線程中斷獲取鎖的方式和不響應(yīng)線程中斷獲取鎖的方式在流程上基本是相同的,唯一的區(qū)別就是在哪里響應(yīng)線程的中斷請(qǐng)求。在不響應(yīng)線程中斷獲取鎖時(shí),線程從parkAndCheckInterrupt方法中被喚醒,喚醒后就立馬返回是否收到中斷請(qǐng)求,即使是收到了中斷請(qǐng)求也會(huì)繼續(xù)自旋直到獲取鎖后才響應(yīng)中斷請(qǐng)求將自己給掛起。而響應(yīng)線程中斷獲取鎖會(huì)才線程被喚醒后立馬響應(yīng)中斷請(qǐng)求,如果在阻塞過程中收到了線程中斷就會(huì)立馬拋出InterruptedException異常。

3. 設(shè)置超時(shí)時(shí)間的獲取

//以限定超時(shí)時(shí)間獲取鎖(共享模式)
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
  if (Thread.interrupted()) {
    throw new InterruptedException();
  }
  //1.調(diào)用tryAcquireShared嘗試去獲取鎖
  //2.如果獲取失敗就調(diào)用doAcquireSharedNanos
  return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout);
}

//以限定超時(shí)時(shí)間獲取鎖(共享模式)
private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
  long lastTime = System.nanoTime();
  final Node node = addWaiter(Node.SHARED);
  boolean failed = true;
  try {
    for (;;) {
      //獲取當(dāng)前結(jié)點(diǎn)的前繼結(jié)點(diǎn)
      final Node p = node.predecessor();
      if (p == head) {
        int r = tryAcquireShared(arg);
        if (r >= 0) {
          setHeadAndPropagate(node, r);
          p.next = null;
          failed = false;
          return true;
        }
      }
      //如果超時(shí)時(shí)間用完了就結(jié)束獲取, 并返回失敗信息
      if (nanosTimeout <= 0) {
        return false;
      }
      //1.檢查是否滿足將線程掛起要求(保證前繼結(jié)點(diǎn)狀態(tài)為SIGNAL)
      //2.檢查超時(shí)時(shí)間是否大于自旋時(shí)間
      if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) {
        //若滿足上面兩個(gè)條件就將當(dāng)前線程掛起一段時(shí)間
        LockSupport.parkNanos(this, nanosTimeout);
      }
      long now = System.nanoTime();
      //超時(shí)時(shí)間每次減去獲取鎖的時(shí)間
      nanosTimeout -= now - lastTime;
      lastTime = now;
      //如果在阻塞時(shí)收到中斷請(qǐng)求就立馬拋出異常
      if (Thread.interrupted()) {
        throw new InterruptedException();
      }
    }
  } finally {
    if (failed) {
      cancelAcquire(node);
    }
  }
}

如果看懂了上面兩種獲取方式,再來看設(shè)置超時(shí)時(shí)間的獲取方式就會(huì)很輕松,基本流程都是一樣的,主要是理解超時(shí)的機(jī)制是怎樣的。如果第一次獲取鎖失敗會(huì)調(diào)用doAcquireSharedNanos方法并傳入超時(shí)時(shí)間,進(jìn)入方法后會(huì)根據(jù)情況再次去獲取鎖,如果再次獲取失敗就要考慮將線程掛起了。這時(shí)會(huì)判斷超時(shí)時(shí)間是否大于自旋時(shí)間,如果是的話就會(huì)將線程掛起一段時(shí)間,否則就繼續(xù)嘗試獲取,每次獲取鎖之后都會(huì)將超時(shí)時(shí)間減去獲取鎖的時(shí)間,一直這樣循環(huán)直到超時(shí)時(shí)間用盡,如果還沒有獲取到鎖的話就會(huì)結(jié)束獲取并返回獲取失敗標(biāo)識(shí)。在整個(gè)期間線程是響應(yīng)線程中斷的。

4. 共享模式下結(jié)點(diǎn)的出隊(duì)操作

//釋放鎖的操作(共享模式)
public final boolean releaseShared(int arg) {
  //1.嘗試去釋放鎖
  if (tryReleaseShared(arg)) {
    //2.如果釋放成功就喚醒其他線程
    doReleaseShared();
    return true;
  }
  return false;
}

//嘗試去釋放鎖(共享模式)
protected boolean tryReleaseShared(int arg) {
  throw new UnsupportedOperationException();
}

//釋放鎖的操作(共享模式)
private void doReleaseShared() {
  for (;;) {
    //獲取同步隊(duì)列的head結(jié)點(diǎn)
    Node h = head;
    if (h != null && h != tail) {
      //獲取head結(jié)點(diǎn)的等待狀態(tài)
      int ws = h.waitStatus;
      //如果head結(jié)點(diǎn)的狀態(tài)為SIGNAL, 表明后面有人在排隊(duì)
      if (ws == Node.SIGNAL) {
        //先把head結(jié)點(diǎn)的等待狀態(tài)更新為0
        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {
          continue;
        }
        //再去喚醒后繼結(jié)點(diǎn)
        unparkSuccessor(h);
       //如果head結(jié)點(diǎn)的狀態(tài)為0, 表明此時(shí)后面沒人在排隊(duì), 就只是將head狀態(tài)修改為PROPAGATE
      }else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
        continue;
      }
    }
    //只有保證期間head結(jié)點(diǎn)沒被修改過才能跳出循環(huán)
    if (h == head) {
      break;
    }
  }
}

線程在房間辦完事之后就會(huì)調(diào)用releaseShared方法釋放鎖,首先調(diào)用tryReleaseShared方法嘗試釋放鎖,該方法的判斷邏輯由子類實(shí)現(xiàn)。如果釋放成功就調(diào)用doReleaseShared方法去喚醒后繼結(jié)點(diǎn)。走出房間后它會(huì)找到原先的座位(head結(jié)點(diǎn)),看看座位上是否有人留了小紙條(狀態(tài)為SIGNAL),如果有就去喚醒后繼結(jié)點(diǎn)。如果沒有(狀態(tài)為0)就代表隊(duì)列沒人在排隊(duì),那么在離開之前它還要做最后一件事情,就是在自己座位上留下小紙條(狀態(tài)設(shè)置為PROPAGATE),告訴后面的人鎖的獲取狀態(tài),整個(gè)釋放鎖的過程和獨(dú)占模式唯一的區(qū)別就是在這最后一步操作。

注:以上全部分析基于JDK1.7,不同版本間會(huì)有差異,讀者需要注意。

以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持創(chuàng)新互聯(lián)。

文章題目:Java并發(fā)系列之AbstractQueuedSynchronizer源碼分析(共享模式)
本文地址:http://muchs.cn/article0/gdecio.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站收錄、網(wǎng)站建設(shè)、手機(jī)網(wǎng)站建設(shè)、用戶體驗(yàn)、電子商務(wù)、軟件開發(fā)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

營銷型網(wǎng)站建設(shè)