RocketMq-半消息(十)-創(chuàng)新互聯(lián)

概念: 
半消息: 在原有隊(duì)列消息執(zhí)行后的邏輯,如果后面的本地邏輯出錯(cuò),則不發(fā)送該消息,如果通過(guò)則告知rocketmq發(fā)送

操作步驟 :
1.(生產(chǎn)者)發(fā)送-【半消息】
2.(生產(chǎn)者)本地監(jiān)聽(tīng)-【半消息】處理結(jié)果
3.(消費(fèi)者)處理-【半消息】

1.(生產(chǎn)者)發(fā)送-【半消息】

創(chuàng)新互聯(lián)專注為客戶提供全方位的互聯(lián)網(wǎng)綜合服務(wù),包含不限于成都網(wǎng)站設(shè)計(jì)、成都網(wǎng)站建設(shè)、麻城網(wǎng)絡(luò)推廣、微信小程序、麻城網(wǎng)絡(luò)營(yíng)銷、麻城企業(yè)策劃、麻城品牌公關(guān)、搜索引擎seo、人物專訪、企業(yè)宣傳片、企業(yè)代運(yùn)營(yíng)等,從售前售中售后,我們都將竭誠(chéng)為您服務(wù),您的肯定,是我們大的嘉獎(jiǎng);創(chuàng)新互聯(lián)為所有大學(xué)生創(chuàng)業(yè)者提供麻城建站搭建服務(wù),24小時(shí)服務(wù)熱線:18980820575,官方網(wǎng)址:muchs.cn
// 消息體
@Data
@Builder
@ToString
public class UserMoneyParams {
    int userId;
    String act;
    double money;
    String info;
    String infoParams;
}

// 發(fā)送消息
// 發(fā)送-隊(duì)列半消息: rocketMQ
@RequestMapping("rocketMQHalf")
public ApiResult rocketMQHalf() {
        int orderId = 2;
        double money = 10;

        // 用戶余額變更-參數(shù)體
        UserMoneyParams userMoneyParams = UserMoneyParams.builder()
                        .act("pay-order")
                        .userId(orderId)
                        .money(money)
                        .build();

        // 用戶數(shù)據(jù)變更-參數(shù)
        UserOrder userOrder = this.userOrderMapper.selectByPrimaryKey(1);

        log.info("發(fā)送前參數(shù): "+userMoneyParams.toString());

        rocketMQTemplate.sendMessageInTransaction(
                        // 半消息-分組
                        "tsca-group-half",
                        // 半消息-topic
                        "member-change-money-half-topic",
                        // 半消息-數(shù)據(jù)體
                        MessageBuilder
                                        .withPayload(userMoneyParams)
                                        .setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID())
                                        .build(),
                        userOrder
        );

        return ApiResult.success("發(fā)送隊(duì)列-半消息");
}

2.(生產(chǎn)者)本地監(jiān)聽(tīng)-【半消息】處理結(jié)果

@RocketMQTransactionListener(txProducerGroup = "tsca-group-half")
@RequiredArgsConstructor
@Slf4j
public class UserMoneyHalfListener implements RocketMQLocalTransactionListener {

    @Autowired
    RedisUtil redisUtil;

    @Autowired
    UserOrderService userOrderService;

    // 生產(chǎn)者-消息處理完畢,繼續(xù)執(zhí)行本地方法(含事務(wù))
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
        try {
            Object userMoneyParams=message.getPayload();
            log.info("消息-args:"+arg);
            // 消息主體加密無(wú)法獲取
            log.info("消息-主體:"+ JSON.toJSONString(userMoneyParams));
            log.info("消息-主體-頭部:"+message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID));
            log.info("半消息-本地-處理完成");
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            log.warn("半消息-本地-發(fā)生異常,回滾: "+e.getMessage());
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    // 生產(chǎn)者-消息處理超時(shí)
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        // 查詢消息是否已經(jīng)處理
        String messageID = String.valueOf(message.getHeaders().get("tsca-half-message-id"));
        Object messageData = this.redisUtil.getValue(messageID, String.class);
        if (messageData != null && messageData.equals("ok")) {
            // 超時(shí)且消息已經(jīng)處理完畢
            log.info("半消息-本地消息超時(shí)-且已經(jīng)處理完畢");
            return RocketMQLocalTransactionState.COMMIT;
        } else {
            log.info("半消息-本地消息超時(shí)-且未處理完畢");
            // 超時(shí)且消息未處理完畢
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
}

3.(消費(fèi)者)處理-【半消息】

@Service
@RocketMQMessageListener(consumerGroup = "tsca-group-half", topic = "member-change-money-half-topic")
@Slf4j
public class UserMoneyHalfListener implements RocketMQListener<UserMoneyParams> {
//    @Autowired
//    UserMoneyService memberOrderService;

    @Override
    public void onMessage(UserMoneyParams memberMoneyMessage) {
        log.info("收到-用戶余額變動(dòng)-半消息");
        try {
        } catch (Exception e) {
            log.info("更改余額錯(cuò)誤: "+e.getMessage());
            e.printStackTrace();
        }
        log.info(JSON.toJSONString(memberMoneyMessage));
    }
}

另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無(wú)理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國(guó)服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡(jiǎn)單易用、服務(wù)可用性高、性價(jià)比高”等特點(diǎn)與優(yōu)勢(shì),專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場(chǎng)景需求。

當(dāng)前題目:RocketMq-半消息(十)-創(chuàng)新互聯(lián)
文章起源:http://muchs.cn/article18/degddp.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供App設(shè)計(jì)移動(dòng)網(wǎng)站建設(shè)、網(wǎng)站策劃網(wǎng)站設(shè)計(jì)公司、電子商務(wù)、Google

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

成都定制網(wǎng)站網(wǎng)頁(yè)設(shè)計(jì)