在高并發(fā)的情況下如何保證消息的可靠性?消息丟失如何解決?

本文分三部分說明

  1. mq消息丟失場景有哪些?

    創(chuàng)新互聯(lián)-專業(yè)網(wǎng)站定制、快速模板網(wǎng)站建設(shè)、高性價比二連浩特網(wǎng)站開發(fā)、企業(yè)建站全套包干低至880元,成熟完善的模板庫,直接使用。一站式二連浩特網(wǎng)站制作公司更省心,省錢,快速模板網(wǎng)站建設(shè)找我們,業(yè)務(wù)覆蓋二連浩特地區(qū)。費用合理售后完善,十年實體公司更值得信賴。

  2. 如何避免消息丟失?

  3. 大廠如何解決這些問題的?

mq消息丟失場景有哪些?

首先我們看下消息周期投遞過程:

在高并發(fā)的情況下如何保證消息的可靠性?消息丟失如何解決?

解決RabbitMQ消息丟失問題和保證消息可靠性

我們把該圖分三部分,左中右三部分,每部分都會導(dǎo)致消息丟失情況:

1.生產(chǎn)者生產(chǎn)消息到RabbitMQ-Server 消息丟失場景

  1. 外界環(huán)境問題導(dǎo)致:發(fā)生網(wǎng)絡(luò)丟包、網(wǎng)絡(luò)故障等造成消息丟失

  2. 代碼層面,配置層面,考慮不全導(dǎo)致消息丟失

發(fā)送端使用Confirm模式,方案不夠嚴謹,比如MQ Server接收消息失敗發(fā)送 nack給發(fā)送端后,發(fā)送端監(jiān)聽失敗或者沒做任何事情,消息丟失的情況;

再比如發(fā)送消息到exchange后,發(fā)下路由和queue沒有綁定,消息會存在丟失情況,下面會講到具體的例子。

2.RabbitMQ-Server中存儲的消息丟失

  1. 消息沒有持久化導(dǎo)致丟失

  2. 單節(jié)點或者集群模式?jīng)]有鏡像模式消息丟失

  3. 個別磁盤意外損害導(dǎo)致消息同步失敗

  4. 機房被炸

3.RabbitMQ-Server到消費者消息丟失

  1. 消費者接收到相關(guān)消息之后,還沒來得及處理就宕機了,消息丟失

如何避免消息丟失?

下面也是從三個方面介紹:

  1. 生產(chǎn)者生產(chǎn)消息到RabbitMQ-Server 可靠性保證

  2. RabbitMQ-Server中存儲的消息如何保證

  3. RabbitMQ-Server到消費者消息如何不丟

1. 生產(chǎn)者生產(chǎn)消息到RabbitMQ-Server可靠性保證

這個過程,消息可能會丟,比如發(fā)生網(wǎng)絡(luò)丟包、網(wǎng)絡(luò)故障等造成消息丟失,一般情況下如果不采取措施,生產(chǎn)者無法感知消息是否已經(jīng)正確無誤的發(fā)送到exchange中,如果生產(chǎn)者能感知到的話,它可以進行進一步的處理動作,比如重新投遞相關(guān)消息以確保消息的可靠性。

1.1 別擔心,有一種方案可以解決:就是 AMQP協(xié)議提供的一個事務(wù)機制

RabbitMQ客戶端中Channel 接口提供了幾個事務(wù)機制相關(guān)的方法: channel.txSelect channel.txCommit channel.txRollback 源碼截圖如下:com.rabbitmq.client 包中public interface Channel extendsShutdownNotifier {}接口

在高并發(fā)的情況下如何保證消息的可靠性?消息丟失如何解決?

在生產(chǎn)者發(fā)送消息之前,通過channel.txSelect開啟一個事務(wù),接著發(fā)送消息, 如果消息投遞server失敗,進行事務(wù)回滾channel.txRollback,然后重新發(fā)送, 如果server收到消息,就提交事務(wù)channel.txCommit但是,很少有人這么干,因為這是同步操作,一條消息發(fā)送之后會使發(fā)送端阻塞,以等待RabbitMQ-Server的回應(yīng),之后才能繼續(xù)發(fā)送下一條消息,生產(chǎn)者生產(chǎn)消息的吞吐量和性能都會大大降低。

1.2 不過幸運的是RabbitMQ提供了一個改進方案,即發(fā)送方確認機制(publisher confirm)

首先生產(chǎn)者通過調(diào)用channel.confirmSelect方法將信道設(shè)置為confirm模式,一旦信道進入confirm模式,所有在該信道上面發(fā)布的消息都會被指派一個唯一的ID(從1開始),一旦消息被投遞到所有匹配的隊列之后,RabbitMQ就會發(fā)送一個確認(Basic.Ack)給生產(chǎn)者(包含消息的唯一deliveryTag和multiple參數(shù)),這就使得生產(chǎn)者知曉消息已經(jīng)正確到達了目的地了。

其實Confirm模式有三種方式實現(xiàn):

  1. 串行confirm模式:producer每發(fā)送一條消息后,調(diào)用waitForConfirms()方法,等待broker端confirm,如果服務(wù)器端返回false或者超時時間內(nèi)未返回,客戶端進行消息重傳。

  2. 批量confirm模式:producer每發(fā)送一批消息后,調(diào)用waitForConfirms()方法,等待broker端confirm。

  3. 異步confirm模式:提供一個回調(diào)方法,broker confirm了一條或者多條消息后producer端會回調(diào)這個方法。 我們分別來看看這三種confirm模式

串行confirm

在高并發(fā)的情況下如何保證消息的可靠性?消息丟失如何解決?

批量confirm模式

在高并發(fā)的情況下如何保證消息的可靠性?消息丟失如何解決?

上面代碼是簡單版本的,生產(chǎn)環(huán)境絕對不是循環(huán)發(fā)送的,而是根據(jù)業(yè)務(wù)情況, 各個客戶端程序需要定期(每x秒)或定量(每x條)或者兩者結(jié)合來pubish消息,然后等待服務(wù)器端confirm。相比普通confirm模式,批量可以極大提升confirm效率。

但是有沒有發(fā)現(xiàn)什么問題?

問題1: 批量發(fā)送的邏輯復(fù)雜話了。

問題2: 一旦出現(xiàn)confirm返回false或者超時的情況時,客戶端需要將這一批次的消息全部重發(fā),這會帶來明顯的重復(fù)消息數(shù)量,并且,當消息經(jīng)常丟失時,批量confirm性能應(yīng)該是不升反降的。

異步confirm模式

在高并發(fā)的情況下如何保證消息的可靠性?消息丟失如何解決?

異步模式需要自己多寫一部分復(fù)雜的代碼實現(xiàn),異步監(jiān)聽類,監(jiān)聽server端的通知消息,異步的好處性能會大幅度提升,發(fā)送完畢之后,可以繼續(xù)發(fā)送其他消息。 MQServer通知生產(chǎn)端ConfirmListener監(jiān)聽類:用戶可以繼承接口實現(xiàn)自己的實現(xiàn)類,處理消息確認機制,此處繼承類代碼省略,就是上面 ProxiedConfirmListener 類: 下面貼下要實現(xiàn)的接口:

在高并發(fā)的情況下如何保證消息的可靠性?消息丟失如何解決?

上面的接口很有意思,如果是你的話,怎么實現(xiàn)? 消息投遞前如何存儲消息,ack 和 nack 如何處理消息?

下面看下異步confirm的消息投遞流程:

在高并發(fā)的情況下如何保證消息的可靠性?消息丟失如何解決?

解決RabbitMQ消息丟失問題和保證消息可靠性

解釋下這張圖片:

channerl1 連續(xù)發(fā)類1,2,3條消息到RabbitMQ-Server,RabbitMQ-Server通知返回一條通知,里面包含回傳給生產(chǎn)者的確認消息中的deliveryTag包含了確認消息的序號,此外還有一個參數(shù)multiple=true,表示到這個序號之前的所有消息都已經(jīng)得到了處理。這樣客戶端和服務(wù)端通知的次數(shù)就減少類,提升類性能。

加點消息存儲和刪除邏輯

事務(wù)機制和publisher confirm機制確保的是消息能夠正確的發(fā)送至RabbitMQ,這里的“發(fā)送至RabbitMQ”的含義是指消息被正確的發(fā)往至RabbitMQ的交換器,如果此交換器沒有匹配的隊列的話,那么消息也將會丟失,怎么辦?

這里有兩個解決方案,

1. 使用mandatory 設(shè)置true

2. 利用備份交換機(alternate-exchange):實現(xiàn)沒有路由到隊列的消息

我們看下RabbitMQ客戶端代碼方法

Channel 類中 發(fā)布消息方法

void?basicPublish(String?exchange,?String?routingKey,?boolean?mandatory,?boolean?immediate,?BasicProperties?props,?byte[]?body)
?throws?IOException;

解釋下:basicPublish 方法中的,mandatory和immediate

/**
?*?當mandatory標志位設(shè)置為true時,如果exchange根據(jù)自身類型和消息routeKey無法找到一個符合條件的queue,?那么會調(diào)用basic.return方法將消息返回給生產(chǎn)者<br>
?*?當mandatory設(shè)置為false時,出現(xiàn)上述情形broker會直接將消息扔掉。
?*/
?@Setter(AccessLevel.PACKAGE)
?private?boolean?mandatory?=?false;
?/**
?*?當immediate標志位設(shè)置為true時,如果exchange在將消息路由到queue(s)時發(fā)現(xiàn)對于的queue上沒有消費者,?那么這條消息不會放入隊列中。
?當immediate標志位設(shè)置為false時,exchange路由的隊列沒有消費者時,該消息會通過basic.return方法返還給生產(chǎn)者。
?*?RabbitMQ?3.0版本開始去掉了對于immediate參數(shù)的支持,對此RabbitMQ官方解釋是:這個關(guān)鍵字違背了生產(chǎn)者和消費者之間解耦的特性,因為生產(chǎn)者不關(guān)心消息是否被消費者消費掉
?*/
?@Setter(AccessLevel.PACKAGE)
?private?boolean?immediate;

所以為了保證消息的可靠性,需要設(shè)置發(fā)送消息代碼邏輯。如果不單獨形式設(shè)置mandatory=false

使用mandatory 設(shè)置true的時候有個關(guān)鍵點要調(diào)整,生產(chǎn)者如何獲取到?jīng)]有被正確路由到合適隊列的消息呢?通過調(diào)用channel.addReturnListener來添加ReturnListener監(jiān)聽器實現(xiàn),只要發(fā)送的消息,沒有路由到具體的隊列,ReturnListener就會收到監(jiān)聽消息。

channel.addReturnListener(new?ReturnListener()?{
?public?void?handleReturn(int?replyCode,?String?replyText,?String?exchange,?String?routingKey,?AMQP
?.BasicProperties?basicProperties,?byte[]?body)?throws?IOException?{
?String?message?=?new?String(body);
?//進入該方法表示,沒路由到具體的隊列
?//監(jiān)聽到消息,可以重新投遞或者其它方案來提高消息的可靠性。
?System.out.println("Basic.Return返回的結(jié)果是:"?+?message);
?}
?});

此時有人問了,不想復(fù)雜化生產(chǎn)者的編程邏輯,又不想消息丟失,那么怎么辦? 還好RabbitMQ提供了一個叫做alternate-exchange東西,翻譯下就是備份交換器,這個干什么用呢?很簡單,它可以將未被路由的消息存儲在另一個exchange隊列中,再在需要的時候去處理這些消息。

那如何實現(xiàn)呢?

簡單一點可以通過webui管理后臺設(shè)置,當你新建一個exchange業(yè)務(wù)的時候,可以給它設(shè)置Arguments,這個參數(shù)就是 alternate-exchange,其實alternate-exchange就是一個普通的exchange,類型最好是fanout 方便管理

在高并發(fā)的情況下如何保證消息的可靠性?消息丟失如何解決?

解決RabbitMQ消息丟失問題和保證消息可靠性

當你發(fā)送消息到你自己的exchange時候,對應(yīng)key沒有路由到queue,就會自動轉(zhuǎn)移到alternate-exchange對應(yīng)的queue,起碼消息不會丟失。

下面一張圖看下投遞過程:

在高并發(fā)的情況下如何保證消息的可靠性?消息丟失如何解決?

解決RabbitMQ消息丟失問題和保證消息可靠性

那么有人有個疑問,上面介紹了,兩種方式處理,發(fā)送的消息無法路由到隊列的方案, 如果備份交換器和mandatory參數(shù)一起使用,會有什么效果?

答案是:mandatory參數(shù)無效

總結(jié)下上面內(nèi)容,主要如何保證消息從生產(chǎn)者到RabbitMQ Server 端可靠性

1. Transaction: 消息落盤,只能同步開啟、提交及回滾。

2. Confirm:消息進入緩沖區(qū),支持同步、異步、批量確認。

3. Transaction和publisher confirm機制兩者是互斥的

4. 一般在生產(chǎn)者這塊避免數(shù)據(jù)丟失,都是用 Confirm 機制的。

2.RabbitMQ-Server中存儲的消息如何保證

一般消息都是存內(nèi)存中的,如果消息沒有持久化硬盤,一天機器需要重啟,獲取意外停電,重啟機器后,消息全丟了,所以消息持久化是必備。

網(wǎng)站標題:在高并發(fā)的情況下如何保證消息的可靠性?消息丟失如何解決?
當前鏈接:http://muchs.cn/article26/pphdcg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供云服務(wù)器虛擬主機、企業(yè)建站、網(wǎng)站制作、網(wǎng)站內(nèi)鏈、域名注冊

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

h5響應(yīng)式網(wǎng)站建設(shè)