RocketMQ事務(wù)消息實(shí)現(xiàn)基本思想是怎樣的

這篇文章將為大家詳細(xì)講解有關(guān)RocketMQ事務(wù)消息實(shí)現(xiàn)基本思想是怎樣的,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個(gè)參考,希望大家閱讀完這篇文章后對(duì)相關(guān)知識(shí)有一定的了解。

為徽州等地區(qū)用戶提供了全套網(wǎng)頁(yè)設(shè)計(jì)制作服務(wù),及徽州網(wǎng)站建設(shè)行業(yè)解決方案。主營(yíng)業(yè)務(wù)為成都做網(wǎng)站、成都網(wǎng)站制作、成都外貿(mào)網(wǎng)站建設(shè)、徽州網(wǎng)站設(shè)計(jì),以傳統(tǒng)方式定制建設(shè)網(wǎng)站,并提供域名空間備案等一條龍服務(wù),秉承以專業(yè)、用心的態(tài)度為用戶提供真誠(chéng)的服務(wù)。我們深信只要達(dá)到每一位用戶的要求,就會(huì)得到認(rèn)可,從而選擇與我們長(zhǎng)期合作。這樣,我們也可以走得更遠(yuǎn)!

RocketMQ事務(wù)消息實(shí)現(xiàn)基本思想。

RocketMQ4.3.0版本開(kāi)始支持事務(wù)消息,后續(xù)分享將開(kāi)始將剖析事務(wù)消息的實(shí)現(xiàn)原理。首先從官方給出的Demo實(shí)例入手,以此通往RocketMQ事務(wù)消息的世界中。

官方版本未發(fā)布之前,從apache rocketmq第一個(gè)版本上線后,代碼中存在與事務(wù)消息相關(guān)的代碼,例如COMMIT、ROLLBACK、PREPARED,在事務(wù)消息未開(kāi)源之前網(wǎng)上對(duì)于事務(wù)消息的“聲音”基本上是使用類似二階段提交,主要是根據(jù)消息系統(tǒng)標(biāo)志MessageSysFlag中定義來(lái)推測(cè)的:

  • TRANSACTION_PREPARED_TYPE

  • TRANSACTION_COMMIT_TYPE

  • TRANSACTION_ROLLBACK_TYPE

消息發(fā)送者首先發(fā)送TRANSACTION_PREPARED_TYPE類型的消息,然后根據(jù)事務(wù)狀態(tài)來(lái)決定是提交或回滾事務(wù)發(fā)送commit請(qǐng)求或rollback請(qǐng)求,如果commit/rollback請(qǐng)求丟失后,rocketmq會(huì)在指定超時(shí)時(shí)間后回查事務(wù)狀態(tài)來(lái)決定提交或回滾事務(wù)。

讓我們各自帶著自己的理解和猜測(cè),從閱讀RocketMQ官方提供的Demo程序入手,試圖窺探一些大體的信息。

Demo示例程序位于:/rocketmq-example/src/main/java/org/apache/rocketmq/example/transaction包中。該包中未放置消息消費(fèi)者,為了驗(yàn)證事務(wù)的消息消費(fèi)情況,我們可以從其他包c(diǎn)opy一個(gè)消費(fèi)者,從而先運(yùn)行生產(chǎn)者,然后運(yùn)行消費(fèi)者,判斷事務(wù)消息的預(yù)發(fā)放、提交、回滾等效果,二話不說(shuō),先運(yùn)行一下,看下效果再說(shuō):
消息發(fā)送端運(yùn)行結(jié)果:

SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5767EC0000, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D57680F0001, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=2], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D57681E0002, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=3], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D57682B0003, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=0], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5768380004, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=1], queueOffset=4]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5768490005, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=2], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5768560006, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=3], queueOffset=6]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5768640007, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=0], queueOffset=7]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5768730008, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=1], queueOffset=8]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5768800009, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=2], queueOffset=9]

消息消費(fèi)端效果:

Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=0, storeSize=325, queueOffset=0, sysFlag=8, bornTimestamp=1532745715812, bornHost=/192.168.1.5:55482, storeTimestamp=1532745749010, storeHost=/192.168.1.5:10911, msgId=C0A8010500002A9F0000000000001DE8, commitLogOffset=7656, bodyCRC=988340972, reconsumeTimes=0, preparedTransactionOffset=5477, toString()=Message{topic='transaction_topic_test', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=transaction_topic_test, TRANSACTION_CHECK_TIMES=1, MAX_OFFSET=1, KEYS=KEY7, TRAN_MSG=true, CONSUME_START_TIME=1532746024360, UNIQ_KEY=C0A8010518DC6D06D69C8D5768640007, WAIT=true, PGROUP=please_rename_unique_group_name, TAGS=TagC, REAL_QID=0}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 55], transactionId='C0A8010518DC6D06D69C8D5768640007'}]] 
ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=1, storeSize=325, queueOffset=0, sysFlag=8, bornTimestamp=1532745715768, bornHost=/192.168.1.5:55482, storeTimestamp=1532745749008, storeHost=/192.168.1.5:10911, msgId=C0A8010500002A9F0000000000001B91, commitLogOffset=7057, bodyCRC=601994070, reconsumeTimes=0, preparedTransactionOffset=4496, toString()=Message{topic='transaction_topic_test', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=transaction_topic_test, TRANSACTION_CHECK_TIMES=1, MAX_OFFSET=1, KEYS=KEY4, TRAN_MSG=true, CONSUME_START_TIME=1532746024361, UNIQ_KEY=C0A8010518DC6D06D69C8D5768380004, WAIT=true, PGROUP=please_rename_unique_group_name, TAGS=TagE, REAL_QID=1}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 52], transactionId='C0A8010518DC6D06D69C8D5768380004'}]] 
ConsumeMessageThread_3 Receive New Messages: [MessageExt [queueId=2, storeSize=325, queueOffset=0, sysFlag=8, bornTimestamp=1532745715727, bornHost=/192.168.1.5:55482, storeTimestamp=1532745748834, storeHost=/192.168.1.5:10911, msgId=C0A8010500002A9F000000000000193A, commitLogOffset=6458, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=3515, toString()=Message{topic='transaction_topic_test', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=transaction_topic_test, TRANSACTION_CHECK_TIMES=1, MAX_OFFSET=1, KEYS=KEY1, TRAN_MSG=true, CONSUME_START_TIME=1532746024368, UNIQ_KEY=C0A8010518DC6D06D69C8D57680F0001, WAIT=true, PGROUP=please_rename_unique_group_name, TAGS=TagB, REAL_QID=2}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49], transactionId='C0A8010518DC6D06D69C8D57680F0001'}]]

綜上所述,服務(wù)端發(fā)送了10條消息,而消費(fèi)端只收到3條消息,應(yīng)該是由于事務(wù)回滾,造成只提交了3條消息,為了更加嚴(yán)謹(jǐn),可以安裝一個(gè)rocketmq-consonse,更加直觀的觀察shangshagn's上述結(jié)果:
RocketMQ事務(wù)消息實(shí)現(xiàn)基本思想是怎樣的cdn.com/97db9b9f7bc18478739f472e134cf48eb3af8cad.png">

接下來(lái)對(duì)示例代碼進(jìn)行解讀:

1、生產(chǎn)者端代碼解讀:

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();        // @1
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });      // @2
        producer.setExecutorService(executorService);                                // @3
        producer.setTransactionListener(transactionListener);                      // @4
        producer.start();
        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {                                                                    // @5
            try {
                Message msg =
                    new Message("transaction_topic_test", tags[i % tags.length], "KEY">

代碼@1:創(chuàng)建TransactionListener 實(shí)例,字面理解為事務(wù)消息事件監(jiān)聽(tīng)器,下文詳細(xì)對(duì)其進(jìn)行展開(kāi)。
代碼@2:ExecutorService executorService,創(chuàng)建一個(gè)線程池,其線程的名稱前綴”client-transaction-msg-check-thread“,從字面理解為客戶端事務(wù)消息狀態(tài)檢測(cè)線程,我們可以大膽的猜測(cè)一下是不是這個(gè)線程池調(diào)用TransactionListener方法,完成對(duì)事務(wù)消息的檢測(cè)呢?【這里只是作者的猜測(cè),大家不能當(dāng)真,在作者后續(xù)文章發(fā)布后,如果該觀點(diǎn)錯(cuò)誤,會(huì)加以修復(fù),這里寫(xiě)出來(lái),主要是想分享一下我讀源碼的方法】。
代碼@3:為事務(wù)消息發(fā)送者設(shè)置線程池。
代碼@4:為事務(wù)消息發(fā)送者設(shè)置事務(wù)監(jiān)聽(tīng)器。
代碼@5:發(fā)送10條消息。

2、TransactionListener代碼解讀

public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
  1. executeLocalTransaction方法:記錄本地事務(wù)的事務(wù)狀態(tài),這里其實(shí)現(xiàn)就是循環(huán)設(shè)置事務(wù)消息的狀態(tài)為0,1,2,demo中是把消息的狀態(tài)數(shù)據(jù)存放在一個(gè)Map中。實(shí)際應(yīng)用時(shí)通常會(huì)持久化消息的事務(wù)狀態(tài),例如數(shù)據(jù)庫(kù)或緩存。

  2. checkLocalTransaction方法,事務(wù)回查業(yè)務(wù)實(shí)現(xiàn),查本地事務(wù)表,判斷事務(wù)的狀態(tài)如為0:UNKNOW,1:COMMIT_MESSAGE;ROLLBACK_MESSAGE。這里就能解釋,生產(chǎn)者連續(xù)發(fā)10條消息,因?yàn)橹挥?條消息的事務(wù)狀態(tài)為COMMIT_MESSAGE,故消息消費(fèi)者只能消費(fèi)3條。

到這里,基本上還是可以得知事務(wù)消息的實(shí)現(xiàn)方式,基本與文章開(kāi)頭所示的“網(wǎng)上聲音”實(shí)現(xiàn)類似,下一節(jié)將詳細(xì)分析TransactionMQProducer事務(wù)消息發(fā)送的實(shí)現(xiàn)細(xì)節(jié)。

鄭重聲明:本文主要是展示事務(wù)消息的基本使用,本文所下的結(jié)論還僅僅是作者的猜測(cè),本文一個(gè)非常重要的目的,是向讀者朋友們展示作者學(xué)習(xí)源碼的一個(gè)方法,總結(jié)為:先做全面了解(網(wǎng)上,官方文檔)、然后加以自己的思考,從Demo實(shí)例入手學(xué)習(xí),將學(xué)習(xí)任務(wù)分解之,邊寫(xiě)邊看。

關(guān)于RocketMQ事務(wù)消息實(shí)現(xiàn)基本思想是怎樣的就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺(jué)得文章不錯(cuò),可以把它分享出去讓更多的人看到。

分享題目:RocketMQ事務(wù)消息實(shí)現(xiàn)基本思想是怎樣的
當(dāng)前鏈接:http://muchs.cn/article36/pdhdpg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供關(guān)鍵詞優(yōu)化App開(kāi)發(fā)、域名注冊(cè)、電子商務(wù)、企業(yè)建站、外貿(mào)建站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

外貿(mào)網(wǎng)站制作