ActiveMQ中如何實(shí)現(xiàn)消息存儲(chǔ)

這篇文章將為大家詳細(xì)講解有關(guān)ActiveMQ中如何實(shí)現(xiàn)消息存儲(chǔ),小編覺(jué)得挺實(shí)用的,因此分享給大家做個(gè)參考,希望大家閱讀完這篇文章后可以有所收獲。

創(chuàng)新互聯(lián)公司-專業(yè)網(wǎng)站定制、快速模板網(wǎng)站建設(shè)、高性價(jià)比平輿網(wǎng)站開(kāi)發(fā)、企業(yè)建站全套包干低至880元,成熟完善的模板庫(kù),直接使用。一站式平輿網(wǎng)站制作公司更省心,省錢(qián),快速模板網(wǎng)站建設(shè)找我們,業(yè)務(wù)覆蓋平輿地區(qū)。費(fèi)用合理售后完善,10余年實(shí)體公司更值得信賴。

一、隊(duì)列和topic

  • 概述
    ActiveMQ不僅支持persistent和non-persistent兩種方式,還支持消息的恢復(fù)(recovery)方式

  • PTP
    Queue的存儲(chǔ)方式很簡(jiǎn)單,就是一個(gè)FIFO(先進(jìn)先出)的Queue
    ActiveMQ中如何實(shí)現(xiàn)消息存儲(chǔ)

  • PUB/SUB
    對(duì)于持久化訂閱主題,每一個(gè)消費(fèi)者將獲得一個(gè)消息的復(fù)制
    ActiveMQ中如何實(shí)現(xiàn)消息存儲(chǔ)

  • 有效的消息存儲(chǔ)
      ActiveMQ提供了一個(gè)插件式的消息存儲(chǔ),類似于消息的多點(diǎn)傳播,主要實(shí)現(xiàn)了如下幾種:
    1:AMQ消息存儲(chǔ)一基于文件的存儲(chǔ)方式,是以前的默認(rèn)消息存儲(chǔ)
    2:KahaDB消息存儲(chǔ)一提供了容量的提升和恢復(fù)能力,是現(xiàn)在的默認(rèn)存儲(chǔ)方式
    3:JDBC消息存儲(chǔ)一消息基于JDBC存儲(chǔ)的
    4:Memory消息存儲(chǔ)一基于內(nèi)存的消息存儲(chǔ)

二、KahaDB

  • KahaDB Message Store概述
        KahaDB是目前默認(rèn)的存儲(chǔ)方式,可用于任何場(chǎng)景,提高了性能和恢復(fù)能力。消息存儲(chǔ)使用一個(gè)事務(wù)日志和僅僅用一個(gè)索引文件來(lái)存儲(chǔ)它所有的地址。
        KahaDB是一個(gè)專門(mén)針對(duì)消息持久化的解決方案,它對(duì)典型的消息使用模式進(jìn)行了優(yōu)化。在Kaha中,數(shù)據(jù)被追加到datalogs中。當(dāng)不再需要log文件中的數(shù)的時(shí)候,log文件會(huì)被丟棄。

  • KahaDB基本配置例子

    <persistenceAdapter>
    <kahaDB directory="${actlvemq.data}/kahadb"/>
    </persistenceAdapter>

    可用的屬性有:
    1:director:KahaDB存放的路徑,默認(rèn)值activemq-data
    2:indexWriteBatchSize:批量寫(xiě)入磁盤(pán)的索引page數(shù)量,默認(rèn)值1000
    3:indexCacheSize:內(nèi)存中緩存索引page的數(shù)量,默認(rèn)值10000
    4:enableIndexWriteAsync:是否異步寫(xiě)出索引,默認(rèn)false
    5:journalMaxFi1eLength:設(shè)置每個(gè)消息data log的大小,默認(rèn)是32M
    6:enab1eJournalDiskSyncs:設(shè)置是否保證每個(gè)沒(méi)有事務(wù)的內(nèi)容,被同步寫(xiě)入磁盤(pán),JMS持久化的時(shí)候需要,默認(rèn)為true
    7:cleanupInterval:在檢查到不再使用的消后,在具體刪除消息前的時(shí)間,默認(rèn)30000
    8:checkpointInterval:checkpoint的間隔時(shí)間,默認(rèn)5000
    9:ignoreMissingJournalfiles:是否忽略丟失的消息日志文件,默認(rèn)false
    10:checkForCourruptJournalFiles:在啟動(dòng)的時(shí)候,將會(huì)驗(yàn)證消息文件是否損壞,默認(rèn)為false
    11:checksumJournalFiles:是否為每個(gè)消息日志文件提供checksum,默認(rèn)false
    12:archiveDataLogs:是否移動(dòng)文件到特定的路徑,而不是刪除它們,默認(rèn)false
    13:directoryArchive:定義消息已經(jīng)被消費(fèi)后,移動(dòng)data log到的路徑,默認(rèn)為null
    14:databaseLockedWaitDelay:獲得數(shù)據(jù)庫(kù)鎖的等待時(shí)間,默認(rèn)10000
    15:maxAsyncJobs:設(shè)置最大的可以存儲(chǔ)的異步消息隊(duì)列,默認(rèn)10000,可以和concurrent MessageProducers設(shè)置成一樣的值
    16:concurrentStoreAndDispatchTransactions:是否分發(fā)消息到客戶端,同時(shí)事務(wù)存儲(chǔ)消息,默認(rèn)true
    17:concurrentStoreAndDispatchTopics:是否分發(fā)Topic消息到客戶端,同時(shí)進(jìn)行存儲(chǔ),默認(rèn)true
    18:concurrentStoreAndDispatchQueues:是否分發(fā)queue消息到客戶端,同時(shí)進(jìn)行存儲(chǔ),默認(rèn)true

  • 在Java中內(nèi)嵌使用Broker,使用KahaDB的例子

    public class EmbeddedBrokerUsingKahaDBStoreExample {
    public BrokerService createEmbeddedBroker()throws Exception{
        BrokerService broker = new BrokerService();
        File dataFileDir = new File("target/amq-in-action/kahadb");
        KahaDBStore kaha = new KahaDBStore();
        kaha.setDirectory(dataFileDir);
        kaha.setJournalMaxFileLength(1024*1000);
        kaha.setIndexWriteBatchSize(100);
        kaha.setEnableIndexWriteAsync(true);
    
        broker.setPersistenceAdapter(kaha);
        broker.addConnector("tcp://localhost:61616");
        broker.start();
        return broker;
    }
    }

三、AMQ

  • AMQ Message Store概述
        AMQ Message Store 是ActiveMQ5.0缺省的持久化存儲(chǔ),它是一個(gè)基于文件、事務(wù)存儲(chǔ)設(shè)計(jì)為快速消息存儲(chǔ)的一個(gè)結(jié)構(gòu),該結(jié)構(gòu)是以流的形式來(lái)進(jìn)行消息交互的。
        這種方式中,Messages被保存到data logs中,同時(shí)被reference store進(jìn)行索引以提高存取速度。Data logs由一些單獨(dú)的data log文件組成,缺省的文件大小是32M,如果某個(gè)消息的大小超過(guò)了data log文件的大小,那么可以修改配置以增加data log文件的大小。如果某個(gè)data log文件中所有的消息都被成功消費(fèi)了,那么這個(gè)data log文件將會(huì)被標(biāo)記,以便在下一輪的清理中被刪除或者歸檔。

  • AMQ Message Store配置示例

    <broker brokerName="broker" persistent="true" useShutdownHook="false">
    <persistenceAdapter>
        <amqPersistenceAdapter driectory="${activemq.base}/data"    maxFileLength="32mb"/>
    </persistenceAdapter>
    </broker>

    四、 JDBC

  • 使用JDBC來(lái)持久化消息(此步驟不需要手工跑腳本)
    ActiveMQ支持使用JDBC來(lái)持久化消息,預(yù)定義的表如下:
    1:消息表,缺省表明為ACTIVEMQ_MSGS,quue和topic都存在里面,結(jié)構(gòu)如下:
    ActiveMQ中如何實(shí)現(xiàn)消息存儲(chǔ)
    2:ACTIVEMQ_ACKS表存儲(chǔ)持久訂閱的信息和最后一個(gè)持久訂閱接收的消息ID,結(jié)構(gòu)如下:
    ActiveMQ中如何實(shí)現(xiàn)消息存儲(chǔ)
    3:鎖定表,缺省表明為ACTIVEMQ_LOCK,用來(lái)確保在某一時(shí)刻,只能有一個(gè)Act broker實(shí)例來(lái)訪問(wèn)數(shù)據(jù)庫(kù),結(jié)構(gòu)如下:
    ActiveMQ中如何實(shí)現(xiàn)消息存儲(chǔ)

  • 使用JDBC來(lái)持久化消息的配置示例
    ActiveMQ中如何實(shí)現(xiàn)消息存儲(chǔ)

注意:
(1)數(shù)據(jù)庫(kù)需要字符集設(shè)置為latin1。
(2)需要把MySQL-connector-java.jar包放入lib中。
(3)啟動(dòng)成功之后會(huì)出現(xiàn)三張表。

  • 示例
    一、(queue模式):
    ActiveMQ中如何實(shí)現(xiàn)消息存儲(chǔ)
    ActiveMQ中如何實(shí)現(xiàn)消息存儲(chǔ)
    運(yùn)行發(fā)送者(數(shù)據(jù)庫(kù)中有三條未接收的消息):
    ActiveMQ中如何實(shí)現(xiàn)消息存儲(chǔ)
    運(yùn)行接收者(消息成功接收的同時(shí),數(shù)據(jù)庫(kù)中的消息也會(huì)被刪除)

二、persistence模式
消息接收者接收完成之后,數(shù)據(jù)庫(kù)中的消息不會(huì)被刪除。
如圖所示:
ActiveMQ中如何實(shí)現(xiàn)消息存儲(chǔ)

  • JDBC Message Store with ActiveMQ Journal(日志)
    這種方式克服了JDBC Store的不足,使用快速的緩存寫(xiě)入技術(shù),大大提高了性能。配置示例如下:

    <beans>
    <broker brokerName="test-broker" xmlns="http://activemq.apache.org/schema/core">
        <persistenceFactory>
            <journalPersistenceAdapterFactory 
                journalLogFiles="4"
                journalLogFileSize="32768"
                useJournal="true"
                useQuickJournal="true"
                dataSource="#derby-ds"
                dataDirectory="activemq-data"/>         
        </persistenceFactory>
    </broker>
    </beans>
  • JDBC Store和JDBC Message Store with ActiveMQ Journal的區(qū)別
    1:jdbc with journal的性能優(yōu)于jdbc
    2:jdbc用于master/slave模式的數(shù)據(jù)庫(kù)分享
    3:jdbc with journal不能用于master/slave模式
    4:一般情況下(非集群狀態(tài)下),推薦使用jdbc with journal

五、 MMS

  • Memory Message Store
        內(nèi)存消息存儲(chǔ)主要是存儲(chǔ)所有的持久化的消息在內(nèi)存中。這里沒(méi)有動(dòng)態(tài)的緩存存在,所以必須要注意設(shè)置broker所在的JVM和內(nèi)存限制。

  • Memory Message Store 配置示例

    <beans>
    <broker brokerName="test-broker" persistent="false" xmlns="http://activemq.apache.org/schema/core">
        <transportConnectors uri="tcp://localhost:61616"/>
        </transportConnectors>
    </broker>
    </beans>
  • 在Java中內(nèi)嵌使用Broker,使用Memory的例子

    public void createEmbeddedBroker()throws Exception{
    BrokerService broker = new BrokerService();
    broker.setPersistent(false);
    broker.addConnector("tcp://localhost:61616");
    broker.start();
    }

關(guān)于“ActiveMQ中如何實(shí)現(xiàn)消息存儲(chǔ)”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,使各位可以學(xué)到更多知識(shí),如果覺(jué)得文章不錯(cuò),請(qǐng)把它分享出去讓更多的人看到。

分享名稱:ActiveMQ中如何實(shí)現(xiàn)消息存儲(chǔ)
URL網(wǎng)址:http://muchs.cn/article42/pjjchc.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站營(yíng)銷(xiāo)、定制開(kāi)發(fā)關(guān)鍵詞優(yōu)化、響應(yīng)式網(wǎng)站、品牌網(wǎng)站制作動(dòng)態(tài)網(wǎng)站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

成都網(wǎng)站建設(shè)