個(gè)推基于ApachePulsar的優(yōu)先級(jí)隊(duì)列方案

作者:個(gè)推平臺(tái)研發(fā)工程師 祥子

成都創(chuàng)新互聯(lián)10多年成都定制網(wǎng)站服務(wù);為您提供網(wǎng)站建設(shè),網(wǎng)站制作,網(wǎng)頁(yè)設(shè)計(jì)及高端網(wǎng)站定制服務(wù),成都定制網(wǎng)站及推廣,對(duì)成都石雕等多個(gè)方面擁有豐富的網(wǎng)站推廣經(jīng)驗(yàn)的網(wǎng)站建設(shè)公司。

一、業(yè)務(wù)背景

在個(gè)推的推送場(chǎng)景中,消息隊(duì)列在整個(gè)系統(tǒng)中占有非常重要的位置。
個(gè)推基于 Apache Pulsar 的優(yōu)先級(jí)隊(duì)列方案
當(dāng) APP 有推送需求的時(shí)候, 會(huì)向個(gè)推發(fā)送一條推送命令,接到推送需求后,我們會(huì)把APP要求推送消息的用戶放入下發(fā)隊(duì)列中,進(jìn)行消息下發(fā);當(dāng)同時(shí)有多個(gè)APP進(jìn)行消息下發(fā)時(shí),難免會(huì)出現(xiàn)資源競(jìng)爭(zhēng)的情況, 因此就產(chǎn)生了優(yōu)先級(jí)隊(duì)列的需求,在下發(fā)資源固定的情況下, 高優(yōu)先級(jí)的用戶需要有更多的下發(fā)資源。

二、基于 Kafka 的優(yōu)先級(jí)隊(duì)列方案

針對(duì)以上場(chǎng)景,個(gè)推基于 Kafka 設(shè)計(jì)了第一版的優(yōu)先級(jí)隊(duì)列方案。Kafka 是 LinkedIn 開(kāi)發(fā)的一個(gè)高性能、分布式消息系統(tǒng);Kafka 在個(gè)推有非常廣泛的應(yīng)用,如日志收集、在線和離線消息分發(fā)等。

架構(gòu)
個(gè)推基于 Apache Pulsar 的優(yōu)先級(jí)隊(duì)列方案
在該方案中,個(gè)推將優(yōu)先級(jí)統(tǒng)一設(shè)定為高、中、低三個(gè)級(jí)別。具體操作方案如下:

  1. 對(duì)某個(gè)優(yōu)先級(jí)根據(jù) task (單次推送任務(wù))維度,存入不同的 Topic,一個(gè) task 只寫入一個(gè) Topic,一個(gè) Topic 可存多個(gè) task;

  2. 消費(fèi)模塊根據(jù)優(yōu)先級(jí)配額(如 6:3:1),獲取不同優(yōu)先級(jí)的消息數(shù),同一優(yōu)先級(jí)輪詢獲取消息;這樣既保證了高優(yōu)先級(jí)用戶可以更快地發(fā)送消息,又避免了低優(yōu)先級(jí)用戶出現(xiàn)沒(méi)有下發(fā)的情況。

Kafka 方案遇到的問(wèn)題

隨著個(gè)推業(yè)務(wù)的不斷發(fā)展,接入的 APP 數(shù)量逐漸增多,第一版的優(yōu)先級(jí)方案也逐漸暴露出一些問(wèn)題:

  1. 當(dāng)相同優(yōu)先級(jí)的 APP 在同一時(shí)刻推送任務(wù)越來(lái)越多時(shí),后面進(jìn)入的 task 消息會(huì)因?yàn)榍懊?task 消息還存在隊(duì)列情況而出現(xiàn)延遲。如下圖所示, 當(dāng) task1 消息量過(guò)大時(shí),在task1 消費(fèi)結(jié)束前,taskN 將一直處于等待狀態(tài)。
    個(gè)推基于 Apache Pulsar 的優(yōu)先級(jí)隊(duì)列方案
  2. Kafka 在 Topic 數(shù)量由 64 增長(zhǎng)到 256 時(shí),吞吐量下降嚴(yán)重,Kafka 的每個(gè) Topic、每個(gè)分區(qū)都會(huì)對(duì)應(yīng)一個(gè)物理文件。當(dāng) Topic 數(shù)量增加時(shí),消息分散的落盤策略會(huì)導(dǎo)致磁盤 IO 競(jìng)爭(zhēng)激烈,因此我們不能僅通過(guò)增加 Topic 數(shù)量來(lái)緩解第一點(diǎn)中的問(wèn)題。

基于上述問(wèn)題,個(gè)推進(jìn)行了新一輪的技術(shù)選型, 我們需要可以創(chuàng)建大量的 Topic, 同時(shí)吞吐性能不能比 Kafka 遜色。經(jīng)過(guò)一段時(shí)間的調(diào)研,Apache Pulsar 引起了我們的關(guān)注。

三、為什么是 Pulsar

Apache Pulsar 是一個(gè)企業(yè)級(jí)的分布式消息系統(tǒng),最初由 Yahoo 開(kāi)發(fā),在 2016 年開(kāi)源,并于2018年9月畢業(yè)成為 Apache 基金會(huì)的頂級(jí)項(xiàng)目。Pulsar 已經(jīng)在 Yahoo 的生產(chǎn)環(huán)境使用了三年多,主要服務(wù)于Mail、Finance、Sports、 Flickr、 the Gemini Ads platform、 Sherpa (Yahoo 的 KV 存儲(chǔ))。

架構(gòu)
個(gè)推基于 Apache Pulsar 的優(yōu)先級(jí)隊(duì)列方案

Topic 數(shù)量
Pulsar 可以支持百萬(wàn)級(jí)別 Topic 數(shù)量的擴(kuò)展,同時(shí)還能一直保持良好的性能。Topic 的伸縮性取決于它的內(nèi)部組織和存儲(chǔ)方式。Pulsar 的數(shù)據(jù)保存在 bookie (BookKeeper 服務(wù)器)上,處于寫狀態(tài)的不同 Topic 的消息,在內(nèi)存中排序,最終聚合保存到大文件中,在 Bookie 中需要更少的文件句柄。另一方面 Bookie 的 IO 更少依賴于文件系統(tǒng)的 Pagecache,Pulsar 也因此能夠支持大量的主題。

消費(fèi)模型
Pulsar 支持三種消費(fèi)模型:Exclusive、Shared 和Failover。
個(gè)推基于 Apache Pulsar 的優(yōu)先級(jí)隊(duì)列方案
Exclusive (獨(dú)享):一個(gè) Topic 只能被一個(gè)消費(fèi)者消費(fèi)。Pulsar 默認(rèn)使用這種模式。

Shared(共享):共享模式,多個(gè)消費(fèi)者可以連接到同一個(gè) Topic,消息依次分發(fā)給消費(fèi)者。當(dāng)一個(gè)消費(fèi)者宕機(jī)或者主動(dòng)斷開(kāi)連接時(shí),那么分發(fā)給這個(gè)消費(fèi)者的未確認(rèn)(ack)的消息會(huì)得到重新調(diào)度,分發(fā)給其他消費(fèi)者。

Failover (災(zāi)備):一個(gè)訂閱同時(shí)只有一個(gè)消費(fèi)者,可以有多個(gè)備份消費(fèi)者。一旦主消費(fèi)者故障,則備份消費(fèi)者接管。不會(huì)出現(xiàn)同時(shí)有兩個(gè)活躍的消費(fèi)者。

Exclusive和Failover訂閱,僅允許一個(gè)消費(fèi)者來(lái)使用和消費(fèi)每個(gè)訂閱的Topic。這兩種模式都按 Topic 分區(qū)順序使用消息。它們最適用于需要嚴(yán)格消息順序的流(Stream)用例。

Shared 允許每個(gè)主題分區(qū)有多個(gè)消費(fèi)者。同一個(gè)訂閱中的每個(gè)消費(fèi)者僅接收Topic分區(qū)的一部分消息。Shared最適用于不需要保證消息順序隊(duì)列(Queue)的使用模式,并且可以按照需要任意擴(kuò)展消費(fèi)者的數(shù)量。

存儲(chǔ)
Pulsar 引入了 Apache BookKeeper 作為存儲(chǔ)層,BookKeeper 是一個(gè)專門為實(shí)時(shí)系統(tǒng)優(yōu)化過(guò)的分布式存儲(chǔ)系統(tǒng),具有可擴(kuò)展、高可用、低延遲等特性。具體介紹,請(qǐng)參考 BookKeeper官網(wǎng)。

Segment
BookKeeper以 Segment (在 BookKeeper 內(nèi)部被稱作 ledger) 作為存儲(chǔ)的基本單元。從 Segment 到消息粒度,都會(huì)均勻分散到 BookKeeper 的集群中。這種機(jī)制保證了數(shù)據(jù)和服務(wù)均勻分散在 BookKeeper 集群中。

Pulsar 和 Kafka 都是基于 partition 的邏輯概念來(lái)做 Topic 存儲(chǔ)的。最根本的不同是,Kafka 的物理存儲(chǔ)是以 partition 為單位的,每個(gè) partition 必須作為一個(gè)整體(一個(gè)目錄)存儲(chǔ)在某個(gè) broker 上。 而 Pulsar 的 partition 是以 segment 作為物理存儲(chǔ)的單位,每個(gè) partition 會(huì)再被打散并均勻分散到多個(gè) bookie 節(jié)點(diǎn)中。

這樣的直接影響是,Kafka 的 partition 的大小,受制于單臺(tái) broker 的存儲(chǔ);而 Pulsar 的 partition 則可以利用整個(gè)集群的存儲(chǔ)容量。
個(gè)推基于 Apache Pulsar 的優(yōu)先級(jí)隊(duì)列方案

擴(kuò)容
當(dāng) partition 的容量達(dá)到上限后,需要擴(kuò)容的時(shí)候,如果現(xiàn)有的單臺(tái)機(jī)器不能滿足,Kafka 可能需要添加新的存儲(chǔ)節(jié)點(diǎn),并將 partition 的數(shù)據(jù)在節(jié)點(diǎn)之間搬移達(dá)到 rebalance 的狀態(tài)。

而 Pulsar 只需添加新的 Bookie 存儲(chǔ)節(jié)點(diǎn)即可。新加入的節(jié)點(diǎn)由于剩余空間大,會(huì)被優(yōu)先使用,接收更多的新數(shù)據(jù);整個(gè)擴(kuò)容過(guò)程不涉及任何已有數(shù)據(jù)的拷貝和搬移。
個(gè)推基于 Apache Pulsar 的優(yōu)先級(jí)隊(duì)列方案

Broker 故障
Pulsar 在單個(gè)節(jié)點(diǎn)失敗時(shí)也會(huì)體現(xiàn)同樣的優(yōu)勢(shì)。如果 Pulsar 的某個(gè)服務(wù)節(jié)點(diǎn) broker 失效,由于 broker 是無(wú)狀態(tài)的,其他的 broker 可以很快接管 Topic,不會(huì)涉及 Topic 數(shù)據(jù)的拷貝;如果存儲(chǔ)節(jié)點(diǎn) Bookie 失效,在集群后臺(tái)中,其他的 Bookie 會(huì)從多個(gè) Bookie 節(jié)點(diǎn)中并發(fā)讀取數(shù)據(jù),并對(duì)失效節(jié)點(diǎn)的數(shù)據(jù)自動(dòng)進(jìn)行恢復(fù),對(duì)前端服務(wù)不會(huì)造成影響。
個(gè)推基于 Apache Pulsar 的優(yōu)先級(jí)隊(duì)列方案

Bookie 故障
Apache BookKeeper 中的副本修復(fù)是 Segment (甚至是 Entry)級(jí)別的多對(duì)多快速修復(fù)。這種方式只會(huì)復(fù)制必須的數(shù)據(jù),這比重新復(fù)制整個(gè)主題分區(qū)要精細(xì)。如下圖所示,當(dāng)錯(cuò)誤發(fā)生時(shí), Apache BookKeeper 可以從 bookie 3 和 bookie 4 中讀取 Segment 4 中的消息,并在 bookie 1 處修復(fù) Segment 4。所有的副本修復(fù)都在后臺(tái)進(jìn)行,對(duì) Broker 和應(yīng)用透明。

當(dāng)某個(gè) Bookie 節(jié)點(diǎn)出錯(cuò)時(shí),BookKeeper會(huì)自動(dòng)添加可用的新 Bookie 來(lái)替換失敗的 Bookie,出錯(cuò)的 Bookie 中的數(shù)據(jù)在后臺(tái)恢復(fù),所有 Broker 的寫入不會(huì)被打斷,而且不會(huì)犧牲主題分區(qū)的可用性。
個(gè)推基于 Apache Pulsar 的優(yōu)先級(jí)隊(duì)列方案

四、基于 Pulsar 的優(yōu)先級(jí)隊(duì)列方案

在設(shè)計(jì)思路上,Pulsar 方案和 Kafka 方案并沒(méi)有多大區(qū)別。但在新方案中,個(gè)推技術(shù)團(tuán)隊(duì)借助 Pulsar 的特性,解決了 Kafka 方案中存在的問(wèn)題。

  1. 根據(jù) task 動(dòng)態(tài)生成 Topic,保證了后進(jìn)入的 task 不會(huì)因?yàn)槠渌?task 消息堆積而造成等待情況。
  2. 中高優(yōu)先級(jí) task 都獨(dú)享一個(gè) Topic,低優(yōu)先級(jí) task 共享 n 個(gè) Topic。
  3. 相同優(yōu)先級(jí)內(nèi),各個(gè) task 輪詢讀取消息,配額滿后流轉(zhuǎn)至下一個(gè)優(yōu)先級(jí)。
  4. 相同優(yōu)先級(jí)內(nèi), 各個(gè) task 可動(dòng)態(tài)調(diào)整 quota, 在相同機(jī)會(huì)內(nèi),可讀取更多消息。
  5. 利用 Shared 模式, 可以動(dòng)態(tài)添加刪除 consumer,且不會(huì)觸發(fā) Rebalance 情況。
  6. 利用 BookKeeper 特性,可以更靈活的添加存儲(chǔ)資源。
    個(gè)推基于 Apache Pulsar 的優(yōu)先級(jí)隊(duì)列方案

五、Pulsar 其他實(shí)踐

  1. 不同 subscription 之間相對(duì)獨(dú)立,如果想要重復(fù)消費(fèi)某個(gè) Topic 的消息,需要使用不同的 subscriptionName 訂閱;但是一直增加新的 subscriptionName,backlog 會(huì)不斷累積。
  2. 如果 Topic 無(wú)人訂閱,發(fā)給它的消息默認(rèn)會(huì)被刪除。因此如果 producer 先發(fā)送,consumer 后接收,一定要確保 producer 發(fā)送之前,Topic 有 subscription 存在(哪怕 subscribe 之后 close 掉),否則這段時(shí)間發(fā)送的消息會(huì)導(dǎo)致無(wú)人處理。
  3. 如果既沒(méi)有人發(fā)送消息,又沒(méi)有人訂閱消息,一段時(shí)間后 Topic 會(huì)自動(dòng)刪除。
  4. Pulsar 的 TTL 等設(shè)置,是針對(duì)整個(gè) namespace 起效的,無(wú)法針對(duì)單個(gè) Topic。
  5. Pulsar 的鍵都建立在 zookeeper 的根目錄上,在初始化時(shí)建議增加總節(jié)點(diǎn)名。
  6. 目前 Pulsar 的 java api 設(shè)計(jì),消息默認(rèn)需要顯式確認(rèn),這一點(diǎn)跟 Kafka 不一樣。
  7. Pulsar dashboard 上的 storage size 和 prometheus 上的 storage size (包含副本大小)概念不一樣。
  8. dbStorage_rocksDB_blockCacheSize 設(shè)置的足夠大;當(dāng)消息體量大,出現(xiàn)backlog 大量堆積時(shí), 使用默認(rèn)大小(256M)會(huì)出現(xiàn)讀耗時(shí)過(guò)大情況,導(dǎo)致消費(fèi)變慢。
  9. 使用多 partition,提高吞吐。
  10. 在系統(tǒng)出現(xiàn)異常時(shí),主動(dòng)抓取 stats 和 stats-internal,里面有很多有用數(shù)據(jù)。
  11. 如果業(yè)務(wù)中會(huì)出現(xiàn)單 Topic 體量過(guò)大的情況,建議把 backlogQuotaDefaultLimitGB 設(shè)置的足夠大(默認(rèn)10G), 避免因?yàn)槟J(rèn)使用producer_request_hold 模式出現(xiàn) block producer 的情況;當(dāng)然可以根據(jù)實(shí)際業(yè)務(wù)選擇合適的 backlogQuotaDefaultRetentionPolicy。
  12. 根據(jù)實(shí)際業(yè)務(wù)場(chǎng)景主動(dòng)選擇 backlog quota。
  13. prometheus 內(nèi)如果發(fā)現(xiàn)讀耗時(shí)為空情況,可能是因?yàn)橹苯幼x取了緩存數(shù)據(jù);Pulsar 在讀取消息時(shí)會(huì)先讀取 write cache, 然后讀取 read cache;如果都沒(méi)有命中, 則會(huì)在 RocksDB 中讀取條目位子后,再?gòu)娜罩疚募凶x取該條目。
  14. 寫入消息時(shí), Pulsar 會(huì)同步寫入 journal 和 write cache;write cache 再異步寫入日志文件和 RocksDB; 所以有資源的話,建議 journal 盤使用SSD。

六、總結(jié)

現(xiàn)在, 個(gè)推針對(duì)優(yōu)先級(jí)中間件的改造方案已經(jīng)在部分現(xiàn)網(wǎng)業(yè)務(wù)中試運(yùn)行,對(duì)于 Pulsar 的穩(wěn)定性,我們還在持續(xù)關(guān)注中。
作為一個(gè)2016 年才開(kāi)源的項(xiàng)目,Pulsar 擁有非常多吸引人的特性,也彌補(bǔ)了其他競(jìng)品的短板,例如跨地域復(fù)制、多租戶、擴(kuò)展性、讀寫隔離等。盡管在業(yè)內(nèi)使用尚不廣泛, 但從現(xiàn)有的特性來(lái)說(shuō), Pulsar 表現(xiàn)出了取代 Kafka 的趨勢(shì)。在使用 Pulsar 過(guò)程中,我們也遇到了一些問(wèn)題, 在此特別感謝翟佳和郭斯杰(兩位均為 Stream Native 的核心工程師、開(kāi)源項(xiàng)目 Apache Pulsar 的 PMC 成員)給我們提供的支持和幫助。

參考文獻(xiàn):

[1] 比拼 Kafka, 大數(shù)據(jù)分析新秀Pulsar 到底好在哪(https://www.infoq.cn/article/1UaxFKWUhUKTY1t_5gPq)

[2] 開(kāi)源實(shí)時(shí)數(shù)據(jù)處理系統(tǒng)Pulsar:一套搞定Kafka+Flink+DB(https://juejin.im/post/5af414365188256717765441)

新聞名稱:個(gè)推基于ApachePulsar的優(yōu)先級(jí)隊(duì)列方案
當(dāng)前URL:http://muchs.cn/article0/phddoo.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供域名注冊(cè)、品牌網(wǎng)站設(shè)計(jì)、網(wǎng)站建設(shè)、自適應(yīng)網(wǎng)站網(wǎng)站策劃、搜索引擎優(yōu)化

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

成都app開(kāi)發(fā)公司