如何解決RocketMQ消息消費異常

這篇文章主要介紹“如何解決RocketMQ消息消費異?!?,在日常操作中,相信很多人在如何解決RocketMQ消息消費異常問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”如何解決RocketMQ消息消費異?!钡囊苫笥兴鶐椭〗酉聛?,請跟著小編一起來學習吧!

創(chuàng)新互聯(lián)服務項目包括東阿網(wǎng)站建設、東阿網(wǎng)站制作、東阿網(wǎng)頁制作以及東阿網(wǎng)絡營銷策劃等。多年來,我們專注于互聯(lián)網(wǎng)行業(yè),利用自身積累的技術優(yōu)勢、行業(yè)經(jīng)驗、深度合作伙伴關系等,向廣大中小型企業(yè)、政府機構等提供互聯(lián)網(wǎng)行業(yè)的解決方案,東阿網(wǎng)站推廣取得了明顯的社會效益與經(jīng)濟效益。目前,我們服務的客戶以成都為中心已經(jīng)輻射到東阿省份的部分城市,未來相信會繼續(xù)擴大服務區(qū)域并繼續(xù)獲得客戶的支持與信任!

發(fā)現(xiàn)問題:

開發(fā)中在項目重啟時會重復消費消息,但其實消息已經(jīng)消費過了。

查找問題:

1.RocketMq console查看,發(fā)現(xiàn)訂閱組消息延遲

如何解決RocketMQ消息消費異常

2.從消息看message Detail 對應的consumerGroup trackType為 not conume yet

如何解決RocketMQ消息消費異常

3.項目日志也沒有任何錯誤日志,然而根據(jù)相關業(yè)務查詢數(shù)據(jù)庫發(fā)現(xiàn)數(shù)據(jù)已經(jīng)處理完成
4.業(yè)務代碼斷點,沒有拋出任何異常,通過resend message也能正常消費
5.懷疑是不是rocketMq 更新offset的定時任務沒有啟動

然而通過源碼斷點MQClientInstance 定時任務正常,只是每次更新的offset都是原offet如何解決RocketMQ消息消費異常

6.看看是不是消費的時候出了問題

由于是用的spring-boot整合的client,跟蹤consumer源碼,代碼在DefaultRocketMQListenerContainer.handleMessage方法中

然而一切正常,再往上跟蹤到DefaultMessageListenerConcurrently

public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {

        @SuppressWarnings("unchecked")
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt messageExt : msgs) {
                log.debug("received msg: {}", messageExt);
                try {
                    long now = System.currentTimeMillis();
                    handleMessage(messageExt);
                    long costTime = System.currentTimeMillis() - now;
                    log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
                } catch (Exception e) {
                    log.warn("consume message failed. messageExt:{}, error:{}", messageExt, e);
                    context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }

首先在catch代碼塊點打斷點看看是不是有問題,結果發(fā)現(xiàn)并沒有走到這里,這就坑爹了,害我又從其它方面各種查原因,浪費了很多時間。后面一步一步調(diào)試,最終在 log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime); 打日志這一步時拋出了異常,這尼瑪打個日志還能異常,還不是Exception的異常。。本來松了口氣以為找到了原因就好解決了,沒想到這才是剛剛開始。

7.在往上層調(diào)用代碼ConsumeMessageConcurrentlyService里斷點查看異常信息

java.lang.NoClassDefFoundError:Could not initialize class org.apache.rocketmq.common.message.MessageClientIDSetter

如何解決RocketMQ消息消費異常

原因是在MessageClientExt類中調(diào)用getMsgId方法里,調(diào)用了MessageClientIDSetter.getUniqID(this)直接拋出的異常

如何解決RocketMQ消息消費異常

從異常信息來看是MessageClientIDSetter 在初始化的時候出了問題

8.查看MessageClientIDSetter原碼,有一斷靜態(tài)代碼塊,然后在這里斷點跟蹤。
static {
        byte[] ip;
        try {
            ip = UtilAll.getIP();
        } catch (Exception e) {
            ip = createFakeIP();
        }
        LEN = ip.length + 2 + 4 + 4 + 2;
        ByteBuffer tempBuffer = ByteBuffer.allocate(ip.length + 2 + 4);
        tempBuffer.position(0);
        tempBuffer.put(ip);
        tempBuffer.position(ip.length);
        tempBuffer.putInt(UtilAll.getPid());
        tempBuffer.position(ip.length + 2);
        tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode());
        FIX_STRING = UtilAll.bytes2string(tempBuffer.array());
        setStartTime(System.currentTimeMillis());
        COUNTER = new AtomicInteger(0);
    }

發(fā)面是在ip = UtilAll.getIP();出了問題,然則并沒有到catch代碼塊,而是跳到了DefaultMqPushConsumerImpl類中,這里又一個坑爹的是異常塊沒有任何處理,看不到異常信息,好吧只能一步一步繼續(xù)斷點調(diào)試

如何解決RocketMQ消息消費異常

最終在UtillAll類的ipV6Check方法執(zhí)行到InetAddressValidator.getInstance();出了問題。

private static boolean ipV6Check(byte[] ip) {
        if (ip.length != 16) {
            throw new RuntimeException("illegal ipv6 bytes");
        }

        InetAddressValidator validator = InetAddressValidator.getInstance();
        return validator.isValidInet6Address(ipToIPv6Str(ip));
    }

但是在本地調(diào)試這段代碼又沒有任何問題。因此只能在debug時調(diào)試,報的錯是classNotFound異常

9.這又是坑爹的問題,源碼上看著類也能找到,最后查看maven依賴,才發(fā)現(xiàn)有jar包沖突。

如何解決RocketMQ消息消費異常

從rocketMq的依賴來看他需要的版本是1.6

如何解決RocketMQ消息消費異常

因此我們需要把1.3.1的版本移除

10.查找依賴1.3.1版本的commons-validator

看著這密密麻麻的依賴關系,Idea還沒有查找功能,只能慢慢找了,最后功夫不復有心人,把依賴移除,重啟一切都好了

如何解決RocketMQ消息消費異常

到此,關于“如何解決RocketMQ消息消費異?!钡膶W習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續(xù)學習更多相關知識,請繼續(xù)關注創(chuàng)新互聯(lián)網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>

當前標題:如何解決RocketMQ消息消費異常
轉載來于:http://muchs.cn/article30/ippopo.html

成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供企業(yè)建站、網(wǎng)站建設、搜索引擎優(yōu)化、動態(tài)網(wǎng)站、電子商務定制網(wǎng)站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉載內(nèi)容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉載,或轉載時需注明來源: 創(chuàng)新互聯(lián)

網(wǎng)站優(yōu)化排名