forkjoin框架怎么在java中使用-創(chuàng)新互聯(lián)

forkjoin框架怎么在java中使用?相信很多沒(méi)有經(jīng)驗(yàn)的人對(duì)此束手無(wú)策,為此本文總結(jié)了問(wèn)題出現(xiàn)的原因和解決方法,通過(guò)這篇文章希望你能解決這個(gè)問(wèn)題。

冷水灘網(wǎng)站建設(shè)公司創(chuàng)新互聯(lián),冷水灘網(wǎng)站設(shè)計(jì)制作,有大型網(wǎng)站制作公司豐富經(jīng)驗(yàn)。已為冷水灘數(shù)千家提供企業(yè)網(wǎng)站建設(shè)服務(wù)。企業(yè)網(wǎng)站搭建\外貿(mào)營(yíng)銷網(wǎng)站建設(shè)要多少錢,請(qǐng)找那個(gè)售后服務(wù)好的冷水灘做網(wǎng)站的公司定做!

fork join框架是java 7中引入框架,這個(gè)框架的引入主要是為了提升并行計(jì)算的能力。

fork join主要有兩個(gè)步驟,第一就是fork,將一個(gè)大任務(wù)分成很多個(gè)小任務(wù),第二就是join,將第一個(gè)任務(wù)的結(jié)果join起來(lái),生成最后的結(jié)果。如果第一步中并沒(méi)有任何返回值,join將會(huì)等到所有的小任務(wù)都結(jié)束。

還記得之前的文章我們講到了thread pool的基本結(jié)構(gòu)嗎?

  • ExecutorService - ForkJoinPool 用來(lái)調(diào)用任務(wù)執(zhí)行。

  • workerThread - ForkJoinWorkerThread 工作線程,用來(lái)執(zhí)行具體的任務(wù)。

  • task - ForkJoinTask 用來(lái)定義要執(zhí)行的任務(wù)。

下面我們從這三個(gè)方面來(lái)詳細(xì)講解fork join框架。

ForkJoinPool

ForkJoinPool是一個(gè)ExecutorService的一個(gè)實(shí)現(xiàn),它提供了對(duì)工作線程和線程池的一些便利管理方法。

public class ForkJoinPool extends AbstractExecutorService

一個(gè)work thread一次只能處理一個(gè)任務(wù),但是ForkJoinPool并不會(huì)為每個(gè)任務(wù)都創(chuàng)建一個(gè)單獨(dú)的線程,它會(huì)使用一個(gè)特殊的數(shù)據(jù)結(jié)構(gòu)double-ended queue來(lái)存儲(chǔ)任務(wù)。這樣的結(jié)構(gòu)可以方便的進(jìn)行工作竊?。╳ork-stealing)。

什么是work-stealing呢?

默認(rèn)情況下,work thread從分配給自己的那個(gè)隊(duì)列頭中取出任務(wù)。如果這個(gè)隊(duì)列是空的,那么這個(gè)work thread會(huì)從其他的任務(wù)隊(duì)列尾部取出任務(wù)來(lái)執(zhí)行,或者從全局隊(duì)列中取出。這樣的設(shè)計(jì)可以充分利用work thread的性能,提升并發(fā)能力。

下面看下怎么創(chuàng)建一個(gè)ForkJoinPool。

最常見(jiàn)的方法就是使用ForkJoinPool.commonPool()來(lái)創(chuàng)建,commonPool()為所有的ForkJoinTask提供了一個(gè)公共默認(rèn)的線程池。

ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();

另外一種方式是使用構(gòu)造函數(shù):

ForkJoinPool forkJoinPool = new ForkJoinPool(2);

這里的參數(shù)是并行級(jí)別,2指的是線程池將會(huì)使用2個(gè)處理器核心。

ForkJoinWorkerThread

ForkJoinWorkerThread是使用在ForkJoinPool的工作線程。

public class ForkJoinWorkerThread extends Thread
}

和一般的線程不一樣的是它定義了兩個(gè)變量:

 final ForkJoinPool pool;    // the pool this thread works in
 final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics

一個(gè)是該worker thread所屬的ForkJoinPool。 另外一個(gè)是支持 work-stealing機(jī)制的Queue。

再看一下它的run方法:

 public void run() {
  if (workQueue.array == null) { // only run once
   Throwable exception = null;
   try {
    onStart();
    pool.runWorker(workQueue);
   } catch (Throwable ex) {
    exception = ex;
   } finally {
    try {
     onTermination(exception);
    } catch (Throwable ex) {
     if (exception == null)
      exception = ex;
    } finally {
     pool.deregisterWorker(this, exception);
    }
   }
  }
 }

簡(jiǎn)單點(diǎn)講就是從Queue中取出任務(wù)執(zhí)行。

ForkJoinTask

ForkJoinTask是ForkJoinPool中運(yùn)行的任務(wù)類型。通常我們會(huì)用到它的兩個(gè)子類:RecursiveActionRecursiveTask<V>。

他們都定義了一個(gè)需要實(shí)現(xiàn)的compute()方法用來(lái)實(shí)現(xiàn)具體的業(yè)務(wù)邏輯。不同的是RecursiveAction只是用來(lái)執(zhí)行任務(wù),而RecursiveTask<V>可以有返回值。

既然兩個(gè)類都帶了Recursive,那么具體的實(shí)現(xiàn)邏輯也會(huì)跟遞歸有關(guān),我們舉個(gè)使用RecursiveAction來(lái)打印字符串的例子:

public class CustomRecursiveAction extends RecursiveAction {

 private String workload = "";
 private static final int THRESHOLD = 4;

 private static Logger logger =
   Logger.getAnonymousLogger();

 public CustomRecursiveAction(String workload) {
  this.workload = workload;
 }

 @Override
 protected void compute() {
  if (workload.length() > THRESHOLD) {
   ForkJoinTask.invokeAll(createSubtasks());
  } else {
   processing(workload);
  }
 }

 private List<CustomRecursiveAction> createSubtasks() {
  List<CustomRecursiveAction> subtasks = new ArrayList<>();

  String partOne = workload.substring(0, workload.length() / 2);
  String partTwo = workload.substring(workload.length() / 2, workload.length());

  subtasks.add(new CustomRecursiveAction(partOne));
  subtasks.add(new CustomRecursiveAction(partTwo));

  return subtasks;
 }

 private void processing(String work) {
  String result = work.toUpperCase();
  logger.info("This result - (" + result + ") - was processed by "
    + Thread.currentThread().getName());
 }
}

上面的例子使用了二分法來(lái)打印字符串。

我們?cè)倏匆粋€(gè)RecursiveTask<V>的例子:

public class CustomRecursiveTask extends RecursiveTask<Integer> {
 private int[] arr;

 private static final int THRESHOLD = 20;

 public CustomRecursiveTask(int[] arr) {
  this.arr = arr;
 }

 @Override
 protected Integer compute() {
  if (arr.length > THRESHOLD) {
   return ForkJoinTask.invokeAll(createSubtasks())
     .stream()
     .mapToInt(ForkJoinTask::join)
     .sum();
  } else {
   return processing(arr);
  }
 }

 private Collection<CustomRecursiveTask> createSubtasks() {
  List<CustomRecursiveTask> dividedTasks = new ArrayList<>();
  dividedTasks.add(new CustomRecursiveTask(
    Arrays.copyOfRange(arr, 0, arr.length / 2)));
  dividedTasks.add(new CustomRecursiveTask(
    Arrays.copyOfRange(arr, arr.length / 2, arr.length)));
  return dividedTasks;
 }

 private Integer processing(int[] arr) {
  return Arrays.stream(arr)
    .filter(a -> a > 10 && a < 27)
    .map(a -> a * 10)
    .sum();
 }
}

和上面的例子很像,不過(guò)這里我們需要有返回值。

在ForkJoinPool中提交Task

有了上面的兩個(gè)任務(wù),我們就可以在ForkJoinPool中提交了:

int[] intArray= {12,12,13,14,15};
  CustomRecursiveTask customRecursiveTask= new CustomRecursiveTask(intArray);

  int result = forkJoinPool.invoke(customRecursiveTask);
  System.out.println(result);

上面的例子中,我們使用invoke來(lái)提交,invoke將會(huì)等待任務(wù)的執(zhí)行結(jié)果。

如果不使用invoke,我們也可以將其替換成fork()和join():

customRecursiveTask.fork();
  int result2= customRecursiveTask.join();
  System.out.println(result2);

fork() 是將任務(wù)提交給pool,但是并不觸發(fā)執(zhí)行, join()將會(huì)真正的執(zhí)行并且得到返回結(jié)果。

看完上述內(nèi)容,你們掌握f(shuō)orkjoin框架怎么在java中使用的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注創(chuàng)新互聯(lián)網(wǎng)站建設(shè)公司行業(yè)資訊頻道,感謝各位的閱讀!

另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)建站muchs.cn,海內(nèi)外云服務(wù)器15元起步,三天無(wú)理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國(guó)服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡(jiǎn)單易用、服務(wù)可用性高、性價(jià)比高”等特點(diǎn)與優(yōu)勢(shì),專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場(chǎng)景需求。

網(wǎng)站欄目:forkjoin框架怎么在java中使用-創(chuàng)新互聯(lián)
當(dāng)前鏈接:http://muchs.cn/article18/dssegp.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供App開(kāi)發(fā)、關(guān)鍵詞優(yōu)化、營(yíng)銷型網(wǎng)站建設(shè)、App設(shè)計(jì)、小程序開(kāi)發(fā)、移動(dòng)網(wǎng)站建設(shè)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

商城網(wǎng)站建設(shè)