RocketMQ消息軌跡是怎樣的

這篇文章主要講解了“RocketMQ消息軌跡是怎樣的”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來(lái)研究和學(xué)習(xí)“RocketMQ消息軌跡是怎樣的”吧!

網(wǎng)站設(shè)計(jì)制作過(guò)程拒絕使用模板建站;使用PHP+MYSQL原生開發(fā)可交付網(wǎng)站源代碼;符合網(wǎng)站優(yōu)化排名的后臺(tái)管理系統(tǒng);成都網(wǎng)站建設(shè)、做網(wǎng)站收費(fèi)合理;免費(fèi)進(jìn)行網(wǎng)站備案等企業(yè)網(wǎng)站建設(shè)一條龍服務(wù).我們是一家持續(xù)穩(wěn)定運(yùn)營(yíng)了10年的創(chuàng)新互聯(lián)建站網(wǎng)站建設(shè)公司。

1、發(fā)送消息軌跡流程

首先我們來(lái)看一下在消息發(fā)送端如何啟用消息軌跡,示例代碼如下:

public class TraceProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true);      // [@1](https://my.oschina.net/u/1198)
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        for (int i = 0; i < 10; i++)
            try {
                {
                    Message msg = new Message("TopicTest",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                }

            } catch (Exception e) {
                e.printStackTrace();
            }
        producer.shutdown();
    }
}

從上述代碼可以看出其關(guān)鍵點(diǎn)是在創(chuàng)建DefaultMQProducer時(shí)指定開啟消息軌跡跟蹤。我們不妨瀏覽一下DefaultMQProducer與啟用消息軌跡相關(guān)的構(gòu)造函數(shù):

public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace)
public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic)

參數(shù)如下:

  • String producerGroup 生產(chǎn)者所屬組名。

  • boolean enableMsgTrace 是否開啟跟蹤消息軌跡,默認(rèn)為false。

  • String customizedTraceTopic 如果開啟消息軌跡跟蹤,用來(lái)存儲(chǔ)消息軌跡數(shù)據(jù)所屬的主題名稱,默認(rèn)為:RMQ_SYS_TRACE_TOPIC。

1.1 DefaultMQProducer構(gòu)造函數(shù)

public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic) {      // [@1](https://my.oschina.net/u/1198)
    this.producerGroup = producerGroup;
    defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
    //if client open the message trace feature
    if (enableMsgTrace) {                                                                                                                                                                                            // @2
        try {
            AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);                                                         
            dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
            traceDispatcher = dispatcher;
            this.getDefaultMQProducerImpl().registerSendMessageHook(
                new SendMessageTraceHookImpl(traceDispatcher));                                                                                                                             // [@3](https://my.oschina.net/u/2648711)
        } catch (Throwable e) {
            log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
        }
    }
}

代碼@1:首先介紹一下其局部變量。

  • String producerGroup 生產(chǎn)者所屬組。

  • RPCHook rpcHook 生產(chǎn)者發(fā)送鉤子函數(shù)。

  • boolean enableMsgTrace 是否開啟消息軌跡跟蹤。

  • String customizedTraceTopic 定制用于存儲(chǔ)消息軌跡的數(shù)據(jù)。

代碼@2:用來(lái)構(gòu)建AsyncTraceDispatcher,看其名:異步轉(zhuǎn)發(fā)消息軌跡數(shù)據(jù),稍后重點(diǎn)關(guān)注。

代碼@3:構(gòu)建SendMessageTraceHookImpl對(duì)象,并使用AsyncTraceDispatcher用來(lái)異步轉(zhuǎn)發(fā)。

1.2 SendMessageTraceHookImpl鉤子函數(shù)

1.2.1 SendMessageTraceHookImpl類圖

RocketMQ消息軌跡是怎樣的

  1. SendMessageHook 消息發(fā)送鉤子函數(shù),用于在消息發(fā)送之前、發(fā)送之后執(zhí)行一定的業(yè)務(wù)邏輯,是記錄消息軌跡的最佳擴(kuò)展點(diǎn)。

  2. TraceDispatcher 消息軌跡轉(zhuǎn)發(fā)處理器,其默認(rèn)實(shí)現(xiàn)類AsyncTraceDispatcher,異步實(shí)現(xiàn)消息軌跡數(shù)據(jù)的發(fā)送。下面對(duì)其屬性做一個(gè)簡(jiǎn)單的介紹:

    • int queueSize 異步轉(zhuǎn)發(fā),隊(duì)列長(zhǎng)度,默認(rèn)為2048,當(dāng)前版本不能修改。

    • int batchSize 批量消息條數(shù),消息軌跡一次消息發(fā)送請(qǐng)求包含的數(shù)據(jù)條數(shù),默認(rèn)為100,當(dāng)前版本不能修改。

    • int maxMsgSize 消息軌跡一次發(fā)送的最大消息大小,默認(rèn)為128K,當(dāng)前版本不能修改。

    • DefaultMQProducer traceProducer 用來(lái)發(fā)送消息軌跡的消息發(fā)送者。

    • ThreadPoolExecutor traceExecuter 線程池,用來(lái)異步執(zhí)行消息發(fā)送。

    • AtomicLong discardCount 記錄丟棄的消息個(gè)數(shù)。

    • Thread worker woker線程,主要負(fù)責(zé)從追加隊(duì)列中獲取一批待發(fā)送的消息軌跡數(shù)據(jù),提交到線程池中執(zhí)行。

    • ArrayBlockingQueue< TraceContext> traceContextQueue 消息軌跡TraceContext隊(duì)列,用來(lái)存放待發(fā)送到服務(wù)端的消息。

    • ArrayBlockingQueue< Runnable> appenderQueue 線程池內(nèi)部隊(duì)列,默認(rèn)長(zhǎng)度1024。

    • DefaultMQPushConsumerImpl hostConsumer 消費(fèi)者信息,記錄消息消費(fèi)時(shí)的軌跡信息。

    • String traceTopicName 用于跟蹤消息軌跡的topic名稱。

1.2.2 源碼分析SendMessageTraceHookImpl
1.2.2.1 sendMessageBefore
public void sendMessageBefore(SendMessageContext context) { 
    //if it is message trace data,then it doesn't recorded
    if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) {   // @1
        return;
    }
    //build the context content of TuxeTraceContext
    TraceContext tuxeContext = new TraceContext();
    tuxeContext.setTraceBeans(new ArrayList<tracebean>(1));
    context.setMqTraceContext(tuxeContext);
    tuxeContext.setTraceType(TraceType.Pub);
    tuxeContext.setGroupName(context.getProducerGroup());                                                                                                                       // @2
    //build the data bean object of message trace
    TraceBean traceBean = new TraceBean();                                                                                                                                                // @3
    traceBean.setTopic(context.getMessage().getTopic());
    traceBean.setTags(context.getMessage().getTags());
    traceBean.setKeys(context.getMessage().getKeys());
    traceBean.setStoreHost(context.getBrokerAddr());
    traceBean.setBodyLength(context.getMessage().getBody().length);
    traceBean.setMsgType(context.getMsgType());
    tuxeContext.getTraceBeans().add(traceBean);
}

代碼@1:如果topic主題為消息軌跡的Topic,直接返回。

代碼@2:在消息發(fā)送上下文中,設(shè)置用來(lái)跟蹤消息軌跡的上下環(huán)境,里面主要包含一個(gè)TraceBean集合、追蹤類型(TraceType.Pub)與生產(chǎn)者所屬的組。

代碼@3:構(gòu)建一條跟蹤消息,用TraceBean來(lái)表示,記錄原消息的topic、tags、keys、發(fā)送到broker地址、消息體長(zhǎng)度等消息。

從上文看出,sendMessageBefore主要的用途就是在消息發(fā)送的時(shí)候,先準(zhǔn)備一部分消息跟蹤日志,存儲(chǔ)在發(fā)送上下文環(huán)境中,此時(shí)并不會(huì)發(fā)送消息軌跡數(shù)據(jù)。

1.2.2.2 sendMessageAfter
public void sendMessageAfter(SendMessageContext context) {
    //if it is message trace data,then it doesn't recorded
    if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())     // @1
        || context.getMqTraceContext() == null) {
        return;
    }
    if (context.getSendResult() == null) {
        return;
    }

    if (context.getSendResult().getRegionId() == null
        || !context.getSendResult().isTraceOn()) {
        // if switch is false,skip it
        return;
    }

    TraceContext tuxeContext = (TraceContext) context.getMqTraceContext();
    TraceBean traceBean = tuxeContext.getTraceBeans().get(0);                                                                                                // @2
    int costTime = (int) ((System.currentTimeMillis() - tuxeContext.getTimeStamp()) / tuxeContext.getTraceBeans().size());     // @3
    tuxeContext.setCostTime(costTime);                                                                                                                                      // @4
    if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) {                                                                    
        tuxeContext.setSuccess(true);
    } else {
        tuxeContext.setSuccess(false);
    }
    tuxeContext.setRegionId(context.getSendResult().getRegionId());
  traceBean.setMsgId(context.getSendResult().getMsgId());
    traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId());
    traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2);
    localDispatcher.append(tuxeContext);                                                                                                                                   // @5
}

代碼@1:如果topic主題為消息軌跡的Topic,直接返回。

代碼@2:從MqTraceContext中獲取跟蹤的TraceBean,雖然設(shè)計(jì)成List結(jié)構(gòu)體,但在消息發(fā)送場(chǎng)景,這里的數(shù)據(jù)永遠(yuǎn)只有一條,及時(shí)是批量發(fā)送也不例外。

代碼@3:獲取消息發(fā)送到收到響應(yīng)結(jié)果的耗時(shí)。

代碼@4:設(shè)置costTime(耗時(shí))、success(是否發(fā)送成功)、regionId(發(fā)送到broker所在的分區(qū))、msgId(消息ID,全局唯一)、offsetMsgId(消息物理偏移量,如果是批量消息,則是最后一條消息的物理偏移量)、storeTime,這里使用的是(客戶端發(fā)送時(shí)間 + 二分之一的耗時(shí))來(lái)表示消息的存儲(chǔ)時(shí)間,這里是一個(gè)估值。

代碼@5:將需要跟蹤的信息通過(guò)TraceDispatcher轉(zhuǎn)發(fā)到Broker服務(wù)器。其代碼如下:

public boolean append(final Object ctx) {
    boolean result = traceContextQueue.offer((TraceContext) ctx);
    if (!result) {
        log.info("buffer full" + discardCount.incrementAndGet() + " ,context is " + ctx);
    }
    return result;
}

這里一個(gè)非常關(guān)鍵的點(diǎn)是offer方法的使用,當(dāng)隊(duì)列無(wú)法容納新的元素時(shí)會(huì)立即返回false,并不會(huì)阻塞。

接下來(lái)將目光轉(zhuǎn)向TraceDispatcher的實(shí)現(xiàn)。

1.3 TraceDispatcher實(shí)現(xiàn)原理

TraceDispatcher,用于客戶端消息軌跡數(shù)據(jù)轉(zhuǎn)發(fā)到Broker,其默認(rèn)實(shí)現(xiàn)類:AsyncTraceDispatcher。

1.3.1 TraceDispatcher構(gòu)造函數(shù)
public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) throws MQClientException {    
    // queueSize is greater than or equal to the n power of 2 of value
    this.queueSize = 2048;
    this.batchSize = 100;
    this.maxMsgSize = 128000;                                        
    this.discardCount = new AtomicLong(0L);         
    this.traceContextQueue = new ArrayBlockingQueue<tracecontext>(1024);
    this.appenderQueue = new ArrayBlockingQueue<runnable>(queueSize);
    if (!UtilAll.isBlank(traceTopicName)) {
        this.traceTopicName = traceTopicName;
    } else {
        this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC;
    }                   // @1
    this.traceExecuter = new ThreadPoolExecutor(// :
        10, //
        20, //
        1000 * 60, //
        TimeUnit.MILLISECONDS, //
        this.appenderQueue, //
        new ThreadFactoryImpl("MQTraceSendThread_"));
    traceProducer = getAndCreateTraceProducer(rpcHook);      // @2
}

代碼@1:初始化核心屬性,該版本這些值都是“固化”的,用戶無(wú)法修改。

  • queueSize 隊(duì)列長(zhǎng)度,默認(rèn)為2048,異步線程池能夠積壓的消息軌跡數(shù)量。

  • batchSize 一次向Broker批量發(fā)送的消息條數(shù),默認(rèn)為100.

  • maxMsgSize 向Broker匯報(bào)消息軌跡時(shí),消息體的總大小不能超過(guò)該值,默認(rèn)為128k。

  • discardCount 整個(gè)運(yùn)行過(guò)程中,丟棄的消息軌跡數(shù)據(jù),這里要說(shuō)明一點(diǎn)的是,如果消息TPS發(fā)送過(guò)大,異步轉(zhuǎn)發(fā)線程處理不過(guò)來(lái)時(shí),會(huì)主動(dòng)丟棄消息軌跡數(shù)據(jù)。

  • traceContextQueue traceContext積壓隊(duì)列,客戶端(消息發(fā)送、消息消費(fèi)者)在收到處理結(jié)果后,將消息軌跡提交到噶隊(duì)列中,則會(huì)立即返回。

  • appenderQueue 提交到Broker線程池中隊(duì)列。

  • traceTopicName 用于接收消息軌跡的Topic,默認(rèn)為RMQ_SYS_TRANS_HALF_TOPIC。

  • traceExecuter 用于發(fā)送到Broker服務(wù)的異步線程池,核心線程數(shù)默認(rèn)為10,最大線程池為20,隊(duì)列堆積長(zhǎng)度2048,線程名稱:MQTraceSendThread_。、

  • traceProducer 發(fā)送消息軌跡的Producer。

代碼@2:調(diào)用getAndCreateTraceProducer方法創(chuàng)建用于發(fā)送消息軌跡的Producer(消息發(fā)送者),下面詳細(xì)介紹一下其實(shí)現(xiàn)。

1.3.2 getAndCreateTraceProducer詳解
private DefaultMQProducer getAndCreateTraceProducer(RPCHook rpcHook) {
        DefaultMQProducer traceProducerInstance = this.traceProducer;
        if (traceProducerInstance == null) {  //@1
            traceProducerInstance = new DefaultMQProducer(rpcHook);
            traceProducerInstance.setProducerGroup(TraceConstants.GROUP_NAME);
            traceProducerInstance.setSendMsgTimeout(5000);
            traceProducerInstance.setVipChannelEnabled(false);
            // The max size of message is 128K
            traceProducerInstance.setMaxMessageSize(maxMsgSize - 10 * 1000);
        }
        return traceProducerInstance;
    }

代碼@1:如果還未建立發(fā)送者,則創(chuàng)建用于發(fā)送消息軌跡的消息發(fā)送者,其GroupName為:_INNER_TRACE_PRODUCER,消息發(fā)送超時(shí)時(shí)間5s,最大允許發(fā)送消息大小118K。

1.3.3 start
public void start(String nameSrvAddr) throws MQClientException {
    if (isStarted.compareAndSet(false, true)) {     // @1
        traceProducer.setNamesrvAddr(nameSrvAddr);
        traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
        traceProducer.start();
    }
    this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);   // @2
    this.worker.setDaemon(true);
    this.worker.start();                                                                                   
    this.registerShutDownHook();
}

開始啟動(dòng),其調(diào)用的時(shí)機(jī)為啟動(dòng)DefaultMQProducer時(shí),如果啟用跟蹤消息軌跡,則調(diào)用之。

代碼@1:如果用于發(fā)送消息軌跡的發(fā)送者沒(méi)有啟動(dòng),則設(shè)置nameserver地址,并啟動(dòng)著。

代碼@2:?jiǎn)?dòng)一個(gè)線程,用于執(zhí)行AsyncRunnable任務(wù),接下來(lái)將重點(diǎn)介紹。

1.3.4 AsyncRunnable
class AsyncRunnable implements Runnable {
         private boolean stopped;
	public void run() {
        while (!stopped) {
            List<tracecontext> contexts = new ArrayList<tracecontext>(batchSize);     // @1
            for (int i = 0; i &lt; batchSize; i++) {
                TraceContext context = null;
                try {
                    //get trace data element from blocking Queue — traceContextQueue
                    context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);        // @2
                } catch (InterruptedException e) {
                }
                if (context != null) {
                    contexts.add(context);
                } else {
                    break;
                }
            }
            if (contexts.size() &gt; 0) {                                                                               :
                AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);  // @3
                traceExecuter.submit(request);                                                               
            } else if (AsyncTraceDispatcher.this.stopped) {
                this.stopped = true;
            }
        }
    }
}

代碼@1:構(gòu)建待提交消息跟蹤Bean,每次最多發(fā)送batchSize,默認(rèn)為100條。

代碼@2:從traceContextQueue中取出一個(gè)待提交的TraceContext,設(shè)置超時(shí)時(shí)間為5s,即如何該隊(duì)列中沒(méi)有待提交的TraceContext,則最多等待5s。

代碼@3:向線程池中提交任務(wù)AsyncAppenderRequest。

1.3.5 AsyncAppenderRequest#sendTraceData
public void sendTraceData(List<tracecontext> contextList) {
    Map<string, list<tracetransferbean>&gt; transBeanMap = new HashMap<string, list<tracetransferbean>&gt;();
    for (TraceContext context : contextList) {        //@1
        if (context.getTraceBeans().isEmpty()) {
            continue;
        }
        // Topic value corresponding to original message entity content
        String topic = context.getTraceBeans().get(0).getTopic();     // @2
        // Use  original message entity's topic as key
        String key = topic;
        List<tracetransferbean> transBeanList = transBeanMap.get(key);
        if (transBeanList == null) {
            transBeanList = new ArrayList<tracetransferbean>();
            transBeanMap.put(key, transBeanList);
        }
        TraceTransferBean traceData = TraceDataEncoder.encoderFromContextBean(context);    // @3
        transBeanList.add(traceData);
    }
    for (Map.Entry<string, list<tracetransferbean>&gt; entry : transBeanMap.entrySet()) {       // @4
        flushData(entry.getValue());
    }
}

代碼@1:遍歷收集的消息軌跡數(shù)據(jù)。

代碼@2:獲取存儲(chǔ)消息軌跡的Topic。

代碼@3:對(duì)TraceContext進(jìn)行編碼,這里是消息軌跡的傳輸數(shù)據(jù),稍后對(duì)其詳細(xì)看一下,了解其上傳的格式。

代碼@4:將編碼后的數(shù)據(jù)發(fā)送到Broker服務(wù)器。

1.3.6 TraceDataEncoder#encoderFromContextBean

根據(jù)消息軌跡跟蹤類型,其格式會(huì)有一些不一樣,下面分別來(lái)介紹其合適。

1.3.6.1 PUB(消息發(fā)送)
case Pub: {
    TraceBean bean = ctx.getTraceBeans().get(0);
    //append the content of context and traceBean to transferBean's TransData
    sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(ctx.getRegionId()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(ctx.getGroupName()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(bean.getTopic()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(bean.getTags()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(bean.getStoreHost()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(bean.getBodyLength()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(bean.getMsgType().ordinal()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(bean.getOffsetMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
     .append(ctx.isSuccess()).append(TraceConstants.FIELD_SPLITOR);
}

消息軌跡數(shù)據(jù)的協(xié)議使用字符串拼接,字段的分隔符號(hào)為1,整個(gè)數(shù)據(jù)以2結(jié)尾,感覺這個(gè)設(shè)計(jì)還是有點(diǎn)“不可思議”,為什么不直接使用json協(xié)議呢?

1.3.6.2 SubBefore(消息消費(fèi)之前)
for (TraceBean bean : ctx.getTraceBeans()) {
    sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(ctx.getRegionId()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(ctx.getGroupName()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(ctx.getRequestId()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(bean.getRetryTimes()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(bean.getKeys()).append(TraceConstants.FIELD_SPLITOR);//
    }
}

軌跡就是按照上述順序拼接而成,各個(gè)字段使用1分隔,每一條記錄使用2結(jié)尾。

1.3.2.3 SubAfter(消息消費(fèi)后)
case SubAfter: {
    for (TraceBean bean : ctx.getTraceBeans()) {
        sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(ctx.getRequestId()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(ctx.isSuccess()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(ctx.getContextCode()).append(TraceConstants.FIELD_SPLITOR);
        }
    }
}

格式編碼一樣,就不重復(fù)多說(shuō)。

經(jīng)過(guò)上面的源碼跟蹤,消息發(fā)送端的消息軌跡跟蹤流程、消息軌跡數(shù)據(jù)編碼協(xié)議就清晰了,接下來(lái)我們使用一張序列圖來(lái)結(jié)束本部分的講解。

RocketMQ消息軌跡是怎樣的

其實(shí)行文至此,只關(guān)注了消息發(fā)送的消息軌跡跟蹤,消息消費(fèi)的軌跡跟蹤又是如何呢?其實(shí)現(xiàn)原理其實(shí)是一樣的,就是在消息消費(fèi)前后執(zhí)行特定的鉤子函數(shù),其實(shí)現(xiàn)類為ConsumeMessageTraceHookImpl,由于其實(shí)現(xiàn)與消息發(fā)送的思路類似,故就不詳細(xì)介紹了。

2、 消息軌跡數(shù)據(jù)如何存儲(chǔ)

其實(shí)從上面的分析,我們已經(jīng)得知,RocketMQ的消息軌跡數(shù)據(jù)存儲(chǔ)在到Broker上,那消息軌跡的主題名如何指定?其路由信息又怎么分配才好呢?是每臺(tái)Broker上都創(chuàng)建還是只在其中某臺(tái)上創(chuàng)建呢?RocketMQ支持系統(tǒng)默認(rèn)與自定義消息軌跡的主題。

2.1 使用系統(tǒng)默認(rèn)的主題名稱

RocketMQ默認(rèn)的消息軌跡主題為:RMQ_SYS_TRACE_TOPIC,那該Topic需要手工創(chuàng)建嗎?其路由信息呢?

{
    if (this.brokerController.getBrokerConfig().isTraceTopicEnable()) {    // @1
        String topic = this.brokerController.getBrokerConfig().getMsgTraceTopicName();
        TopicConfig topicConfig = new TopicConfig(topic);
        this.systemTopicList.add(topic);
        topicConfig.setReadQueueNums(1);                                              // @2
        topicConfig.setWriteQueueNums(1);
        this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
    }
}

上述代碼出自TopicConfigManager的構(gòu)造函數(shù),在Broker啟動(dòng)的時(shí)候會(huì)創(chuàng)建topicConfigManager對(duì)象,用來(lái)管理topic的路由信息。

代碼@1:如果Broker開啟了消息軌跡跟蹤(traceTopicEnable=true)時(shí),會(huì)自動(dòng)創(chuàng)建默認(rèn)消息軌跡的topic路由信息,注意其讀寫隊(duì)列數(shù)為1。

2.2 用戶自定義消息軌跡主題

在創(chuàng)建消息發(fā)送者、消息消費(fèi)者時(shí),可以顯示的指定消息軌跡的Topic,例如:

public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic)

public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
        AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic)

通過(guò)customizedTraceTopic來(lái)指定消息軌跡Topic。

溫馨提示:通常在生產(chǎn)環(huán)境上,將不會(huì)開啟自動(dòng)創(chuàng)建主題,故需要RocketMQ運(yùn)維管理人員提前創(chuàng)建好Topic。

感謝各位的閱讀,以上就是“RocketMQ消息軌跡是怎樣的”的內(nèi)容了,經(jīng)過(guò)本文的學(xué)習(xí)后,相信大家對(duì)RocketMQ消息軌跡是怎樣的這一問(wèn)題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!

網(wǎng)站標(biāo)題:RocketMQ消息軌跡是怎樣的
當(dāng)前URL:http://muchs.cn/article10/ieghdo.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供關(guān)鍵詞優(yōu)化服務(wù)器托管、外貿(mào)網(wǎng)站建設(shè)網(wǎng)站制作、App設(shè)計(jì)定制網(wǎng)站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

手機(jī)網(wǎng)站建設(shè)