RocketMQ源碼閱讀-創(chuàng)新互聯(lián)

RocketMQ 是一款開源的消息中間件,采用Java實(shí)現(xiàn),設(shè)計思想來自于Kafka(Scala實(shí)現(xiàn)),在具體設(shè)計時體現(xiàn)了自己的選擇和需求,具體差別可以看RocketMQ與Kafka對比。接下來是自己閱讀源碼的一些探索。

 RocketMQ源碼閱讀

    RocketMQ的整體架構(gòu)如下,可以看到各個組件充當(dāng)?shù)慕巧?,Name Server 負(fù)責(zé)維護(hù)一些全局的路由信息:當(dāng)前有哪些broker,每個Topic在哪個broker上; Broker具體處理消息的存儲和服務(wù);生產(chǎn)者和消費(fèi)者是消息的源頭和歸宿。

 RocketMQ源碼閱讀

一、Producer 發(fā)送消息

    Producer發(fā)送消息是如何得知發(fā)到哪個broker的 ? 每個應(yīng)用在收發(fā)消息之前,一般會調(diào)用一次producer.start()/consumer.start()做一些初始化工作,其中包括:創(chuàng)建需要的實(shí)例對象,如MQClientInstance;設(shè)置定時任務(wù),如從Nameserver中定時更新本地的Topic route info,發(fā)送心跳信息到所有的 broker,動態(tài)調(diào)整線程池的大小,把當(dāng)前producer加入到指定的組中等等??蛻舳藭彺媛酚尚畔opicPublishInfo, 同時定期從NameServer取Topic路由信息,每個Broker與NameServer集群中的所有節(jié)點(diǎn)建立長連接,定時注冊Topic信息到所有的NameServer。Producer在發(fā)送消息的時候會去查詢本地的topicPublishInfoTable(一個ConcurrentHashMap),如果沒有命中的話就會詢問NameServer得到路由信息 (RequestCode=GET_ROUTEINTO_BY_TOPIC) 如果nameserver中也沒有查詢到(表示該主題的消息第一次發(fā)送),那么將會發(fā)送一個default的topic進(jìn)行路由查詢。

具體過程如下圖所示:
    Producer 在得到了具體的通信地址后,發(fā)送過程就顯而易見了。通過代碼可以看到在選擇消息隊列進(jìn)行發(fā)送時采用隨機(jī)方式,同時和上一次發(fā)送的broker保持不同,防止熱點(diǎn)。

 RocketMQ源碼閱讀

二、Broker處理來自Producer的消息

    每個producer在發(fā)送消息的時候都和對應(yīng)的Broker建立了長連接,此時broker已經(jīng)準(zhǔn)備好接收Message,Broker的SendMessageProcessor.sendMessage處理消息的存儲,具體過程如下。接收到消息后,會先寫入Commit Log文件(順序?qū)懀瑢憹M了會新建一個新的文件),然后更新Consume queue文件(存儲如何由topic定位到具體的消息)。

 RocketMQ源碼閱讀

三、RocketMQ 存儲特點(diǎn)

    RocketMQ的消息采用順序?qū)懙絚ommitlog文件,然后利用consume queue文件作為索引,如圖。RocketMQ采用零拷貝mmap+write的方式來回應(yīng)Consumer的請求,RocketMQ宣稱大部分請求都會在Page Cache層得到滿足,所以消息過多不會因?yàn)榇疟P讀使得性能下降,這里自己的理解是,在64bit機(jī)器下,虛存地址空間(vm_area_struct)不是問題,所以相關(guān)的文件都會被映射到內(nèi)存中(有定期刪除文件的操作),即使此刻不在內(nèi)存,操作系統(tǒng)也會因?yàn)槿表摦惓_M(jìn)行換入,雖然地址空間不是問題,但是一個進(jìn)程映射文件的個數(shù)(/proc/sys/vm/max_map_count)是有限的,所以可能在這里發(fā)生OOM。

 RocketMQ源碼閱讀

通過Broker中的存儲目錄(默認(rèn)路徑是 $HOME/store)也能看到存儲的邏輯視圖:

 RocketMQ源碼閱讀

四、順序消息是如何保證的?

    需要業(yè)務(wù)層自己決定哪些消息應(yīng)該順序到達(dá),然后發(fā)送的時候通過規(guī)則(hash)映射到同一個隊列,因?yàn)闆]有誰比業(yè)務(wù)自己更加知道關(guān)于消息順序的特點(diǎn)。這樣的順序是相對順序,局部順序,因?yàn)榘l(fā)送方只保證把這些消息順序的發(fā)送到broker上的同一隊列,但是不保證其他Producer也會發(fā)送消息到那個隊列,所以需要Consumer在拉到消息后做一些過濾。

五、RocketMQ 刷盤實(shí)現(xiàn)

    Broker 在消息的存取時直接操作的是內(nèi)存(內(nèi)存映射文件),這可以提供系統(tǒng)的吞吐量,但是無法避免機(jī)器掉電時數(shù)據(jù)丟失,所以需要持久化到磁盤中。刷盤的最終實(shí)現(xiàn)都是使用NIO中的 MappedByteBuffer.force() 將映射區(qū)的數(shù)據(jù)寫入到磁盤,如果是同步刷盤的話,在Broker把消息寫到CommitLog映射區(qū)后,就會等待寫入完成。異步而言,只是喚醒對應(yīng)的線程,不保證執(zhí)行的時機(jī),流程如圖所示,更多細(xì)節(jié)可以參考。

 RocketMQ源碼閱讀

六、消息過濾

    類似于重復(fù)數(shù)據(jù)刪除技術(shù)(Data Deduplication),可以在源端做,也可以在目的端實(shí)現(xiàn),就是網(wǎng)絡(luò)和存儲的權(quán)衡,如果在Broker端做消息過濾就需要逐一比對consume queue 的 tagsCode 字段(hashcode),如果符合則傳輸給消費(fèi)者,因?yàn)槭?hashcode,所以存在誤判,需要在 Consumer 接收到消息后進(jìn)行字符串級別的過濾,確保準(zhǔn)確性。

小結(jié)

    這次代碼閱讀主要著眼于消息的發(fā)送過程和Broker上的存儲,其他方面的細(xì)節(jié)有待深入。

成都創(chuàng)新互聯(lián)長期為上千客戶提供的網(wǎng)站建設(shè)服務(wù),團(tuán)隊從業(yè)經(jīng)驗(yàn)10年,關(guān)注不同地域、不同群體,并針對不同對象提供差異化的產(chǎn)品和服務(wù);打造開放共贏平臺,與合作伙伴共同營造健康的互聯(lián)網(wǎng)生態(tài)環(huán)境。為烏拉特后企業(yè)提供專業(yè)的網(wǎng)站設(shè)計、成都網(wǎng)站建設(shè),烏拉特后網(wǎng)站改版等技術(shù)服務(wù)。擁有十年豐富建站經(jīng)驗(yàn)和眾多成功案例,為您定制開發(fā)。

另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價比高”等特點(diǎn)與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場景需求。

本文標(biāo)題:RocketMQ源碼閱讀-創(chuàng)新互聯(lián)
分享路徑:http://muchs.cn/article46/cecdhg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供品牌網(wǎng)站設(shè)計商城網(wǎng)站移動網(wǎng)站建設(shè)、小程序開發(fā)、網(wǎng)站設(shè)計、建站公司

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

h5響應(yīng)式網(wǎng)站建設(shè)