linux下c++線程池的簡單實現(xiàn)

小編給大家分享一下linux 下c++線程池的簡單實現(xiàn),相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!

在關(guān)嶺等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強發(fā)展的系統(tǒng)性、市場前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供成都做網(wǎng)站、成都網(wǎng)站制作 網(wǎng)站設(shè)計制作按需定制,公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),高端網(wǎng)站設(shè)計,全網(wǎng)整合營銷推廣,成都外貿(mào)網(wǎng)站制作,關(guān)嶺網(wǎng)站建設(shè)費用合理。

作為一個c++菜鳥,研究半天這個代碼的實現(xiàn)原理,發(fā)現(xiàn)好多語法不太熟悉,因此加了一大堆注釋,僅供參考。該段代碼主要通過繼承workthread類來實現(xiàn)自己的線程代碼,通過thread_pool類來管理線程池,線程池不能夠?qū)崿F(xiàn)動態(tài)改變線程數(shù)目,存在一定局限性。目前可能還有缺陷,畢竟c++來封裝這個東西,資源釋放什么的必須想清楚,比如vector存儲了基類指針實現(xiàn)多態(tài),那么如何釋放對象仍需要考慮,后續(xù)我可能會更進(jìn)一步修改完善該代碼,下面貢獻(xiàn)一下自己的勞動成果。

#include <pthread.h>
#include <semaphore.h>
#include <iostream>
#include <vector>

using namespace std;
/*
WorkerThread class
This class needs to be sobclassed by the user.
*/
class WorkerThread{
public:
    int id;
    unsigned virtual executeThis()
	{
		return 0;
	}

    WorkerThread(int id) : id(id) {}
    virtual ~WorkerThread(){}
};

/*
ThreadPool class manages all the ThreadPool related activities. This includes keeping track of idle threads and synchronizations between all threads.
*/
class ThreadPool{
public:
    ThreadPool();
    ThreadPool(int maxThreadsTemp);
    virtual ~ThreadPool();
	
	void destroyPool(int maxPollSecs);

    bool assignWork(WorkerThread *worker);
    bool fetchWork(WorkerThread **worker);

	void initializeThreads();
	
    static void *threadExecute(void *param); // pthread_create()調(diào)用的函數(shù)必須為靜態(tài)的 
    static pthread_mutex_t mutexSync;
    static pthread_mutex_t mutexWorkCompletion;//工作完成個數(shù)互斥量
	
    
private:
    int maxThreads;
    
    pthread_cond_t  condCrit;
    sem_t availableWork;
    sem_t availableThreads;

    vector<WorkerThread *> workerQueue;

    int topIndex;
    int bottomIndex;
	int incompleteWork;
    int queueSize;
};
#include <stdlib.h>
#include "threadpool.h"
using namespace std;


//初始化類的靜態(tài)成員必須加上類型和作用域,static數(shù)據(jù)成員必須在類定義體的外部定義,不像不同數(shù)據(jù)成員可以用構(gòu)造函數(shù)初始化
//應(yīng)該在定義時進(jìn)行初始化,注意是定義,這個定義應(yīng)該放在包含類的非內(nèi)聯(lián)成員函數(shù)定義的文件中。
//注:靜態(tài)成員函數(shù)只能使用靜態(tài)變量,非靜態(tài)沒有限制,靜態(tài)變量必須在外部定義和初始化,沒初始化就為默認(rèn)數(shù)值
pthread_mutex_t ThreadPool::mutexSync = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t ThreadPool::mutexWorkCompletion = PTHREAD_MUTEX_INITIALIZER;


ThreadPool::ThreadPool()
{
	ThreadPool(2);
}

ThreadPool::ThreadPool(int maxThreads)
{
   if (maxThreads < 1)  
       maxThreads=1;
   
   pthread_mutex_lock(&mutexSync);
   this->maxThreads = maxThreads;
   this->queueSize = maxThreads;
   workerQueue.resize(maxThreads, NULL);//調(diào)整容器大小,然后用默認(rèn)構(gòu)造函數(shù)初始化新的空間
   topIndex = 0;
   bottomIndex = 0;
   incompleteWork = 0;
   sem_init(&availableWork, 0, 0); //工作隊列信號量,表示已經(jīng)加入隊列的工作,初始時沒有工作
   sem_init(&availableThreads, 0, queueSize);  //空閑線程信號量,一開始就有quisize個線程可以使用
   pthread_mutex_unlock(&mutexSync);
}

//調(diào)用pthread_create()讓線程跑起來,threadExecute是類的靜態(tài)函數(shù),因為pthread_create()第三個參數(shù)必須為靜態(tài)函數(shù)
void ThreadPool::initializeThreads()
{
   for(int i = 0; i<maxThreads; ++i)
	{
		pthread_t tempThread;
		pthread_create(&tempThread, NULL, ThreadPool::threadExecute, (void*)this );
   }
}

ThreadPool::~ThreadPool()
{
	//因為對于vector,clear并不真正釋放內(nèi)存(這是為優(yōu)化效率所做的事),clear實際所做的是為vector中所保存的所有對象調(diào)用析構(gòu)函數(shù)(如果有的話),
	//然后初始化size這些東西,讓你覺得把所有的對象清除了。。。
    //真正釋放內(nèi)存是在vector的析構(gòu)函數(shù)里進(jìn)行的,所以一旦超出vector的作用域(如函數(shù)返回),首先它所保存的所有對象會被析構(gòu),
	//然后會調(diào)用allocator中的deallocate函數(shù)回收對象本身的內(nèi)存。。。
    workerQueue.clear();
}


void ThreadPool::destroyPool(int maxPollSecs = 2)
{
	while(incompleteWork>0 )
	{
	    //cout << "Work is still incomplete=" << incompleteWork << endl;
		sleep(maxPollSecs);
	}
	cout << "All Done!! Wow! That was a lot of work!" << endl;
	sem_destroy(&availableWork);
	sem_destroy(&availableThreads);
    pthread_mutex_destroy(&mutexSync);
    pthread_mutex_destroy(&mutexWorkCompletion);

}

//分配人物到top,然后通知有任務(wù)需要執(zhí)行。
bool ThreadPool::assignWork(WorkerThread *workerThread)
{
    pthread_mutex_lock(&mutexWorkCompletion);
    incompleteWork++;
		//cout << "assignWork...incomapleteWork=" << incompleteWork << endl;
	pthread_mutex_unlock(&mutexWorkCompletion);
	sem_wait(&availableThreads);
	pthread_mutex_lock(&mutexSync);
    //workerVec[topIndex] = workerThread;
    workerQueue[topIndex] = workerThread;
    //cout << "Assigning Worker[" << workerThread->id << "] Address:[" << workerThread << "] to Queue index [" << topIndex << "]" << endl;
	if(queueSize !=1 )
		topIndex = (topIndex+1) % (queueSize-1);
    sem_post(&availableWork);
	pthread_mutex_unlock(&mutexSync);
	return true;
}

//當(dāng)已經(jīng)有人物放到隊列里面后,就會受到通知,然后從底部拿走工作,在workerArg中返回
bool ThreadPool::fetchWork(WorkerThread **workerArg)
{
	sem_wait(&availableWork);

	pthread_mutex_lock(&mutexSync);
	WorkerThread * workerThread = workerQueue[bottomIndex];
    workerQueue[bottomIndex] = NULL;
	*workerArg = workerThread;
	if(queueSize !=1 )
		bottomIndex = (bottomIndex+1) % (queueSize-1);
	sem_post(&availableThreads);
	pthread_mutex_unlock(&mutexSync);
    return true;
}

//每個線程運行的靜態(tài)函數(shù)實體,executeThis 方法將會被繼承累從寫,之后實現(xiàn)具體線程的工作。
void *ThreadPool::threadExecute(void *param)
{
	WorkerThread *worker = NULL;
	while(((ThreadPool *)param)->fetchWork(&worker))
	{
		if(worker)
        {
			worker->executeThis();
            //cout << "worker[" << worker->id << "]\tdelete address: [" << worker << "]" << endl;
            delete worker;
            worker = NULL;
        }

		pthread_mutex_lock( &(((ThreadPool *)param)->mutexWorkCompletion) );
        //cout << "Thread " << pthread_self() << " has completed a Job !" << endl;
	 	((ThreadPool *)param)->incompleteWork--;
		pthread_mutex_unlock( &(((ThreadPool *)param)->mutexWorkCompletion) );
	}
	return 0;
}
#include <iostream>
#include "threadpool.h"

using namespace std;


#define ITERATIONS 20

class SampleWorkerThread : public WorkerThread
{
public:
    int id;
	unsigned virtual executeThis()
	{
	// Instead of sleep() we could do anytime consuming work here.
	// Using ThreadPools is advantageous only when the work to be done is really time consuming. (atleast 1 or 2 seconds)
		cout<<"This is SampleWorkerThread sleep 2s"<<endl;
		sleep(2);
		return(0);
	}

    SampleWorkerThread(int id) : WorkerThread(id), id(id)
    {
//       cout << "Creating SampleWorkerThread " << id << "\t address=" << this << endl;
    }

    ~SampleWorkerThread()
    {
//       cout << "Deleting SampleWorkerThread " << id << "\t address=" << this << endl;
    }
};


int main(int argc, char **argv)
{
	
	cout<<"Thread pool"<<endl;
	ThreadPool* myPool = new ThreadPool(25);
	//pthread_create()執(zhí)行,開始等待任務(wù)分配
	myPool->initializeThreads();

	//用來計算時間間隔。
    time_t t1=time(NULL);

	//分配具體工作到線程池
	for(unsigned int i=0;i<ITERATIONS;i++){
		SampleWorkerThread* myThreathreadExecuted = new SampleWorkerThread(i);
		myPool->assignWork(myThreathreadExecuted);
	}
	
	//銷毀錢等待所有線程結(jié)束,等待間隔為2秒。
    myPool->destroyPool(2);

    time_t t2=time(NULL);
    cout << t2-t1 << " seconds elapsed\n" << endl;
	delete myPool;
	
    return 0;
}

ubuntu 12.04下運行成功,編譯命令如下:g++ -g main.cpp thread_pool.cpp -o thread_pool -lpthread

以上是“l(fā)inux 下c++線程池的簡單實現(xiàn)”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對大家有所幫助,如果還想學(xué)習(xí)更多知識,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!

本文名稱:linux下c++線程池的簡單實現(xiàn)
網(wǎng)站網(wǎng)址:http://muchs.cn/article30/iepiso.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供響應(yīng)式網(wǎng)站軟件開發(fā)、微信小程序、品牌網(wǎng)站建設(shè)、網(wǎng)站營銷手機網(wǎng)站建設(shè)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

外貿(mào)網(wǎng)站制作