RabbitMQ如何防止數(shù)據(jù)丟失

小編給大家分享一下RabbitMQ如何防止數(shù)據(jù)丟失,希望大家閱讀完這篇文章之后都有所收獲,下面讓我們一起去探討吧!

站在用戶的角度思考問題,與客戶深入溝通,找到嵐山網(wǎng)站設(shè)計(jì)與嵐山網(wǎng)站推廣的解決方案,憑借多年的經(jīng)驗(yàn),讓設(shè)計(jì)與互聯(lián)網(wǎng)技術(shù)結(jié)合,創(chuàng)造個(gè)性化、用戶體驗(yàn)好的作品,建站類型包括:網(wǎng)站制作、做網(wǎng)站、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣、域名注冊、網(wǎng)頁空間、企業(yè)郵箱。業(yè)務(wù)覆蓋嵐山地區(qū)。


 

一、分析數(shù)據(jù)丟失的原因

分析RabbitMQ消息丟失的情況,不妨先看看一條消息從生產(chǎn)者發(fā)送到消費(fèi)者消費(fèi)的過程:

RabbitMQ如何防止數(shù)據(jù)丟失  

可以看出,一條消息整個(gè)過程要經(jīng)歷兩次的網(wǎng)絡(luò)傳輸:從生產(chǎn)者發(fā)送到RabbitMQ服務(wù)器,從RabbitMQ服務(wù)器發(fā)送到消費(fèi)者

在消費(fèi)者未消費(fèi)前存儲在隊(duì)列(Queue)中。

所以可以知道,有三個(gè)場景下是會發(fā)生消息丟失的:

  • 存儲在隊(duì)列中,如果隊(duì)列沒有對消息持久化,RabbitMQ服務(wù)器宕機(jī)重啟會丟失數(shù)據(jù)。
  • 生產(chǎn)者發(fā)送消息到RabbitMQ服務(wù)器過程中,RabbitMQ服務(wù)器如果宕機(jī)停止服務(wù),消息會丟失。
  • 消費(fèi)者從RabbitMQ服務(wù)器獲取隊(duì)列中存儲的數(shù)據(jù)消費(fèi),但是消費(fèi)者程序出錯(cuò)或者宕機(jī)而沒有正確消費(fèi),導(dǎo)致數(shù)據(jù)丟失。

針對以上三種場景,RabbitMQ提供了三種解決的方式,分別是消息持久化,confirm機(jī)制,ACK事務(wù)機(jī)制。

RabbitMQ如何防止數(shù)據(jù)丟失  
 

二、消息持久化

RabbitMQ是支持消息持久化的,消息持久化需要設(shè)置:Exchange為持久化和Queue持久化,這樣當(dāng)消息發(fā)送到RabbitMQ服務(wù)器時(shí),消息就會持久化。

首先看Exchange交換機(jī)的類圖:

RabbitMQ如何防止數(shù)據(jù)丟失  

看這個(gè)類圖其實(shí)是要說明上一篇文章介紹的四種交換機(jī)都是AbstractExchange抽象類的子類,所以根據(jù)java的特性,創(chuàng)建子類的實(shí)例會先調(diào)用父類的構(gòu)造器,父類也就是AbstractExchange的構(gòu)造器是怎么樣的呢?

RabbitMQ如何防止數(shù)據(jù)丟失  

從上面的注釋可以看到durable參數(shù)表示是否持久化。默認(rèn)是持久化(true)。創(chuàng)建持久化的Exchange可以這樣寫:

 @Bean
    public DirectExchange rabbitmqDemoDirectExchange() {
        //Direct交換機(jī)
        return new DirectExchange(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, true, false);
    }
 

接著是Queue隊(duì)列,我們先看看Queue的構(gòu)造器是怎么樣的:

RabbitMQ如何防止數(shù)據(jù)丟失  

也是通過durable參數(shù)設(shè)置是否持久化,默認(rèn)是true。所以創(chuàng)建時(shí)可以不指定:

 @Bean
    public Queue fanoutExchangeQueueA() {
     //只需要指定名稱,默認(rèn)是持久化的
        return new Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_A);
    }
 

這就完成了消息持久化的設(shè)置,接下來啟動項(xiàng)目,發(fā)送幾條消息,我們可以看到:

RabbitMQ如何防止數(shù)據(jù)丟失怎么證明是已經(jīng)持久化了呢,實(shí)際上可以找到對應(yīng)的文件:RabbitMQ如何防止數(shù)據(jù)丟失找到對應(yīng)磁盤中的目錄:RabbitMQ如何防止數(shù)據(jù)丟失消息持久化可以防止消息在RabbitMQ Server中不會因?yàn)殄礄C(jī)重啟而丟失。

 

三、消息確認(rèn)機(jī)制

 

3.1 confirm機(jī)制

在生產(chǎn)者發(fā)送到RabbitMQ Server時(shí)有可能因?yàn)榫W(wǎng)絡(luò)問題導(dǎo)致投遞失敗,從而丟失數(shù)據(jù)。我們可以使用confirm模式防止數(shù)據(jù)丟失。工作流程是怎么樣的呢,看以下圖解:RabbitMQ如何防止數(shù)據(jù)丟失從上圖中可以看到是通過兩個(gè)回調(diào)函數(shù)**confirm()、returnedMessage()**進(jìn)行通知。

一條消息從生產(chǎn)者發(fā)送到RabbitMQ,首先會發(fā)送到Exchange,對應(yīng)回調(diào)函數(shù)confirm()。第二步從Exchange路由分配到Queue中,對應(yīng)回調(diào)函數(shù)則是returnedMessage()

代碼怎么實(shí)現(xiàn)呢,請看演示:

首先在application.yml配置文件中加上如下配置:

spring:
  rabbitmq:
    publisher-confirms: true
#    publisher-returns: true
    template:
      mandatory: true
# publisher-confirms:設(shè)置為true時(shí)。當(dāng)消息投遞到Exchange后,會回調(diào)confirm()方法進(jìn)行通知生產(chǎn)者
# publisher-returns:設(shè)置為true時(shí)。當(dāng)消息匹配到Queue并且失敗時(shí),會通過回調(diào)returnedMessage()方法返回消息
# spring.rabbitmq.template.mandatory: 設(shè)置為true時(shí)。指定消息在沒有被隊(duì)列接收時(shí)會通過回調(diào)returnedMessage()方法退回。
 

有個(gè)小細(xì)節(jié),publisher-returns和mandatory如果都設(shè)置的話,優(yōu)先級是以mandatory優(yōu)先。可以看源碼:RabbitMQ如何防止數(shù)據(jù)丟失接著我們需要定義回調(diào)方法:

@Component
public class RabbitmqConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    private Logger logger = LoggerFactory.getLogger(RabbitmqConfirmCallback.class);

    /**
     * 監(jiān)聽消息是否到達(dá)Exchange
     *
     * @param correlationData 包含消息的唯一標(biāo)識的對象
     * @param ack             true 標(biāo)識 ack,false 標(biāo)識 nack
     * @param cause           nack 投遞失敗的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            logger.info("消息投遞成功~消息Id:{}", correlationData.getId());
        } else {
            logger.error("消息投遞失敗,Id:{},錯(cuò)誤提示:{}", correlationData.getId(), cause);
        }
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        logger.info("消息沒有路由到隊(duì)列,獲得返回的消息");
        Map map = byteToObject(message.getBody(), Map.class);
        logger.info("message body: {}", map == null ? "" : map.toString());
        logger.info("replyCode: {}", replyCode);
        logger.info("replyText: {}", replyText);
        logger.info("exchange: {}", exchange);
        logger.info("routingKey: {}", exchange);
        logger.info("------------> end <------------");
    }

    @SuppressWarnings("unchecked")
    private <T> T byteToObject(byte[] bytes, Class<T> clazz) {
        T t;
        try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
             ObjectInputStream ois = new ObjectInputStream(bis)) {
            t = (T) ois.readObject();
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
        return t;
    }
}
 

我這里就簡單地打印回調(diào)方法返回的消息,在實(shí)際項(xiàng)目中,可以把返回的消息存儲到日志表中,使用定時(shí)任務(wù)進(jìn)行進(jìn)一步的處理。

我這里是使用RabbitTemplate進(jìn)行發(fā)送,所以在Service層的RabbitTemplate需要設(shè)置一下:

@Service
public class RabbitMQServiceImpl implements RabbitMQService {
 @Resource
    private RabbitmqConfirmCallback rabbitmqConfirmCallback;

    @Resource
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        //指定 ConfirmCallback
        rabbitTemplate.setConfirmCallback(rabbitmqConfirmCallback);
        //指定 ReturnCallback
        rabbitTemplate.setReturnCallback(rabbitmqConfirmCallback);
    }
    
    @Override
    public String sendMsg(String msg) throws Exception {
        Map<String, Object> message = getMessage(msg);
        try {
            CorrelationData correlationData = (CorrelationData) message.remove("correlationData");
            rabbitTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING, message, correlationData);
            return "ok";
        } catch (Exception e) {
            e.printStackTrace();
            return "error";
        }
    }
    
 private Map<String, Object> getMessage(String msg) {
        String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
        CorrelationData correlationData = new CorrelationData(msgId);
        String sendTime = sdf.format(new Date());
        Map<String, Object> map = new HashMap<>();
        map.put("msgId", msgId);
        map.put("sendTime", sendTime);
        map.put("msg", msg);
        map.put("correlationData", correlationData);
        return map;
    }
}
 

大功告成!接下來我們進(jìn)行測試,發(fā)送一條消息,我們可以控制臺:RabbitMQ如何防止數(shù)據(jù)丟失假設(shè)發(fā)送一條信息沒有路由匹配到隊(duì)列,可以看到如下信息:RabbitMQ如何防止數(shù)據(jù)丟失這就是confirm模式。它的作用是為了保障生產(chǎn)者投遞消息到RabbitMQ不會出現(xiàn)消息丟失

 

3.2 事務(wù)機(jī)制(ACK)

最開始的那張圖已經(jīng)講過,消費(fèi)者從隊(duì)列中獲取到消息后,會直接確認(rèn)簽收,假設(shè)消費(fèi)者宕機(jī)或者程序出現(xiàn)異常,數(shù)據(jù)沒有正常消費(fèi),這種情況就會出現(xiàn)數(shù)據(jù)丟失。

所以關(guān)鍵在于把自動簽收改成手動簽收,正常消費(fèi)則返回確認(rèn)簽收,如果出現(xiàn)異常,則返回拒絕簽收重回隊(duì)列。RabbitMQ如何防止數(shù)據(jù)丟失代碼怎么實(shí)現(xiàn)呢,請看演示:

首先在消費(fèi)者的application.yml文件中設(shè)置事務(wù)提交為manual手動模式:

spring:
  rabbitmq:
    listener:
      simple:
  acknowledge-mode: manual # 手動ack模式
        concurrency: 1 # 最少消費(fèi)者數(shù)量
        max-concurrency: 10 # 最大消費(fèi)者數(shù)量
 

然后編寫消費(fèi)者的監(jiān)聽器:

@Component
public class RabbitDemoConsumer {

    enum Action {
        //處理成功
        SUCCESS,
        //可以重試的錯(cuò)誤,消息重回隊(duì)列
        RETRY,
        //無需重試的錯(cuò)誤,拒絕消息,并從隊(duì)列中刪除
        REJECT
    }

    @RabbitHandler
    @RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC))
    public void process(String msg, Message message, Channel channel) {
        long tag = message.getMessageProperties().getDeliveryTag();
        Action action = Action.SUCCESS;
        try {
            System.out.println("消費(fèi)者RabbitDemoConsumer從RabbitMQ服務(wù)端消費(fèi)消息:" + msg);
            if ("bad".equals(msg)) {
                throw new IllegalArgumentException("測試:拋出可重回隊(duì)列的異常");
            }
            if ("error".equals(msg)) {
                throw new Exception("測試:拋出無需重回隊(duì)列的異常");
            }
        } catch (IllegalArgumentException e1) {
            e1.printStackTrace();
            //根據(jù)異常的類型判斷,設(shè)置action是可重試的,還是無需重試的
            action = Action.RETRY;
        } catch (Exception e2) {
            //打印異常
            e2.printStackTrace();
            //根據(jù)異常的類型判斷,設(shè)置action是可重試的,還是無需重試的
            action = Action.REJECT;
        } finally {
            try {
                if (action == Action.SUCCESS) {
                    //multiple 表示是否批量處理。true表示批量ack處理小于tag的所有消息。false則處理當(dāng)前消息
                    channel.basicAck(tag, false);
                } else if (action == Action.RETRY) {
                    //Nack,拒絕策略,消息重回隊(duì)列
                    channel.basicNack(tag, false, true);
                } else {
                    //Nack,拒絕策略,并且從隊(duì)列中刪除
                    channel.basicNack(tag, false, false);
                }
                channel.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
 

解釋一下上面的代碼,如果沒有異常,則手動確認(rèn)回復(fù)RabbitMQ服務(wù)端basicAck(消費(fèi)成功)。

如果拋出某些可以重回隊(duì)列的異常,我們就回復(fù)basicNack并且設(shè)置重回隊(duì)列。

如果是拋出不可重回隊(duì)列的異常,就回復(fù)basicNack并且設(shè)置從RabbitMQ的隊(duì)列中刪除。

接下來進(jìn)行測試,發(fā)送一條普通的消息"hello":RabbitMQ如何防止數(shù)據(jù)丟失解釋一下ack返回的三個(gè)方法的意思。

①成功確認(rèn)

void basicAck(long deliveryTag, boolean multiple) throws IOException;
 

消費(fèi)者成功處理后調(diào)用此方法對消息進(jìn)行確認(rèn)。

  • deliveryTag:該消息的index
  • multiple:是否批量.。true:將一次性ack所有小于deliveryTag的消息。

②失敗確認(rèn)

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
 
  • deliveryTag:該消息的index。
  • multiple:是否批量。true:將一次性拒絕所有小于deliveryTag的消息。
  • requeue:被拒絕的是否重新入隊(duì)列。

③失敗確認(rèn)

void basicReject(long deliveryTag, boolean requeue) throws IOException;
 
  • deliveryTag:該消息的index。
  • requeue:被拒絕的是否重新入隊(duì)列。

basicNack()和basicReject()的區(qū)別在于:basicNack()可以批量拒絕,basicReject()一次只能拒接一條消息。

 

四、遇到的坑

 

4.1 啟用nack機(jī)制后,導(dǎo)致的死循環(huán)

上面的代碼我故意寫了一個(gè)bug。測試發(fā)送一條"bad",然后會拋出重回隊(duì)列的異常。這就有個(gè)問題:重回隊(duì)列后消費(fèi)者又消費(fèi),消費(fèi)拋出異常又重回隊(duì)列,就造成了死循環(huán)。RabbitMQ如何防止數(shù)據(jù)丟失那怎么避免這種情況呢?

既然nack會造成死循環(huán)的話,我提供的一個(gè)思路是不使用basicNack(),把拋出異常的消息落庫到一張表中,記錄拋出的異常,消息體,消息Id。通過定時(shí)任務(wù)去處理。

如果你有什么好的解決方案,也可以留言討論~

 

4.2 double ack

有的時(shí)候比較粗心,不小心開啟了自動Ack模式,又手動回復(fù)了Ack。那就會報(bào)這個(gè)錯(cuò)誤:

消費(fèi)者RabbitDemoConsumer從RabbitMQ服務(wù)端消費(fèi)消息:java技術(shù)愛好者
2020-08-02 22:52:42.148 ERROR 4880 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
2020-08-02 22:52:43.102  INFO 4880 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@f4a3a8d: tags=[{amq.ctag-8MJeQ7el_PNbVJxGOOw7Rw=rabbitmq.demo.topic}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,5), conn: Proxy@782a1679 Shared Rabbit Connection: SimpleConnection@67c5b175 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 56938], acknowledgeMode=AUTO local queue size=0
 

出現(xiàn)這個(gè)錯(cuò)誤,可以檢查一下yml文件是否添加了以下配置:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual
        concurrency: 1
        max-concurrency: 10
 

如果上面這個(gè)配置已經(jīng)添加了,還是報(bào)錯(cuò),有可能你使用@Configuration配置了SimpleRabbitListenerContainerFactory,根據(jù)SpringBoot的特性,代碼優(yōu)于配置,代碼的配置覆蓋了yml的配置,并且忘記設(shè)置手動manual模式

@Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        //設(shè)置手動ack模式
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }
 

如果你還是有報(bào)錯(cuò),那可能是寫錯(cuò)地方了,寫在生產(chǎn)者的項(xiàng)目了。以上的配置應(yīng)該配置在消費(fèi)者的項(xiàng)目。因?yàn)閍ck模式是針對消費(fèi)者而言的。我就是寫錯(cuò)了,寫在生產(chǎn)者,折騰了幾個(gè)小時(shí),淚目~

 

4.3 性能問題

其實(shí)手動ACK相對于自動ACK肯定是會慢很多,我在網(wǎng)上查了一些資料,性能相差大概有10倍。所以一般在實(shí)際應(yīng)用中不太建議開手動ACK模式。不過也不是絕對不可以開,具體情況具體分析,看并發(fā)量,還有數(shù)據(jù)的重要性等等。

所以在實(shí)際項(xiàng)目中還需要權(quán)衡一下并發(fā)量和數(shù)據(jù)的重要性,再決定具體的方案

 

4.4 啟用手動ack模式,如果沒有及時(shí)回復(fù),會造成隊(duì)列異常

如果開啟了手動ACK模式,但是由于代碼有bug的原因,沒有回復(fù)RabbitMQ服務(wù)端,那么這條消息就會放到Unacked狀態(tài)的消息堆里,只有等到消費(fèi)者的連接斷開才會轉(zhuǎn)到Ready消息。如果消費(fèi)者一直沒有斷開連接,那Unacked的消息就會越來越多,占用內(nèi)存就越來越大,最后就會出現(xiàn)異常。

這個(gè)問題,我沒法用我的電腦演示,我的電腦太卡了。

 

看完了這篇文章,相信你對“RabbitMQ如何防止數(shù)據(jù)丟失”有了一定的了解,如果想了解更多相關(guān)知識,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝各位的閱讀!

本文標(biāo)題:RabbitMQ如何防止數(shù)據(jù)丟失
文章源于:http://muchs.cn/article18/ghghgp.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供建站公司、外貿(mào)建站、全網(wǎng)營銷推廣網(wǎng)頁設(shè)計(jì)公司、軟件開發(fā)、面包屑導(dǎo)航

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

h5響應(yīng)式網(wǎng)站建設(shè)