這篇文章給大家介紹如何理解RocketMQ消費位置,內(nèi)容非常詳細,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。
創(chuàng)新互聯(lián)公司專注于臺江網(wǎng)站建設(shè)服務(wù)及定制,我們擁有豐富的企業(yè)做網(wǎng)站經(jīng)驗。 熱誠為您提供臺江營銷型網(wǎng)站建設(shè),臺江網(wǎng)站制作、臺江網(wǎng)頁設(shè)計、臺江網(wǎng)站官網(wǎng)定制、微信小程序服務(wù),打造臺江網(wǎng)絡(luò)公司原創(chuàng)品牌,更為您提供臺江網(wǎng)站排名全網(wǎng)營銷落地服務(wù)。
RocketMQ創(chuàng)建消費者的時指定了Topic主題及Tag,我們發(fā)現(xiàn)新創(chuàng)建的消費者消費不了歷史的數(shù)據(jù),只能消費掉創(chuàng)建以后消費者發(fā)送的數(shù)據(jù)。這是什么原因,我們能把所有的消息都消費嗎?,我們可以指定需要消費的消息的時間嗎?答案是肯定的,下面我們具體分析一下。
前提:我們討論是集群模式下的,廣播模式也是一樣的,只是示例代碼我們用集群模式來討論。
消息消費的位置目前提供了三種方式CONSUME_FROM_LAST_OFFSET(隊列尾部消費)、CONSUME_FROM_FIRST_OFFSET(隊列頭部消費)、CONSUME_FROM_TIMESTAMP(指定消費時間點)。
public enum ConsumeFromWhere { CONSUME_FROM_LAST_OFFSET, @Deprecated CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST, @Deprecated CONSUME_FROM_MIN_OFFSET, @Deprecated CONSUME_FROM_MAX_OFFSET, CONSUME_FROM_FIRST_OFFSET, CONSUME_FROM_TIMESTAMP, }
分析源碼我們看到有6種方式,其他三種已經(jīng)廢棄掉了,不做討論。
我們從新創(chuàng)建一個消費組來消費某個主題下的消息時,歷史消息沒有被消費,當(dāng)生產(chǎn)者重新發(fā)送消息時則會接收到最新的,我們分析下其在哪設(shè)置的。
當(dāng)創(chuàng)建消費者的時候內(nèi)置了一些參數(shù),從隊列尾部消費。
從隊列尾部消費導(dǎo)致歷史消息消費不了,會丟失一部分數(shù)據(jù),如果僅僅是狀態(tài)數(shù)據(jù)則可以這樣設(shè)置,如果是業(yè)務(wù)數(shù)據(jù)導(dǎo)致數(shù)據(jù)丟失。
對于設(shè)置這個參數(shù)僅對于消費組第一次創(chuàng)建時生效,后面再次設(shè)置不生效,因為該消費組在服務(wù)端已經(jīng)記錄了消費的進度,已有進度位置。
查看消費進度文件的位置,我們根據(jù)上幾節(jié)的內(nèi)容查看TopicTest主題下的這個consumer_test_clustering消費組的消息消費的進度。查看Broker-a服務(wù)器節(jié)點上的信息。
查看消費的消費進度先根據(jù)可視化界面查看
查看服務(wù)器文件上的消費進度信息:/usr/local/rocketmq-all-4.2.0/store/config/consumerOffset.json
編寫Consumer
public class Consumer1 { public static void main(String[] args){ try { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); consumer.setConsumerGroup("consumer_first_offset"); consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876"); consumer.subscribe("TopicTest", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.registerMessageListener(new MessageListenerConcurrently(){ @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> paramList, ConsumeConcurrentlyContext paramConsumeConcurrentlyContext) { try { for(MessageExt msg : paramList){ String msgbody = new String(msg.getBody(), "utf-8"); SimpleDateFormat sd = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss"); Date date = new Date(msg.getStoreTimestamp()); System.out.println("Consumer1=== 存入時間 : "+ sd.format(date) +" == MessageBody: "+ msgbody);//輸出消息內(nèi)容 } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再試 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消費成功 } }); consumer.start(); System.out.println("Consumer1===啟動成功!"); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
設(shè)置了消費組:consumer.setConsumerGroup("consumer_first_offset");
設(shè)置了消費位置:consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
查看其結(jié)果
從頭開始消費是指目前還儲存在broker的上的消息全部消費一遍,因為RocketMQ會將消息持久化到磁盤文件中,時間長就會導(dǎo)致磁盤文件會很多,RocketMQ有一種機制,只是保留一段時間的消息,之前的消息會刪除,可以指定時間點刪除(無論消息是否被消費,到時間點文件都會被刪除)
消費者代碼
public class Consumer1 { public static void main(String[] args){ try { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); consumer.setConsumerGroup("consumer_time_offset"); consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876"); consumer.subscribe("TopicTest", "*"); //可以設(shè)置從什么時間開始消費,配合setConsumeTimestamp一起使用默認半小時之前的,格式y(tǒng)yyyMMddhhmmss consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP); consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - 1800000L)); consumer.registerMessageListener(new MessageListenerConcurrently(){ @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> paramList, ConsumeConcurrentlyContext paramConsumeConcurrentlyContext) { try { for(MessageExt msg : paramList){ String msgbody = new String(msg.getBody(), "utf-8"); SimpleDateFormat sd = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss"); Date date = new Date(msg.getStoreTimestamp()); System.out.println("Consumer1=== 存入時間 : "+ sd.format(date) +" == MessageBody: "+ msgbody);//輸出消息內(nèi)容 } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再試 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消費成功 } }); consumer.start(); System.out.println("Consumer1===啟動成功!"); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
設(shè)置消費位置:consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP); 設(shè)置消費的時間點:consumer.setConsumeTimestamp("20181222171201");
如果從消息進度服務(wù)OffsetStore讀取到MessageQueue中的偏移量大于等于0,則使用讀取到的偏移量,只有讀取到的偏移量小于0時上述策略才會生效。
關(guān)于如何理解RocketMQ消費位置就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
當(dāng)前標題:如何理解RocketMQ消費位置
當(dāng)前路徑:http://muchs.cn/article22/jpdpjc.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站制作、網(wǎng)站內(nèi)鏈、虛擬主機、企業(yè)網(wǎng)站制作、外貿(mào)建站、商城網(wǎng)站
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)