RocketMQ中如何實(shí)現(xiàn)并行模式

這篇文章主要介紹了RocketMQ中如何實(shí)現(xiàn)并行模式,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。

為武川等地區(qū)用戶提供了全套網(wǎng)頁設(shè)計制作服務(wù),及武川網(wǎng)站建設(shè)行業(yè)解決方案。主營業(yè)務(wù)為網(wǎng)站設(shè)計、成都網(wǎng)站制作、武川網(wǎng)站設(shè)計,以傳統(tǒng)方式定制建設(shè)網(wǎng)站,并提供域名空間備案等一條龍服務(wù),秉承以專業(yè)、用心的態(tài)度為用戶提供真誠的服務(wù)。我們深信只要達(dá)到每一位用戶的要求,就會得到認(rèn)可,從而選擇與我們長期合作。這樣,我們也可以走得更遠(yuǎn)!

DefaultMQPushConsumerImpl.pullMessage中的PullCallback在接收到拉取的message之后,會調(diào)用ConsumeMessageService.submitConsumeRequest方法將消息“推”給listener來執(zhí)行業(yè)務(wù)處理。RocketMQ支持并行和順序兩種消費(fèi)模式,本文主要講解并行模式ConsumeMessageConcurrentlyService的實(shí)現(xiàn)。該類包含一下關(guān)鍵屬性:

public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
    // ...
    private final MessageListenerConcurrently messageListener; // 業(yè)務(wù)處理回掉
    private final BlockingQueue<Runnable> consumeRequestQueue;  // consumerExecutor的并發(fā)隊(duì)列
    private final ThreadPoolExecutor consumeExecutor;
    private final String consumerGroup;

    // ...

    public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
        MessageListenerConcurrently messageListener) {
        this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
        this.messageListener = messageListener;

        this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
        this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
        this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();

        this.consumeExecutor = new ThreadPoolExecutor(
            this.defaultMQPushConsumer.getConsumeThreadMin(),
            this.defaultMQPushConsumer.getConsumeThreadMax(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.consumeRequestQueue,
            new ThreadFactoryImpl("ConsumeMessageThread_"));

        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
        this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
    }

    //...
}

ConsumeMessageConcurrentlyService.submitConsumeRequest會將拉取到的message列表按配置的分批策略做分割,提交執(zhí)行:

    public void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispatchToConsume) {
        final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); // 配置的批次大小
        if (msgs.size() <= consumeBatchSize) {
            // 拉取的消息列表長度小于配置的批次大小,一次性提交處理
            ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                this.submitConsumeRequestLater(consumeRequest);
            }
        } else {
            // 否則,將message分割后提交
            for (int total = 0; total < msgs.size(); ) {
                List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
                for (int i = 0; i < consumeBatchSize; i++, total++) {
                    if (total < msgs.size()) {
                        msgThis.add(msgs.get(total));
                    } else {
                        break;
                    }
                }

                ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
                try {
                    this.consumeExecutor.submit(consumeRequest);
                } catch (RejectedExecutionException e) {
                    for (; total < msgs.size(); total++) {
                        msgThis.add(msgs.get(total));
                    }

                    this.submitConsumeRequestLater(consumeRequest);
                }
            }
        }
    }

實(shí)際處理邏輯在ConsumeRequest.run方法中

    // ConsumeRequest    
    public void run() {
            if (this.processQueue.isDropped()) {
                log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
                return;
            }

            MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
            ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
            ConsumeConcurrentlyStatus status = null;
            defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());

            ConsumeMessageContext consumeMessageContext = null;
            // ....

            long beginTimestamp = System.currentTimeMillis();
            boolean hasException = false;
            ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
            try {
                if (msgs != null && !msgs.isEmpty()) {
                    for (MessageExt msg : msgs) {
                        MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
                    }
                }
                // 1. 調(diào)用listener
                status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
            } catch (Throwable e) {
                log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                    RemotingHelper.exceptionSimpleDesc(e),
                    ConsumeMessageConcurrentlyService.this.consumerGroup,
                    msgs,
                    messageQueue);
                hasException = true;
            }
            long consumeRT = System.currentTimeMillis() - beginTimestamp;
            // ...

            if (!processQueue.isDropped()) {
                // 2. 處理業(yè)務(wù)調(diào)用結(jié)果
                ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
            } else {
                // ...
            }
        }
    }

processConsumeResult將處理消費(fèi)結(jié)果:

    public void processConsumeResult(
        final ConsumeConcurrentlyStatus status,
        final ConsumeConcurrentlyContext context,
        final ConsumeRequest consumeRequest
    ) {
        int ackIndex = context.getAckIndex(); // 消費(fèi)成功的index

        if (consumeRequest.getMsgs().isEmpty())
            return;

        // ...

        // 1. 消費(fèi)失敗msg處理
        switch (this.defaultMQPushConsumer.getMessageModel()) {
            case BROADCASTING:
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    // 廣播模式,僅打印消費(fèi)失敗的msg
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
                }
                break;
            case CLUSTERING:
                // 集群模式,首先嘗試將消費(fèi)失敗的msg發(fā)回到broker,若失敗則在本地嘗試reconsume
                List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    boolean result = this.sendMessageBack(msg, context);
                    if (!result) {
                        msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                        msgBackFailed.add(msg);
                    }
                }

                if (!msgBackFailed.isEmpty()) {
                    consumeRequest.getMsgs().removeAll(msgBackFailed);

                    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                }
                break;
            default:
                break;
        }

        // 2. 更新offset store
        long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
        if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
            this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
        }
    }

感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“RocketMQ中如何實(shí)現(xiàn)并行模式”這篇文章對大家有幫助,同時也希望大家多多支持創(chuàng)新互聯(lián),關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,更多相關(guān)知識等著你來學(xué)習(xí)!

當(dāng)前標(biāo)題:RocketMQ中如何實(shí)現(xiàn)并行模式
URL鏈接:http://muchs.cn/article46/pdpgeg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站改版、電子商務(wù)、品牌網(wǎng)站建設(shè)、App開發(fā)微信公眾號、響應(yīng)式網(wǎng)站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

外貿(mào)網(wǎng)站制作