java線程池代碼實現(xiàn) java線程池實現(xiàn)

Java實現(xiàn)通用線程池

線程池通俗的描述就是預(yù)先創(chuàng)建若干空閑線程 等到需要用多線程去處理事務(wù)的時候去喚醒某些空閑線程執(zhí)行處理任務(wù) 這樣就省去了頻繁創(chuàng)建線程的時間 因為頻 繁創(chuàng)建線程是要耗費大量的CPU資源的 如果一個應(yīng)用程序需要頻繁地處理大量并發(fā)事務(wù) 不斷的創(chuàng)建銷毀線程往往會大大地降低系統(tǒng)的效率 這時候線程池就派 上用場了

創(chuàng)新互聯(lián)建站長期為上千客戶提供的網(wǎng)站建設(shè)服務(wù),團隊從業(yè)經(jīng)驗10年,關(guān)注不同地域、不同群體,并針對不同對象提供差異化的產(chǎn)品和服務(wù);打造開放共贏平臺,與合作伙伴共同營造健康的互聯(lián)網(wǎng)生態(tài)環(huán)境。為和靜企業(yè)提供專業(yè)的網(wǎng)站設(shè)計制作、做網(wǎng)站,和靜網(wǎng)站改版等技術(shù)服務(wù)。擁有十多年豐富建站經(jīng)驗和眾多成功案例,為您定制開發(fā)。

本文旨在使用Java語言編寫一個通用的線程池 當(dāng)需要使用線程池處理事務(wù)時 只需按照指定規(guī)范封裝好事務(wù)處理對象 然后用已有的線程池對象去自動選擇空 閑線程自動調(diào)用事務(wù)處理對象即可 并實現(xiàn)線程池的動態(tài)修改(修改當(dāng)前線程數(shù) 最大線程數(shù)等) 下面是實現(xiàn)代碼

//ThreadTask java

package polarman threadpool;

/** *//**

*線程任務(wù)

* @author ryang

*

*/

public interface ThreadTask {

public void run();

}

//PooledThread java

package polarman threadpool;

import java util Collection; import java util Vector;

/** *//**

*接受線程池管理的線程

* @author ryang

*

*/

public class PooledThread extends Thread {

protected Vector tasks = new Vector();

protected boolean running = false;

protected boolean stopped = false;

protected boolean paused = false;

protected boolean killed = false;

private ThreadPool pool;

public PooledThread(ThreadPool pool) { this pool = pool;

}

public void putTask(ThreadTask task) { tasks add(task);

}

public void putTasks(ThreadTask[] tasks) { for(int i= ; itasks length; i++) this tasks add(tasks[i]);

}

public void putTasks(Collection tasks) { this tasks addAll(tasks);

}

protected ThreadTask popTask() { if(tasks size() ) return (ThreadTask)tasks remove( );

else

return null;

}

public boolean isRunning() {

return running;

}

public void stopTasks() {

stopped = true;

}

public void stopTasksSync() {

stopTasks();

while(isRunning()) { try {

sleep( );

} catch (InterruptedException e) {

}

}

}

public void pauseTasks() {

paused = true;

}

public void pauseTasksSync() {

pauseTasks();

while(isRunning()) { try {

sleep( );

} catch (InterruptedException e) {

}

}

}

public void kill() { if(!running)

interrupt();

else

killed = true;

}

public void killSync() {

kill();

while(isAlive()) { try {

sleep( );

} catch (InterruptedException e) {

}

}

}

public synchronized void startTasks() {

running = true;

this notify();

}

public synchronized void run() { try { while(true) { if(!running || tasks size() == ) { pool notifyForIdleThread(); //System out println(Thread currentThread() getId() + : 空閑 ); this wait(); }else {

ThreadTask task;

while((task = popTask()) != null) { task run(); if(stopped) {

stopped = false;

if(tasks size() ) { tasks clear(); System out println(Thread currentThread() getId() + : Tasks are stopped );

break;

}

}

if(paused) {

paused = false;

if(tasks size() ) { System out println(Thread currentThread() getId() + : Tasks are paused );

break;

}

}

}

running = false;

}

if(killed) {

killed = false;

break;

}

}

}catch(InterruptedException e) {

return;

}

//System out println(Thread currentThread() getId() + : Killed );

}

}

//ThreadPool java

package polarman threadpool;

import java util Collection; import java util Iterator; import java util Vector;

/** *//**

*線程池

* @author ryang

*

*/

public class ThreadPool {

protected int maxPoolSize;

protected int initPoolSize;

protected Vector threads = new Vector();

protected boolean initialized = false;

protected boolean hasIdleThread = false;

public ThreadPool(int maxPoolSize int initPoolSize) { this maxPoolSize = maxPoolSize; this initPoolSize = initPoolSize;

}

public void init() {

initialized = true;

for(int i= ; iinitPoolSize; i++) {

PooledThread thread = new PooledThread(this);

thread start(); threads add(thread);

}

//System out println( 線程池初始化結(jié)束 線程數(shù)= + threads size() + 最大線程數(shù)= + maxPoolSize);

}

public void setMaxPoolSize(int maxPoolSize) { //System out println( 重設(shè)最大線程數(shù) 最大線程數(shù)= + maxPoolSize); this maxPoolSize = maxPoolSize;

if(maxPoolSize getPoolSize())

setPoolSize(maxPoolSize);

}

/** *//**

*重設(shè)當(dāng)前線程數(shù)

* 若需殺掉某線程 線程不會立刻殺掉 而會等到線程中的事務(wù)處理完成* 但此方法會立刻從線程池中移除該線程 不會等待事務(wù)處理結(jié)束

* @param size

*/

public void setPoolSize(int size) { if(!initialized) {

initPoolSize = size;

return;

}else if(size getPoolSize()) { for(int i=getPoolSize(); isize imaxPoolSize; i++) {

PooledThread thread = new PooledThread(this);

thread start(); threads add(thread);

}

}else if(size getPoolSize()) { while(getPoolSize() size) { PooledThread th = (PooledThread)threads remove( ); th kill();

}

}

//System out println( 重設(shè)線程數(shù) 線程數(shù)= + threads size());

}

public int getPoolSize() { return threads size();

}

protected void notifyForIdleThread() {

hasIdleThread = true;

}

protected boolean waitForIdleThread() {

hasIdleThread = false;

while(!hasIdleThread getPoolSize() = maxPoolSize) { try { Thread sleep( ); } catch (InterruptedException e) {

return false;

}

}

return true;

}

public synchronized PooledThread getIdleThread() { while(true) { for(Iterator itr=erator(); itr hasNext();) { PooledThread th = (PooledThread)itr next(); if(!th isRunning())

return th;

}

if(getPoolSize() maxPoolSize) {

PooledThread thread = new PooledThread(this);

thread start(); threads add(thread);

return thread;

}

//System out println( 線程池已滿 等待 );

if(waitForIdleThread() == false)

return null;

}

}

public void processTask(ThreadTask task) {

PooledThread th = getIdleThread();

if(th != null) { th putTask(task); th startTasks();

}

}

public void processTasksInSingleThread(ThreadTask[] tasks) {

PooledThread th = getIdleThread();

if(th != null) { th putTasks(tasks); th startTasks();

}

}

public void processTasksInSingleThread(Collection tasks) {

PooledThread th = getIdleThread();

if(th != null) { th putTasks(tasks); th startTasks();

}

}

}

下面是線程池的測試程序

//ThreadPoolTest java

import java io BufferedReader; import java io IOException; import java io InputStreamReader;

import polarman threadpool ThreadPool; import polarman threadpool ThreadTask;

public class ThreadPoolTest {

public static void main(String[] args) { System out println( quit 退出 ); System out println( task A 啟動任務(wù)A 時長為 秒 ); System out println( size 設(shè)置當(dāng)前線程池大小為 ); System out println( max 設(shè)置線程池最大線程數(shù)為 ); System out println();

final ThreadPool pool = new ThreadPool( ); pool init();

Thread cmdThread = new Thread() { public void run() {

BufferedReader reader = new BufferedReader(new InputStreamReader(System in));

while(true) { try { String line = reader readLine(); String words[] = line split( ); if(words[ ] equalsIgnoreCase( quit )) { System exit( ); }else if(words[ ] equalsIgnoreCase( size ) words length = ) { try { int size = Integer parseInt(words[ ]); pool setPoolSize(size); }catch(Exception e) {

}

}else if(words[ ] equalsIgnoreCase( max ) words length = ) { try { int max = Integer parseInt(words[ ]); pool setMaxPoolSize(max); }catch(Exception e) {

}

}else if(words[ ] equalsIgnoreCase( task ) words length = ) { try { int timelen = Integer parseInt(words[ ]); SimpleTask task = new SimpleTask(words[ ] timelen * ); pool processTask(task); }catch(Exception e) {

}

}

} catch (IOException e) { e printStackTrace();

}

}

}

};

cmdThread start();

/**//*

for(int i= ; i ; i++){

SimpleTask task = new SimpleTask( Task + i (i+ )* ); pool processTask(task);

}*/

}

}

class SimpleTask implements ThreadTask {

private String taskName;

private int timeLen;

public SimpleTask(String taskName int timeLen) { this taskName = taskName; this timeLen = timeLen;

}

public void run() { System out println(Thread currentThread() getId() +

: START TASK + taskName + );

try { Thread sleep(timeLen); } catch (InterruptedException e) {

}

System out println(Thread currentThread() getId() +

: END TASK + taskName + );

}

}

使用此線程池相當(dāng)簡單 下面兩行代碼初始化線程池

ThreadPool pool = new ThreadPool( ); pool init();

要處理的任務(wù)實現(xiàn)ThreadTask 接口即可(如測試代碼里的SimpleTask) 這個接口只有一個方法run()

兩行代碼即可調(diào)用

lishixinzhi/Article/program/Java/hx/201311/27203

java線程池(一) 簡述線程池的幾種使用方式

首先說明下java線程是如何實現(xiàn)線程重用的

1. 線程執(zhí)行完一個Runnable的run()方法后,不會被殺死

2. 當(dāng)線程被重用時,這個線程會進入新Runnable對象的run()方法12

java線程池由Executors提供的幾種靜態(tài)方法創(chuàng)建線程池。下面通過代碼片段簡單介紹下線程池的幾種實現(xiàn)方式。后續(xù)會針對每個實現(xiàn)方式做詳細的說明

newFixedThreadPool

創(chuàng)建一個固定大小的線程池

添加的任務(wù)達到線程池的容量之后開始加入任務(wù)隊列開始線程重用總共開啟線程個數(shù)跟指定容量相同。

@Test

public void newFixedThreadPool() throws Exception {

ExecutorService executorService = Executors.newFixedThreadPool(1);

executorService = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().build());

RunThread run1 = new RunThread("run 1");

executorService.execute(run1);

executorService.shutdown();

}12345678

newSingleThreadExecutor

僅支持單線程順序處理任務(wù)

@Test

public void newSingleThreadExecutor() throws Exception {

ExecutorService executorService = Executors.newSingleThreadExecutor();

executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().build());

executorService.execute(new RunThread("run 1"));

executorService.execute(new RunThread("run 2"));

executorService.shutdown();

}123456789

newCachedThreadPool

這種情況跟第一種的方式類似,不同的是這種情況線程池容量上線是Integer.MAX_VALUE 并且線程池開啟緩存60s

@Test

public void newCachedThreadPool() throws Exception {

ExecutorService executorService = Executors.newCachedThreadPool();

executorService = Executors.newCachedThreadPool(new ThreadFactoryBuilder().build());

executorService.execute(new RunThread("run 1"));

executorService.execute(new RunThread("run 2"));

executorService.shutdown();

}123456789

newWorkStealingPool

支持給定的并行級別,并且可以使用多個隊列來減少爭用。

@Test

public void newWorkStealingPool() throws Exception {

ExecutorService executorService = Executors.newWorkStealingPool();

executorService = Executors.newWorkStealingPool(1);

RunThread run1 = new RunThread("run 1");

executorService.execute(run1);

executorService.shutdown();

}123456789

newScheduledThreadPool

看到的現(xiàn)象和第一種相同,也是在線程池滿之前是新建線程,然后開始進入任務(wù)隊列,進行線程重用

支持定時周期執(zhí)行任務(wù)(還沒有看完)

@Test

public void newScheduledThreadPool() throws Exception {

ExecutorService executorService = Executors.newScheduledThreadPool(1);

executorService = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().build());

executorService.execute(new RunThread("run 1"));

executorService.execute(new RunThread("run 2"));

executorService.shutdown();

}

java線程池怎么實現(xiàn)

要想理解清楚java線程池實現(xiàn)原理,明白下面幾個問題就可以了:

(1):線程池存在哪些狀態(tài),這些狀態(tài)之間是如何進行切換的呢?

(2):線程池的種類有哪些?

(3):創(chuàng)建線程池需要哪些參數(shù),這些參數(shù)的具體含義是什么?

(4):將任務(wù)添加到線程池之后運行流程?

(5):線程池是怎么做到重用線程的呢?

(6):線程池的關(guān)閉

首先回答第一個問題:線程池存在哪些狀態(tài);

查看ThreadPoolExecutor源碼便知曉:

[java]?view plain?copy

//?runState?is?stored?in?the?high-order?bits

private?static?final?int?RUNNING????=?-1??COUNT_BITS;

private?static?final?int?SHUTDOWN???=??0??COUNT_BITS;

private?static?final?int?STOP???????=??1??COUNT_BITS;

private?static?final?int?TIDYING????=??2??COUNT_BITS;

private?static?final?int?TERMINATED?=??3??COUNT_BITS;

存在5種狀態(tài):

1Running:可以接受新任務(wù),同時也可以處理阻塞隊列里面的任務(wù);

2Shutdown:不可以接受新任務(wù),但是可以處理阻塞隊列里面的任務(wù);

3Stop:不可以接受新任務(wù),也不處理阻塞隊列里面的任務(wù),同時還中斷正在處理的任務(wù);

4Tidying:屬于過渡階段,在這個階段表示所有的任務(wù)已經(jīng)執(zhí)行結(jié)束了,當(dāng)前線程池中是不存在有效的線程的,并且將要調(diào)用terminated方法;

5Terminated:終止?fàn)顟B(tài),這個狀態(tài)是在調(diào)用完terminated方法之后所處的狀態(tài);

那么這5種狀態(tài)之間是如何進行轉(zhuǎn)換的呢?查看ThreadPoolExecutor源碼里面的注釋便可以知道啦:

[java]?view plain?copy

*?RUNNING?-?SHUTDOWN

*????On?invocation?of?shutdown(),?perhaps?implicitly?in?finalize()

*?(RUNNING?or?SHUTDOWN)?-?STOP

*????On?invocation?of?shutdownNow()

*?SHUTDOWN?-?TIDYING

*????When?both?queue?and?pool?are?empty

*?STOP?-?TIDYING

*????When?pool?is?empty

*?TIDYING?-?TERMINATED

*????When?the?terminated()?hook?method?has?completed

從上面可以看到,在調(diào)用shutdown方法的時候,線程池狀態(tài)會從Running轉(zhuǎn)換成Shutdown;在調(diào)用shutdownNow方法的時候,線程池狀態(tài)會從Running/Shutdown轉(zhuǎn)換成Stop;在阻塞隊列為空同時線程池為空的情況下,線程池狀態(tài)會從Shutdown轉(zhuǎn)換成Tidying;在線程池為空的情況下,線程池狀態(tài)會從Stop轉(zhuǎn)換成Tidying;當(dāng)調(diào)用terminated方法之后,線程池狀態(tài)會從Tidying轉(zhuǎn)換成Terminate;

在明白了線程池的各個狀態(tài)以及狀態(tài)之間是怎么進行切換之后,我們來看看第二個問題,線程池的種類:

(1):CachedThreadPool:緩存線程池,該類線程池中線程的數(shù)量是不確定的,理論上可以達到Integer.MAX_VALUE個,這種線程池中的線程都是非核心線程,既然是非核心線程,那么就存在超時淘汰機制了,當(dāng)里面的某個線程空閑時間超過了設(shè)定的超時時間的話,就會回收掉該線程;

(2):FixedThreadPool:固定線程池,這類線程池中是只存在核心線程的,對于核心線程來說,如果我們不設(shè)置allowCoreThreadTimeOut屬性的話是不存在超時淘汰機制的,這類線程池中的corePoolSize的大小是等于maximumPoolSize大小的,也就是說,如果線程池中的線程都處于活動狀態(tài)的話,如果有新任務(wù)到來,他是不會開辟新的工作線程來處理這些任務(wù)的,只能將這些任務(wù)放到阻塞隊列里面進行等到,直到有核心線程空閑為止;

(3):ScheduledThreadPool:任務(wù)線程池,這種線程池中核心線程的數(shù)量是固定的,而對于非核心線程的數(shù)量是不限制的,同時對于非核心線程是存在超時淘汰機制的,主要適用于執(zhí)行定時任務(wù)或者周期性任務(wù)的場景;

(4):SingleThreadPool:單一線程池,線程池里面只有一個線程,同時也不存在非核心線程,感覺像是FixedThreadPool的特殊版本,他主要用于確保任務(wù)在同一線程中的順序執(zhí)行,有點類似于進行同步吧;

接下來我們來看第三個問題,創(chuàng)建線程池需要哪些參數(shù):

同樣查看ThreadPoolExecutor源碼,查看創(chuàng)建線程池的構(gòu)造函數(shù):

[java]?view plain?copy

public?ThreadPoolExecutor(int?corePoolSize,

int?maximumPoolSize,

long?keepAliveTime,

TimeUnit?unit,

BlockingQueueRunnable?workQueue,

ThreadFactory?threadFactory,

RejectedExecutionHandler?handler)

不管你調(diào)用的是ThreadPoolExecutor的哪個構(gòu)造函數(shù),最終都會執(zhí)行到這個構(gòu)造函數(shù)的,這個構(gòu)造函數(shù)有7個參數(shù),正是由于對這7個參數(shù)值的賦值不同,造成生成不同類型的線程池,比如我們常見的CachedThreadPoolExecutor、FixedThreadPoolExecutor

SingleThreadPoolExecutor、ScheduledThreadPoolExecutor,我們老看看這幾個參數(shù)的具體含義:

1corePoolSize:線程池中核心線程的數(shù)量;當(dāng)提交一個任務(wù)到線程池的時候,線程池會創(chuàng)建一個線程來執(zhí)行執(zhí)行任務(wù),即使有其他空閑的線程存在,直到線程數(shù)達到corePoolSize時不再創(chuàng)建,這時候會把提交的新任務(wù)放入到阻塞隊列中,如果調(diào)用了線程池的preStartAllCoreThreads方法,則會在創(chuàng)建線程池的時候初始化出來核心線程;

2maximumPoolSize:線程池允許創(chuàng)建的最大線程數(shù);如果阻塞隊列已經(jīng)滿了,同時已經(jīng)創(chuàng)建的線程數(shù)小于最大線程數(shù)的話,那么會創(chuàng)建新的線程來處理阻塞隊列中的任務(wù);

3keepAliveTime:線程活動保持時間,指的是工作線程空閑之后繼續(xù)存活的時間,默認情況下,這個參數(shù)只有線程數(shù)大于corePoolSize的時候才會起作用,即當(dāng)線程池中的線程數(shù)目大于corePoolSize的時候,如果某一個線程的空閑時間達到keepAliveTime,那么這個線程是會被終止的,直到線程池中的線程數(shù)目不大于corePoolSize;如果調(diào)用allowCoreThreadTimeOut的話,在線程池中線程數(shù)量不大于corePoolSize的時候,keepAliveTime參數(shù)也可以起作用的,知道線程數(shù)目為0為止;

4unit:參數(shù)keepAliveTime的時間單位;

5workQueue:阻塞隊列;用于存儲等待執(zhí)行的任務(wù),有四種阻塞隊列類型,ArrayBlockingQueue(基于數(shù)組的有界阻塞隊列)、LinkedBlockingQueue(基于鏈表結(jié)構(gòu)的阻塞隊列)、SynchronousQueue(不存儲元素的阻塞隊列)、PriorityBlockingQueue(具有優(yōu)先級的阻塞隊列);

6threadFactory:用于創(chuàng)建線程的線程工廠;

7handler:當(dāng)阻塞隊列滿了,且沒有空閑線程的情況下,也就是說這個時候,線程池中的線程數(shù)目已經(jīng)達到了最大線程數(shù)量,處于飽和狀態(tài),那么必須采取一種策略來處理新提交的任務(wù),我們可以自己定義處理策略,也可以使用系統(tǒng)已經(jīng)提供給我們的策略,先來看看系統(tǒng)為我們提供的4種策略,AbortPolicy(直接拋出異常)、CallerRunsPolicy(只有調(diào)用者所在的線程來運行任務(wù))、DiscardOldestPolicy(丟棄阻塞隊列中最近的一個任務(wù),并執(zhí)行當(dāng)前任務(wù))、Discard(直接丟棄);

接下來就是將任務(wù)添加到線程池之后的運行流程了;

我們可以調(diào)用submit或者execute方法,兩者最大的區(qū)別在于,調(diào)用submit方法的話,我們可以傳入一個實現(xiàn)Callable接口的對象,進而能在當(dāng)前任務(wù)執(zhí)行結(jié)束之后通過Future對象獲得任務(wù)的返回值,submit內(nèi)部實際上還是執(zhí)行的execute方法;而調(diào)用execute方法的話,是不能獲得任務(wù)執(zhí)行結(jié)束之后的返回值的;此外,調(diào)用submit方法的話是可以拋出異常的,但是調(diào)用execute方法的話,異常在其內(nèi)部得到了消化,也就是說異常在其內(nèi)部得到了處理,不會向外傳遞的;

因為submit方法最終也是會執(zhí)行execute方法的,因此我們只需要了解execute方法就可以了:

在execute方法內(nèi)部會分三種情況來進行處理:

1:首先判斷當(dāng)前線程池中的線程數(shù)量是否小于corePoolSize,如果小于的話,則直接通過addWorker方法創(chuàng)建一個新的Worker對象來執(zhí)行我們當(dāng)前的任務(wù);

2:如果說當(dāng)前線程池中的線程數(shù)量大于corePoolSize的話,那么會嘗試將當(dāng)前任務(wù)添加到阻塞隊列中,然后第二次檢查線程池的狀態(tài),如果線程池不在Running狀態(tài)的話,會將剛剛添加到阻塞隊列中的任務(wù)移出,同時拒絕當(dāng)前任務(wù)請求;如果第二次檢查發(fā)現(xiàn)當(dāng)前線程池處于Running狀態(tài)的話,那么會查看當(dāng)前線程池中的工作線程數(shù)量是否為0,如果為0的話,就會通過addWorker方法創(chuàng)建一個Worker對象出來處理阻塞隊列中的任務(wù);

3:如果原先線程池就不處于Running狀態(tài)或者我們剛剛將當(dāng)前任務(wù)添加到阻塞隊列的時候出現(xiàn)錯誤的話,那么會去嘗試通過addWorker創(chuàng)建新的Worker來處理當(dāng)前任務(wù),如果添加失敗的話,則拒絕當(dāng)前任務(wù)請求;

可以看到在上面的execute方法中,我們僅僅只是檢查了當(dāng)前線程池中的線程數(shù)量有沒有超過corePoolSize的情況,那么當(dāng)前線程池中的線程數(shù)量有沒有超過maximumPoolSize是在哪里檢測的呢?實際上是在addWorker方法里面了,我們可以看下addWorker里面的一段代碼:

[java]?view plain?copy

if?(wc?=?CAPACITY?||

wc?=?(core???corePoolSize?:?maximumPoolSize))

return?false;

如果當(dāng)前線程數(shù)量超過maximumPoolSize的話,直接就會調(diào)用return方法,返回false;

其實到這里我們很明顯可以知道,一個線程池中線程的數(shù)量實際上就是這個線程池中Worker的數(shù)量,如果Worker的大小超過了corePoolSize,那么任務(wù)都在阻塞隊列里面了,Worker是Java對我們?nèi)蝿?wù)的一個封裝類,他的聲明是醬紫的:

[java]?view plain?copy

private?final?class?Worker

extends?AbstractQueuedSynchronizer

implements?Runnable

可以看到他實現(xiàn)了Runnable接口,他是在addWorker方法里面通過new Worker(firstTask)創(chuàng)建的,我們來看看他的構(gòu)造函數(shù)就知道了:

[java]?view plain?copy

Worker(Runnable?firstTask)?{

setState(-1);?//?inhibit?interrupts?until?runWorker

this.firstTask?=?firstTask;

this.thread?=?getThreadFactory().newThread(this);

}

而這里的firstTask其實就是我們調(diào)用execute或者submit的時候傳入的那個參數(shù)罷了,一般來說這些參數(shù)是實現(xiàn)Callable或者Runnable接口的;

在通過addWorker方法創(chuàng)建出來Worker對象之后,這個方法的最后會執(zhí)行Worker內(nèi)部thread屬性的start方法,而這個thread屬性實際上就是封裝了Worker的Thread,執(zhí)行他的start方法實際上執(zhí)行的是Worker的run方法,因為Worker是實現(xiàn)了Runnable接口的,在run方法里面就會執(zhí)行runWorker方法,而runWorker方法里面首先會判斷當(dāng)前我們傳入的任務(wù)是否為空,不為空的話直接就會執(zhí)行我們通過execute或者submit方法提交的任務(wù)啦,注意一點就是我們雖然會通過submit方法提交實現(xiàn)了Callable接口的對象,但是在調(diào)用submit方法的時候,其實是會將Callable對象封裝成實現(xiàn)了Runnable接口對象的,不信我們看看submit方法源碼是怎么實現(xiàn)的:

[java]?view plain?copy

public?T?FutureT?submit(CallableT?task)?{

if?(task?==?null)?throw?new?NullPointerException();

RunnableFutureT?ftask?=?newTaskFor(task);

execute(ftask);

return?ftask;

}

看到?jīng)]有呢,實際上在你傳入實現(xiàn)了Callable接口對象的時候,在submit方法里面是會將其封裝成RunnableFuture對象的,而RunnableFuture接口是繼承了Runnable接口的;那么說白了其實就是直接執(zhí)行我們提交任務(wù)的run方法了;如果為空的話,則會通過getTask方法從阻塞隊列里面拿出一個任務(wù)去執(zhí)行;在任務(wù)執(zhí)行結(jié)束之后繼續(xù)從阻塞隊列里面拿任務(wù),直到getTask的返回值為空則退出runWorker內(nèi)部循環(huán),那么什么情況下getTask返回為空呢?查看getTask方法的源碼注釋可以知道:在Worker必須需要退出的情況下getTask會返回空,具體什么情況下Worker會退出呢?(1):當(dāng)Worker的數(shù)量超過maximumPoolSize的時候;(2):當(dāng)線程池狀態(tài)為Stop的時候;(3):當(dāng)線程池狀態(tài)為Shutdown并且阻塞隊列為空的時候;(4):使用等待超時時間從阻塞隊列中拿數(shù)據(jù),但是超時之后仍然沒有拿到數(shù)據(jù);

如果runWorker方法退出了它里面的循環(huán),那么就說明當(dāng)前阻塞隊列里面是沒有任務(wù)可以執(zhí)行的了,你可以看到在runWorker方法內(nèi)部的finally語句塊中執(zhí)行了processWorkerExit方法,用來對Worker對象進行回收操作,這個方法會傳入一個參數(shù)表示需要刪除的Worker對象;在進行Worker回收的時候會調(diào)用tryTerminate方法來嘗試關(guān)閉線程池,在tryTerminate方法里面會檢查是否有Worker在工作,檢查線程池的狀態(tài),沒問題的話就會將當(dāng)前線程池的狀態(tài)過渡到Tidying,之后調(diào)用terminated方法,將線程池狀態(tài)更新到Terminated;

從上面的分析中,我們可以看出線程池運行的4個階段:

(1):poolSize corePoolSize,則直接創(chuàng)建新的線程(核心線程)來執(zhí)行當(dāng)前提交的任務(wù);

(2):poolSize = corePoolSize,并且此時阻塞隊列沒有滿,那么會將當(dāng)前任務(wù)添加到阻塞隊列中,如果此時存在工作線程(非核心線程)的話,那么會由工作線程來處理該阻塞隊列中的任務(wù),如果此時工作線程數(shù)量為0的話,那么會創(chuàng)建一個工作線程(非核心線程)出來;

(3):poolSize = corePoolSize,并且此時阻塞隊列已經(jīng)滿了,那么會直接創(chuàng)建新的工作線程(非核心線程)來處理阻塞隊列中的任務(wù);

(4):poolSize = maximumPoolSize,并且此時阻塞隊列也滿了的話,那么會觸發(fā)拒絕機制,具體決絕策略采用的是什么就要看我們創(chuàng)建ThreadPoolExecutor的時候傳入的RejectExecutionHandler參數(shù)了;

接下來就是線程池是怎么做到重用線程的呢?

個人認為線程池里面重用線程的工作是在getTask里面實現(xiàn)的,在getTask里面是存在兩個for死循環(huán)嵌套的,他會不斷的從阻塞對列里面取出需要執(zhí)行的任務(wù),返回給我們的runWorker方法里面,而在runWorker方法里面只要getTask返回的任務(wù)不是空就會執(zhí)行該任務(wù)的run方法來處理它,這樣一直執(zhí)行下去,直到getTask返回空為止,此時的情況就是阻塞隊列里面沒有任務(wù)了,這樣一個線程處理完一個任務(wù)之后接著再處理阻塞隊列中的另一個任務(wù),當(dāng)然在線程池中的不同線程是可以并發(fā)處理阻塞隊列中的任務(wù)的,最后在阻塞隊列內(nèi)部不存在任務(wù)的時候會去判斷是否需要回收Worker對象,其實Worker對象的個數(shù)就是線程池中線程的個數(shù),至于什么情況才需要回收,上面已經(jīng)說了,就是四種情況了;

最后就是線程池是怎樣被關(guān)閉的呢?

涉及到線程池的關(guān)閉,需要用到兩個方法,shutdown和shutdownNow,他們都是位于ThreadPoolExecutor里面的,對于shutdown的話,他會將線程池狀態(tài)切換成Shutdown,此時是不會影響對阻塞隊列中任務(wù)執(zhí)行的,但是會拒絕執(zhí)行新加進來的任務(wù),同時會回收閑置的Worker;而shutdownNow方法會將線程池狀態(tài)切換成Stop,此時既不會再去處理阻塞隊列里面的任務(wù),也不會去處理新加進來的任務(wù),同時會回收所有Worker;

幾種開源Java Web容器線程池的實現(xiàn)方法簡介

其中Resin從V3.0后需要購買才能用于商業(yè)目的,而其他兩種則是純開源的。可以分別從他們的網(wǎng)站上下載最新的二進制包和源代碼。

作為Web容器,需要承受較高的訪問量,能夠同時響應(yīng)不同用戶的請求,能夠在惡劣環(huán)境下保持較高的穩(wěn)定性和健壯性。在HTTP服務(wù)器領(lǐng)域,ApacheHTTPD的效率是最高的,也是最為穩(wěn)定的,但它只能處理靜態(tài)頁面的請求,如果需要支持動態(tài)頁面請求,則必須安裝相應(yīng)的插件,比如mod_perl可以處理Perl腳本,mod_python可以處理Python腳本。

上面介紹的三中Web容器,都是使用Java編寫的HTTP服務(wù)器,當(dāng)然他們都可以嵌到Apache中使用,也可以獨立使用。分析它們處理客戶請求的方法有助于了解Java多線程和線程池的實現(xiàn)方法,為設(shè)計強大的多線程服務(wù)器打好基礎(chǔ)。

Tomcat是使用最廣的Java Web容器,功能強大,可擴展性強。最新版本的Tomcat(5.5.17)為了提高響應(yīng)速度和效率,使用了Apache Portable Runtime(APR)作為最底層,使用了APR中包含Socket、緩沖池等多種技術(shù),性能也提高了。APR也是Apache HTTPD的最底層。可想而知,同屬于ASF(Apache Software Foundation)中的成員,互補互用的情況還是很多的,雖然使用了不同的開發(fā)語言。

Tomcat 的線程池位于tomcat-util.jar文件中,包含了兩種線程池方案。方案一:使用APR的Pool技術(shù),使用了JNI;方案二:使用Java實現(xiàn)的ThreadPool。這里介紹的是第二種。如果想了解APR的Pool技術(shù),可以查看APR的源代碼。

ThreadPool默認創(chuàng)建了5個線程,保存在一個200維的線程數(shù)組中,創(chuàng)建時就啟動了這些線程,當(dāng)然在沒有請求時,它們都處理等待狀態(tài)(其實就是一個while循環(huán),不停的等待notify)。如果有請求時,空閑線程會被喚醒執(zhí)行用戶的請求。

具體的請求過程是:服務(wù)啟動時,創(chuàng)建一個一維線程數(shù)組(maxThread=200個),并創(chuàng)建空閑線程(minSpareThreads=5個)隨時等待用戶請求。當(dāng)有用戶請求時,調(diào)用 threadpool.runIt(ThreadPoolRunnable)方法,將一個需要執(zhí)行的實例傳給ThreadPool中。其中用戶需要執(zhí)行的實例必須實現(xiàn)ThreadPoolRunnable接口。 ThreadPool首先查找空閑的線程,如果有則用它運行要執(zhí)行ThreadPoolRunnable;如果沒有空閑線程并且沒有超過 maxThreads,就一次性創(chuàng)建minSpareThreads個空閑線程;如果已經(jīng)超過了maxThreads了,就等待空閑線程了??傊?,要找到空閑的線程,以便用它執(zhí)行實例。找到后,將該線程從線程數(shù)組中移走。接著喚醒已經(jīng)找到的空閑線程,用它運行執(zhí)行實例(ThreadPoolRunnable)。運行完ThreadPoolRunnable后,就將該線程重新放到線程數(shù)組中,作為空閑線程供后續(xù)使用。

由此可以看出,Tomcat的線程池實現(xiàn)是比較簡單的,ThreadPool.java也只有840行代碼。用一個一維數(shù)組保存空閑的線程,每次以一個較小步伐(5個)創(chuàng)建空閑線程并放到線程池中。使用時從數(shù)組中移走空閑的線程,用完后,再歸還給線程池。

典型Java線程池的代碼及其各部分功能介紹

( )根據(jù)xml文件來管理線程池的最大最小線程數(shù)( )對線程池通過Timer定期掃描以防止線程未激活 ( )通過某一個變量(本程序中是freeThreadCount)來得到空閑線程的數(shù)目 一 配置xml(listen xml)是 ?xml version= encoding= UTF ?configConsumeThreadPoolminPools /minPools ! 線程池最小線程 maxPools /maxPools! 線程池最大線程 checkThreadPeriod /checkThreadPeriod ! 檢查線程池中線程的周期 分鐘 /ConsumeThreadPool/config 二 對于ConsumeThreadPoolPara的javabean: import java io *;public class ConsumeThreadPoolPara implements Serializable{private int minPools;private int maxPools;private int checkThreadPeriod;public int getMinPools(){return minPools;}public int getMaxPools(){return maxPools;}public int getCheckThreadPeriod(){return checkThreadPeriod;}public void setMinPools(int minPools){this minPools = minPools;}public void setMaxPools(int maxPools){this maxPools = maxPools;}public void setCheckThreadPeriod(int checkThreadPeriod){this checkThreadPeriod = checkThreadPeriod;}public String toString(){return minPools+ + maxPools+ +checkThreadPeriod;}public ConsumeThreadPoolPara() {}public static void main(String[] args) {ConsumeThreadPoolPara consumeThreadPool = new ConsumeThreadPoolPara();}} 三 解析xml程序代碼(生成ConsumeThreadPoolPara) 使用jdom解析 import jdom *;import jdom input SAXBuilder;import java io *;import java util *;public class ParseConfig {static Hashtable Listens = null;static ConnPara connpara = null;static ConsumeThreadPoolPara consumeThreadPoolPara = null;private static String configxml = listen xml ;static{getConsumeThreadPoolPara(); //得到消費的線程池的參數(shù)}/*** 裝載文檔* @return 返回根結(jié)點* @throws JDOMException*/public static Element loadDocument() throws JDOMException{SAXBuilder parser = new SAXBuilder(); // 新建立構(gòu)造器try {Document document = parser build(configxml);Element root = document getRootElement();return root;}catch(JDOMException e){logger error( listen xml文件格式非法! );throw new JDOMException();}}public static ConsumeThreadPoolPara getConsumeThreadPoolPara(){if(consumeThreadPoolPara ==null){try {Element root = loadDocument();Element consumeThreadPool = root getChild( ConsumeThreadPool );if (consumeThreadPool != null) { //代表有數(shù)據(jù)庫配置consumeThreadPoolPara = new ConsumeThreadPoolPara();Element minPools = consumeThreadPool getChild( minPools );consumeThreadPoolPara setMinPools(Integer parseInt(minPools getTextTrim()));Element maxPools = consumeThreadPool getChild( maxPools );consumeThreadPoolPara setMaxPools(Integer parseInt(maxPools getTextTrim()));Element checkThreadPeriod = consumeThreadPool getChild( checkThreadPeriod );consumeThreadPoolPara setCheckThreadPeriod(Integer parseInt(checkThreadPeriod getTextTrim()));}}catch (JDOMException e) {}}return consumeThreadPoolPara;}} 四 線程池源代碼 import java util *;/*** pTitle: 線程池/p* pDescription: 采集消費模塊/p* pCopyright: Copyright (c) /p* pCompany: /p* @author 張榮斌* @version */public class ThreadPool {private static int minPools = ; //最小連接池數(shù)目private static int maxPools = ; //最大連接池數(shù)目private static int checkThreadPeriod = ; //檢查連接池的周期ArrayList m_ThreadList; //工作線程列表LinkedList m_RunList = null; //工作任務(wù)列表int totalThread = ; //總線程數(shù)static int freeThreadCount = ; //未被使用的線程數(shù)目private java util Timer timer = null; //定時器static Object o = new Object();static{ //先初始化線程池的參數(shù)ConsumeThreadPoolPara consumeThreadPoolPara = ParseConfig getConsumeThreadPoolPara();if(consumeThreadPoolPara!=null){minPools = consumeThreadPoolPara getMinPools();maxPools = consumeThreadPoolPara getMaxPools();checkThreadPeriod = consumeThreadPoolPara getCheckThreadPeriod()* * ;}}public void setMinPools(int minPools){this minPools = minPools;}public void setMaxPools(int maxPools){this maxPools = maxPools;}public void setCheckThreadPeriod(int checkThreadPeriod){this checkThreadPeriod = checkThreadPeriod;}public ThreadPool() {m_ThreadList=new ArrayList();m_RunList=new LinkedList();for(int i= ;iminPools;i++){WorkerThread temp=new WorkerThread();totalThread = totalThread + ;m_ThreadList add(temp);temp start();try{Thread sleep( );}catch(Exception e){}}timer = new Timer(true); //啟動定時器timer schedule(new CheckThreadTask(this) checkThreadPeriod);}/*** 當(dāng)有一個工作來的時候啟動線程池的線程* 當(dāng)空閑線程數(shù)為 的時候 看總線程是否小于最大線程池的數(shù)目 就new一個新的線程 否則sleep 直到有空閑線程為止;* 當(dāng)空閑線程不為 則將任務(wù)丟給空閑線程去完成* @param work*/public synchronized void run(String work){if (freeThreadCount == ) {if(totalThreadmaxPools){WorkerThread temp = new WorkerThread();totalThread = totalThread + ;m_ThreadList add(temp);temp start();synchronized(m_RunList){m_RunList add(work);m_RunList notify();}}else{while (freeThreadCount == ) {try {Thread sleep( );}catch (InterruptedException e) {}}synchronized(m_RunList){m_RunList add(work);m_RunList notify();}}} else {synchronized(m_RunList){m_RunList add(work);m_RunList notify();}}}/*** 檢查所有的線程的有效性*/public synchronized void checkAllThreads() {Iterator lThreadIterator = erator();while (lThreadIterator hasNext()) { //逐個遍厲WorkerThread lTestThread = (WorkerThread) lThreadIterator next();if (! (lTestThread isAlive())) { //如果處在非活動狀態(tài)時lTestThread = new WorkerThread(); //重新生成個線程lTestThread start(); //啟動}}}/*** 打印調(diào)試信息*/public void printDebugInfo(){System out println( totalThread= +totalThread);System out println( m_ThreadList size()= +m_ThreadList size());}/**** pTitle: 工作線程類/p* @author 張榮斌* @version */class WorkerThread extends Thread{boolean running = true;String work;public void run(){while(running){synchronized(o){freeThreadCount++;}synchronized(m_RunList){while(m_RunList size() == ){try{m_RunList wait();if(!running) return;}catch(InterruptedException e){} lishixinzhi/Article/program/Java/gj/201311/27379

當(dāng)前文章:java線程池代碼實現(xiàn) java線程池實現(xiàn)
鏈接分享:http://muchs.cn/article40/doscjho.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供電子商務(wù)品牌網(wǎng)站建設(shè)、定制網(wǎng)站、動態(tài)網(wǎng)站、Google定制開發(fā)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

成都app開發(fā)公司