基于C++11實(shí)現(xiàn)的線程池-創(chuàng)新互聯(lián)

提示:文章寫完后,目錄可以自動(dòng)生成,如何生成可參考右邊的幫助文檔

創(chuàng)新互聯(lián)服務(wù)項(xiàng)目包括惠民網(wǎng)站建設(shè)、惠民網(wǎng)站制作、惠民網(wǎng)頁(yè)制作以及惠民網(wǎng)絡(luò)營(yíng)銷策劃等。多年來(lái),我們專注于互聯(lián)網(wǎng)行業(yè),利用自身積累的技術(shù)優(yōu)勢(shì)、行業(yè)經(jīng)驗(yàn)、深度合作伙伴關(guān)系等,向廣大中小型企業(yè)、政府機(jī)構(gòu)等提供互聯(lián)網(wǎng)行業(yè)的解決方案,惠民網(wǎng)站推廣取得了明顯的社會(huì)效益與經(jīng)濟(jì)效益。目前,我們服務(wù)的客戶以成都為中心已經(jīng)輻射到惠民省份的部分城市,未來(lái)相信會(huì)繼續(xù)擴(kuò)大服務(wù)區(qū)域并繼續(xù)獲得客戶的支持與信任!

基于C++11 實(shí)現(xiàn)的線程池
    • 基于C++11 實(shí)現(xiàn)的線程池
      • 1、線程池原理
      • 2、線程池的設(shè)計(jì)思路?
      • 3、一個(gè)基于C++11的優(yōu)秀的線程池
        • 3.1 頭文件
        • 3.2 線程池類
        • 3.3 構(gòu)造函數(shù)實(shí)現(xiàn)
        • 3.4 入隊(duì)—添加新的工作任務(wù)到線程池
        • 3.5 析構(gòu)函數(shù)
        • 3.6 測(cè)試代碼
        • 3.7 演示


基于C++11 實(shí)現(xiàn)的線程池

??C++11加入了線程庫(kù),這是歷史性的一步跨越,因?yàn)樗讶荒軌驅(qū)崿F(xiàn)簡(jiǎn)單的并發(fā)了,但有這樣一個(gè)問題:如果并發(fā)的線程數(shù)量很多,并且每個(gè)線程都是執(zhí)行一個(gè)時(shí)間很短的任務(wù)就結(jié)束了,這樣頻繁創(chuàng)建線程就會(huì)大大降低系統(tǒng)的效率,因?yàn)轭l繁創(chuàng)建線程和銷毀線程需要時(shí)間。

??如何解決這種線程的頻繁創(chuàng)建銷毀呢?就是線程執(zhí)行完一個(gè)任務(wù),不會(huì)被銷毀,而是可以繼續(xù)執(zhí)行其他的任務(wù)呢?

??一個(gè)好的辦法就是使用線程池,不過線程池是個(gè)什么東西呢?它又該怎么實(shí)現(xiàn)呢?

1、線程池原理

??線程池是一種多線程處理形式,處理過程中將任務(wù)添加到隊(duì)列,然后在創(chuàng)建線程后自動(dòng)啟動(dòng)這些任務(wù)。線程池線程都是后臺(tái)線程。每個(gè)線程都使用默認(rèn)的堆棧大小,以默認(rèn)的優(yōu)先級(jí)運(yùn)行,并處于多線程單元中。如果某個(gè)線程在托管代碼中空閑(如正在等待某個(gè)事件), 則線程池將插入另一個(gè)輔助線程來(lái)使所有處理器保持繁忙。如果所有線程池線程都始終保持繁忙,但隊(duì)列中包含掛起的工作,則線程池將在一段時(shí)間后創(chuàng)建另一個(gè)輔助線程但線程的數(shù)目永遠(yuǎn)不會(huì)超過大值。超過大值的線程可以排隊(duì),但他們要等到其他線程完成后才啟動(dòng)。

2、線程池的設(shè)計(jì)思路?

實(shí)現(xiàn)線程池有以下幾個(gè)步驟:

(1)設(shè)置一個(gè)生產(chǎn)者消費(fèi)者隊(duì)列,作為臨界資源。

(2)初始化n個(gè)線程,并讓其運(yùn)行起來(lái),加鎖去隊(duì)列里取任務(wù)運(yùn)行

(3)當(dāng)任務(wù)隊(duì)列為空時(shí),所有線程阻塞。

(4)當(dāng)生產(chǎn)者隊(duì)列來(lái)了一個(gè)任務(wù)后,先對(duì)隊(duì)列加鎖,把任務(wù)掛到隊(duì)列上,然后使用條件變量去通知阻塞中的一個(gè)線程來(lái)處理。

3、一個(gè)基于C++11的優(yōu)秀的線程池
  • 此線程池來(lái)源于github上一個(gè)6.1k star的線程池項(xiàng)目:
  • 源碼地址:https://github.com/progschj/ThreadPool
3.1 頭文件
#include#include#include#include#include// 鎖
#include// 條件變量
#include#include#include

??頭文件中,沒有什么特別的,這里稍微說(shuō)下:thread—線程相關(guān)的庫(kù),mutex—互斥量,也就是互斥鎖,condition_variable—條件變量,用于喚醒線程和阻塞線程,future的話,在這里就是一個(gè)用來(lái)獲取線程數(shù)據(jù)的函數(shù)。

??對(duì)于C++11的并發(fā)編程部分(std::thread,std::future等)可以參考下面幾篇文章進(jìn)行學(xué)習(xí):

  • 貨比三家:C++ 中的 task based 并發(fā):https://segmentfault.com/a/1190000002706259
  • 從 pthread 轉(zhuǎn)換到 std::thread:https://segmentfault.com/a/1190000002655852
3.2 線程池類

??線程池類中:

  • 首先是一個(gè)構(gòu)造函數(shù),參數(shù)size_t是所傳入的線程數(shù)。
  • 然后是一個(gè)enqueue的模板函數(shù),emmm…,大佬就是大佬,這個(gè)函數(shù)真是大腸包小腸,具體看一看,其需要傳入的參數(shù)是一個(gè)函數(shù)和函數(shù)參數(shù),然后返回的是一個(gè)future,最后還通過一個(gè)type進(jìn)行檢測(cè)。
  • workers是工作線程
  • tasks是任務(wù)隊(duì)列
class ThreadPool {public:
    ThreadPool(size_t);
    templateauto enqueue(F&& f, Args&&... args) 
        ->std::future::type>;
    ~ThreadPool();
private:
   	// 需要跟蹤線程,以便我們可以加入它們
    std::vector< std::thread >workers;
    // 任務(wù)隊(duì)列
    std::queue< std::function>tasks;
    // 同步
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};
3.3 構(gòu)造函數(shù)實(shí)現(xiàn)

??這個(gè)構(gòu)造函數(shù)主要是用來(lái)啟動(dòng)一些工作線程,咋一看還不一定能看的懂,下面我們?cè)敿?xì)分析:

  • 首先在初始化列表中初始化stopfalse
  • 然后是一個(gè)線程數(shù)的for循環(huán),循環(huán)里是一個(gè)很長(zhǎng)的workers.emplace_back()語(yǔ)句,就是添加線程到工作線程組。再往里面則是一個(gè)lambda表達(dá)式。
  • lambda的函數(shù)體第一層是一個(gè)無(wú)線循環(huán),用于線程內(nèi)不斷的從任務(wù)隊(duì)列取任務(wù)執(zhí)行,第二層有一個(gè)task就是一個(gè)函數(shù)對(duì)象,然后又一個(gè)大括號(hào),剛開始我沒看明白,但往下看就能懂,在這個(gè)大括號(hào)(大括號(hào)作用:控制lock臨時(shí)變量的生命周期,執(zhí)行完畢或return后自動(dòng)釋放鎖)里面有:
    • std::unique_locklock(this->queue_mutex);加鎖
    • this->condition.wait(lock, [this]{ return this->stop || !this->tasks.empty(); });等待條件成立(當(dāng)線程池被關(guān)閉或有可消費(fèi)任務(wù)時(shí)跳過wait繼續(xù);否則condition.wait將會(huì)unlock釋放鎖,其他線程可以繼續(xù)拿鎖,但此線程會(huì)阻塞此處并休眠,直到被notify_*喚醒,被喚醒時(shí)wait會(huì)再次lock并判斷條件是否成立,如成立則跳過wait,否則unlock并休眠繼續(xù)等待下次喚醒)
    • condition只是返回了false才會(huì)wait,也就是!stop && empty才會(huì)wait
// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
    :  stop(false)
{for(size_t i = 0;ifor(;;)
                {std::functiontask; //線程中的函數(shù)對(duì)象
                    // 通過lock互斥獲取一個(gè)隊(duì)列任務(wù)和任務(wù)的執(zhí)行函數(shù)
                    {std::unique_locklock(this->queue_mutex);
                        this->condition.wait(lock,
                            [this]{return this->stop || !this->tasks.empty(); });
                        if(this->stop && this->tasks.empty())
                            return;
                        // 任務(wù)隊(duì)列的隊(duì)首任務(wù)
                        task = std::move(this->tasks.front());
                        // 從隊(duì)列移除
                        this->tasks.pop();
                    }
                    // 調(diào)用函數(shù)執(zhí)行任務(wù)
                    task();
                }
            }
        );

}
3.4 入隊(duì)—添加新的工作任務(wù)到線程池

??我們重新審視std::queue< std::function>tasks;,它只能接受void的函數(shù)類型,這里使用std::packaged_task完成函數(shù)類型的推導(dǎo),因?yàn)檫@還不是最終放入tasks的對(duì)象,它要承接一個(gè)返回future的工作,而package_task就是來(lái)打包返回future的。

??其次將任務(wù)函數(shù)和其參數(shù)綁定,構(gòu)建一個(gè)packaged_task(packaged_task是對(duì)任務(wù)的一個(gè)抽象,咱們能夠給其傳遞一個(gè)函數(shù)來(lái)完成其構(gòu)造。以后將任務(wù)投遞給任何線程去完成,經(jīng)過packaged_task.get_future()方法獲取的future來(lái)獲取任務(wù)完成后的產(chǎn)出值)

// add new work item to the pool
templateauto ThreadPool::enqueue(F&& f, Args&&... args) 
    ->std::future::type>{using return_type = typename std::result_of::type;
    auto task = std::make_shared< std::packaged_task>(  //指向F函數(shù)的智能指針
            std::bind(std::forward(f), std::forward(args)...)  //傳遞函數(shù)進(jìn)行構(gòu)造
        );
    // 獲取任務(wù)的future
    std::futureres = task->get_future();
    {// 獨(dú)占拿鎖
        std::unique_locklock(queue_mutex);

        // 不允許入隊(duì)到已經(jīng)停止的線程池 
        if(stop)
            throw std::runtime_error("enqueue on stopped ThreadPool");
        // 將任務(wù)添加到任務(wù)隊(duì)列
        tasks.emplace([task](){(*task)(); });
    }
    // 發(fā)送通知,喚醒一個(gè)wait狀態(tài)的工作線程重新?lián)屾i并重新判斷wait條件
    condition.notify_one();
    return res;
}
3.5 析構(gòu)函數(shù)
// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{{std::unique_locklock(queue_mutex); // 拿鎖
        stop = true; // 停止標(biāo)志
    }
    condition.notify_all(); // 條件變量進(jìn)行通知
    // 等待所有工作線程結(jié)束
    for(std::thread &worker: workers)
        worker.join();
}
3.6 測(cè)試代碼
#include#include#include#include "ThreadPool.h"

int main()
{ThreadPool pool(4);
    std::vector< std::future>results;

    for(int i = 0; i< 8; ++i) {results.emplace_back(
            pool.enqueue([i] {std::cout<< "hello "<< i<< std::endl;
                std::this_thread::sleep_for(std::chrono::seconds(1));
                std::cout<< "world "<< i<< std::endl;
                return i*i;
            })
        );
    }

    for(auto && result: results)
        std::cout<< result.get()<< ' ';
    std::cout<< std::endl;
    
    return 0;
}
3.7 演示
  • 演示平臺(tái):騰訊云服務(wù)器 2 核 2 G,系統(tǒng) Centos 7.6
  • 編譯環(huán)境:GCC 9.1.0
    在這里插入圖片描述

期待大家和我交流,留言或者私信,一起學(xué)習(xí),一起進(jìn)步!

你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機(jī)房具備T級(jí)流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級(jí)服務(wù)器適合批量采購(gòu),新人活動(dòng)首月15元起,快前往官網(wǎng)查看詳情吧

當(dāng)前標(biāo)題:基于C++11實(shí)現(xiàn)的線程池-創(chuàng)新互聯(lián)
瀏覽地址:http://muchs.cn/article18/ceehdp.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供自適應(yīng)網(wǎng)站、手機(jī)網(wǎng)站建設(shè)動(dòng)態(tài)網(wǎng)站、服務(wù)器托管、網(wǎng)站設(shè)計(jì)公司、小程序開發(fā)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

微信小程序開發(fā)