C++如何實(shí)現(xiàn)線程安全的頻率限制器

小編這次要給大家分享的是C++如何實(shí)現(xiàn)線程安全的頻率限制器,文章內(nèi)容豐富,感興趣的小伙伴可以來(lái)了解一下,希望大家閱讀完這篇文章之后能夠有所收獲。

在鎮(zhèn)坪等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強(qiáng)發(fā)展的系統(tǒng)性、市場(chǎng)前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供做網(wǎng)站、成都做網(wǎng)站 網(wǎng)站設(shè)計(jì)制作按需開(kāi)發(fā)網(wǎng)站,公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),成都品牌網(wǎng)站建設(shè),營(yíng)銷(xiāo)型網(wǎng)站,成都外貿(mào)網(wǎng)站建設(shè),鎮(zhèn)坪網(wǎng)站建設(shè)費(fèi)用合理。

很早以前,在學(xué)習(xí)使用 Python 的deque容器時(shí),我寫(xiě)了一篇文章python3 deque 雙向隊(duì)列創(chuàng)建與使用方法分析。最近需要壓測(cè)線上服務(wù)的性能,又不愿意總是在 QA 那邊排隊(duì),于是需要自己寫(xiě)一個(gè)壓測(cè)用的客戶端。其中一個(gè)核心需求就是要實(shí)現(xiàn) QPS 限制。

于是,終究逃不開(kāi)要在 C++ 中實(shí)現(xiàn)一個(gè)線程安全的頻率限制器。

設(shè)計(jì)思路

所謂頻率限制,就是要在一個(gè)時(shí)間段(inteval)中,限制操作的次數(shù)(limit)。這又可以引出兩種強(qiáng)弱不同的表述:

  • 強(qiáng)表述:在任意一個(gè)長(zhǎng)度等于設(shè)定的時(shí)間段(interval)的滑動(dòng)窗口中,頻率限制器放行的操作次數(shù)(count)都不高于限制次數(shù)(limit)。
  • 弱表述:在一組長(zhǎng)度等于設(shè)定的時(shí)間段(interval)且緊密相連的固定窗口中,每一個(gè)窗口里頻率限制器放行的操作次數(shù)(count)都不高于限制次數(shù)(limit)。

不難發(fā)現(xiàn),強(qiáng)表述通過(guò)「滑動(dòng)窗口」的方式,不僅限制了頻率,還要求了操作在時(shí)間上的均勻性。前作的頻率限制器,實(shí)際上對(duì)應(yīng)了這里的強(qiáng)表述。但實(shí)際工程中,我們通常只需要實(shí)現(xiàn)弱表述的頻率限制器就足夠使用了。

對(duì)于弱表述,實(shí)現(xiàn)起來(lái)主要思路如下:

當(dāng)操作計(jì)數(shù)(count)小于限制(limit)時(shí)直接放行;

單線程實(shí)現(xiàn)

在不考慮線程安全時(shí),不難給出這樣的實(shí)現(xiàn)。

struct ms_clock {
 using rep = std::chrono::milliseconds::rep;
 using period = std::chrono::milliseconds::period;
 using duration = std::chrono::duration<rep, period>;
 using time_point = std::chrono::time_point<ms_clock, duration>;

 static
 time_point now() noexcept {
 return time_point(std::chrono::duration_cast<duration>(
   std::chrono::steady_clock::now().time_since_epoch()));
 }
};
} // namespace __details

class RateLimiter {
 public:
 using clock = __details::ms_clock; // 1.

 private:
 const uint64_t limit_;
 const clock::duration interval_;
 const clock::rep interval_count_;

 mutable uint64_t count_{std::numeric_limits<uint64_t>::max()}; // 2.a.
 mutable clock::rep timestamp_{0};     // 2.b.

 public:
 constexpr RateLimiter(uint64_t limit, clock::duration interval) :
 limit_(limit), interval_(interval), interval_count_(interval_.count()) {}

 RateLimiter(const RateLimiter&) = delete;  // 3.a.
 RateLimiter& operator=(const RateLimiter&) = delete; // 3.b.
 RateLimiter(RateLimiter&&) = delete;   // 3.c.
 RateLimiter& operator=(RateLimiter&&) = delete; // 3.d.

 bool operator()() const;
 double qps() const {
 return 1000.0 * this->limit_ / this->interval_count_;
 }
};

bool RateLimiter::operator()() const {
 auto orig_count = this->count_++;
 if (orig_count < this->limit_) { // 4.
 return true;
 } else {
 auto ts = this->timestamp_;
 auto now = clock::now().time_since_epoch().count();
 if (now - ts < this->interval_count_) { // 5.
 return false;
 }
 this->timestamp_ = now;
 this->count_ = 1;
 return true;
 }
}

這里,

(1) 表明頻率限制器使用單位為毫秒的時(shí)鐘。在實(shí)際使用時(shí),也可以按需改成微妙甚至納秒。

(2) 使用mutable修飾內(nèi)部狀態(tài)count_timestamp_。這是因?yàn)閮蓚€(gè)limit_interval_相同的頻率限制器,在邏輯上是等價(jià)的,但他們的內(nèi)部狀態(tài)卻不一定相同。因此,為了讓const成員函數(shù)能夠修改內(nèi)部狀態(tài)(而不改變邏輯等價(jià)),我們要給內(nèi)部狀態(tài)數(shù)據(jù)成員加上mutable修飾。

(2.a) 處將count_設(shè)置為類(lèi)型可表示的最大值是為了讓

(4) 的判斷失敗,而對(duì)timestamp_進(jìn)行初始化。

(2.b) 處將timestamp_設(shè)置為0則是基于同樣的原因,讓 (5) 的判斷失敗。

(2.a) 和 (2.b) 處通過(guò)巧妙的初值設(shè)計(jì),將初始化狀態(tài)與后續(xù)正常工作狀態(tài)的邏輯統(tǒng)一了起來(lái),而無(wú)須丑陋的判斷。

(3) 禁止了對(duì)象的拷貝和移動(dòng)。這是因?yàn)橐粋€(gè)頻率限制器應(yīng)綁定一組操作,而不應(yīng)由兩組或更多組操作共享(對(duì)于拷貝的情形),或是中途失效(對(duì)于移動(dòng)的情形)。

如此一來(lái),函數(shù)調(diào)用運(yùn)算符就忠實(shí)地實(shí)現(xiàn)了前述邏輯。

多線程改造

第一步改造

當(dāng)有多線程同時(shí)調(diào)用RateLimiter::operator()時(shí),顯而易見(jiàn),在count_timestamp_上會(huì)產(chǎn)生競(jìng)爭(zhēng)。我們有兩種辦法解決這個(gè)問(wèn)題:要不然加鎖,要不然把count_timestamp_設(shè)為原子變量然后用原子操作解決問(wèn)題。于是,對(duì)函數(shù)調(diào)用運(yùn)算符,我們有如下第一步的改造。

class RateLimiter {
 // 其余保持不變
 private:
 mutable std::atomic<uint64_t> count_{std::numeric_limits<uint64_t>::max()}; // 1.a.
 mutable std::atomic<clock::rep> timestamp_{0};     // 1.b.
 // 其余保持不變
};

bool RateLimiter::operator()() const {
 auto orig_count = this->count_.fetch_add(1UL); // 2.
 if (orig_count < this->limit_) {
 return true;
 } else {
 auto ts = this->timestamp_.load(); // 3.
 auto now = clock::now().time_since_epoch().count();
 if (now - ts < this->interval_count_) {
 return false;
 }
 this->timestamp_.store(now); // 4.
 this->count_.store(1UL); // 5.
 return true;
 }
}

這里,

  • (1) 將count_timestamp_聲明為原子的,從而方便后續(xù)進(jìn)行原子操作。
  • (2) -- (5) 則將原有操作分別改為對(duì)應(yīng)的原子操作。

這樣看起來(lái)很完美,但其實(shí)是有 bug 的。我們重點(diǎn)關(guān)注 (4) 這里。(4) 的本意是更新timestamp_,以備下一次orig_count >= this->limit_時(shí)進(jìn)行判斷。準(zhǔn)確設(shè)置這一timestamp是頻率限制器正確工作的基石。但是,如果有兩個(gè)(或更多)線程,同時(shí)走到了 (4)會(huì)發(fā)生什么?

  • 因?yàn)樵硬僮鞯拇嬖冢瑑蓚€(gè)線程會(huì)先后執(zhí)行 (4)。于是timestamp_的值究竟是什么,我們完全不可預(yù)期。
  • 此外,兩個(gè)線程會(huì)先后執(zhí)行 (5),即原子地將count_置為1。但是你想,頻率限制器先后放行了兩次操作,但為什么count_1呢?這是不是就「偷跑」了一次操作?

為此,我們要保證只有一個(gè)線程會(huì)真正設(shè)置timestamp_,而拒絕其他同樣走到 (4) 位置的線程的操作,以避免其重復(fù)設(shè)置timestamp_以及錯(cuò)誤地將count_再次置為1。

第二步改進(jìn)

于是有以下改進(jìn)。

bool RateLimiter::operator()() const {
 auto orig_count = this->count_.fetch_add(1UL); // 3.
 if (orig_count < this->limit_) { // 4.
 return true;
 } else {
 auto ts = this->timestamp_.load();
 auto now = clock::now().time_since_epoch().count();
 if (now - ts < this->interval_count_) { // 5.
 return false;
 }
 if (not this->timestamp_.compare_and_exchange_strong(ts, now)) { // 1.
 return false;
 }
 this->count_.store(1UL); // 2.
 return true;
 }
}

這里,(1) 是一個(gè) CAS 原子操作。它會(huì)原子地比較timestamp_ts的值(Compare):若他們相等,則將now的值寫(xiě)入timestamp_(Swap),并返回true;若他們不相等,則將timestamp_的值寫(xiě)入ts,并返回false。如果沒(méi)有其他線程搶先修改timestamp_的值,那么 CAS 操作應(yīng)該成功并返回true,繼續(xù)執(zhí)行后面的代碼;否則,說(shuō)明其他線程已經(jīng)搶先修改了timestamp_,當(dāng)前線程的操作被記入上一個(gè)周期而被頻率限制器拒絕。

現(xiàn)在要考慮 (2)。如果執(zhí)行完 (1) 之后立即立刻馬上沒(méi)有任何延遲地執(zhí)行 (2),那么當(dāng)然一切大吉。但如果這時(shí)候當(dāng)前線程被切出去,會(huì)發(fā)生什么?我們要分情況討論。

如果ts == 0,也就是「當(dāng)前線程」是頻率限制器第一次修改timestamp_。于是,當(dāng)前線程可能會(huì)在 (3) 處將count_(溢出地)自增為0,從而可能有其他線程通過(guò) (4)。此時(shí),當(dāng)前線程在當(dāng)前分片有可能應(yīng)當(dāng)被拒絕操作。為此,我們需要在 (1) 和 (2) 之間做額外的判斷。

if (ts == 0) {
 auto orig_count = this->count.fetch_add(1UL);
 return (orig_count < this->limit_);
}

如果ts != 0,也就是「當(dāng)前線程」并非頻率限制器第一次修改timestamp_。那么其他線程在 (4) 處必然判斷失敗,但在 (5) 處的判斷可能成功,從而可能繼續(xù)成功執(zhí)行 (1),從而接連兩次執(zhí)行 (2)。但這不影響正確性。因?yàn)橥ㄟ^(guò) (5) 表明相對(duì)當(dāng)前線程填入的timestamp_,已經(jīng)由過(guò)了一個(gè)時(shí)間段(interval),而在這個(gè)時(shí)間段里,只有當(dāng)前線程的一次操作會(huì)被接受。

第三次改進(jìn)

由此,我們得到:

bool RateLimiter::operator()() const {
 auto orig_count = this->count_.fetch_add(1UL);
 if (orig_count < this->limit_) {
 return true;
 } else {
 auto ts = this->timestamp_.load();
 auto now = clock::now().time_since_epoch().count();
 if (now - ts < this->interval_count_) {
 return false;
 }
 if (not this->timestamp_.compare_and_exchange_strong(ts, now)) {
 return false;
 }
 if (ts == 0) {
 auto orig_count = this->count.fetch_add(1UL);
 return (orig_count < this->limit_);
 }
 this->count_.store(1UL);
 return true;
 }
}

至此,我們的代碼在邏輯上已經(jīng)成立了,接下來(lái)要做一些性能方面的優(yōu)化。

原子操作默認(rèn)采用std::memory_order_seq_cst的內(nèi)存模型。這是 C++ 中最嚴(yán)格的內(nèi)存模型,它有兩個(gè)保證:

  • 程序指令和源碼順序一致;
  • 所有線程的所有操作都有一致的順序。

為了實(shí)現(xiàn)順序一致性(sequential consistency),編譯器會(huì)使用很多對(duì)抗編譯器優(yōu)化和 CPU 亂序執(zhí)行(Out-of-Order Execution)的手段,因而性能較差。對(duì)于此處的count_,我們無(wú)需順序一致性模型,只需要獲取釋放模型(Aquire-Release)模型就足夠了。

第四次改進(jìn)

于是有第四次改進(jìn):

bool RateLimiter::operator()() const {
 auto orig_count = this->count_.fetch_add(1UL, std::memory_order_acq_rel);
 if (orig_count < this->limit_) {
 return true;
 } else {
 auto ts = this->timestamp_.load();
 auto now = clock::now().time_since_epoch().count();
 if (now - ts < this->interval_count_) {
 return false;
 }
 if (not this->timestamp_.compare_and_exchange_strong(ts, now)) {
 return false;
 }
 if (ts == 0) {
 auto orig_count = this->count.fetch_add(1UL, std::memory_order_acq_rel);
 return (orig_count < this->limit_);
 }
 this->count_.store(1UL, std::memory_order_release);
 return true;
 }
}

至此,我們就完整實(shí)現(xiàn)了一個(gè)頻率限制器,可以愉快地開(kāi)始拉 QPS 壓測(cè)了!

看完這篇關(guān)于C++如何實(shí)現(xiàn)線程安全的頻率限制器的文章,如果覺(jué)得文章內(nèi)容寫(xiě)得不錯(cuò)的話,可以把它分享出去給更多人看到。

當(dāng)前名稱:C++如何實(shí)現(xiàn)線程安全的頻率限制器
分享路徑:http://muchs.cn/article44/jpiphe.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供外貿(mào)建站、網(wǎng)站排名網(wǎng)站設(shè)計(jì)、微信小程序、全網(wǎng)營(yíng)銷(xiāo)推廣、做網(wǎng)站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

外貿(mào)網(wǎng)站建設(shè)