RocketMq-半消息(十)

概念: 
半消息: 在原有隊列消息執(zhí)行后的邏輯,如果后面的本地邏輯出錯,則不發(fā)送該消息,如果通過則告知rocketmq發(fā)送

操作步驟 :
1.(生產(chǎn)者)發(fā)送-【半消息】
2.(生產(chǎn)者)本地監(jiān)聽-【半消息】處理結果
3.(消費者)處理-【半消息】

1.(生產(chǎn)者)發(fā)送-【半消息】

10年的宣化網(wǎng)站建設經(jīng)驗,針對設計、前端、開發(fā)、售后、文案、推廣等六對一服務,響應快,48小時及時工作處理。成都全網(wǎng)營銷推廣的優(yōu)勢是能夠根據(jù)用戶設備顯示端的尺寸不同,自動調(diào)整宣化建站的顯示方式,使網(wǎng)站能夠適用不同顯示終端,在瀏覽器中調(diào)整網(wǎng)站的寬度,無論在任何一種瀏覽器上瀏覽網(wǎng)站,都能展現(xiàn)優(yōu)雅布局與設計,從而大程度地提升瀏覽體驗。成都創(chuàng)新互聯(lián)公司從事“宣化網(wǎng)站設計”,“宣化網(wǎng)站推廣”以來,每個客戶項目都認真落實執(zhí)行。

// 消息體
@Data
@Builder
@ToString
public class UserMoneyParams {
    int userId;
    String act;
    double money;
    String info;
    String infoParams;
}

// 發(fā)送消息
// 發(fā)送-隊列半消息: rocketMQ
@RequestMapping("rocketMQHalf")
public ApiResult rocketMQHalf() {
        int orderId = 2;
        double money = 10;

        // 用戶余額變更-參數(shù)體
        UserMoneyParams userMoneyParams = UserMoneyParams.builder()
                        .act("pay-order")
                        .userId(orderId)
                        .money(money)
                        .build();

        // 用戶數(shù)據(jù)變更-參數(shù)
        UserOrder userOrder = this.userOrderMapper.selectByPrimaryKey(1);

        log.info("發(fā)送前參數(shù): "+userMoneyParams.toString());

        rocketMQTemplate.sendMessageInTransaction(
                        // 半消息-分組
                        "tsca-group-half",
                        // 半消息-topic
                        "member-change-money-half-topic",
                        // 半消息-數(shù)據(jù)體
                        MessageBuilder
                                        .withPayload(userMoneyParams)
                                        .setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID())
                                        .build(),
                        userOrder
        );

        return ApiResult.success("發(fā)送隊列-半消息");
}

2.(生產(chǎn)者)本地監(jiān)聽-【半消息】處理結果

@RocketMQTransactionListener(txProducerGroup = "tsca-group-half")
@RequiredArgsConstructor
@Slf4j
public class UserMoneyHalfListener implements RocketMQLocalTransactionListener {

    @Autowired
    redisUtil redisUtil;

    @Autowired
    UserOrderService userOrderService;

    // 生產(chǎn)者-消息處理完畢,繼續(xù)執(zhí)行本地方法(含事務)
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
        try {
            Object userMoneyParams=message.getPayload();
            log.info("消息-args:"+arg);
            // 消息主體加密無法獲取
            log.info("消息-主體:"+ JSON.toJSONString(userMoneyParams));
            log.info("消息-主體-頭部:"+message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID));
            log.info("半消息-本地-處理完成");
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            log.warn("半消息-本地-發(fā)生異常,回滾: "+e.getMessage());
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    // 生產(chǎn)者-消息處理超時
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        // 查詢消息是否已經(jīng)處理
        String messageID = String.valueOf(message.getHeaders().get("tsca-half-message-id"));
        Object messageData = this.redisUtil.getValue(messageID, String.class);
        if (messageData != null && messageData.equals("ok")) {
            // 超時且消息已經(jīng)處理完畢
            log.info("半消息-本地消息超時-且已經(jīng)處理完畢");
            return RocketMQLocalTransactionState.COMMIT;
        } else {
            log.info("半消息-本地消息超時-且未處理完畢");
            // 超時且消息未處理完畢
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
}

3.(消費者)處理-【半消息】

@Service
@RocketMQMessageListener(consumerGroup = "tsca-group-half", topic = "member-change-money-half-topic")
@Slf4j
public class UserMoneyHalfListener implements RocketMQListener<UserMoneyParams> {
//    @Autowired
//    UserMoneyService memberOrderService;

    @Override
    public void onMessage(UserMoneyParams memberMoneyMessage) {
        log.info("收到-用戶余額變動-半消息");
        try {
        } catch (Exception e) {
            log.info("更改余額錯誤: "+e.getMessage());
            e.printStackTrace();
        }
        log.info(JSON.toJSONString(memberMoneyMessage));
    }
}

標題名稱:RocketMq-半消息(十)
本文來源:http://muchs.cn/article48/jpjoep.html

成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供ChatGPT定制開發(fā)、網(wǎng)頁設計公司網(wǎng)站收錄、靜態(tài)網(wǎng)站、網(wǎng)站制作

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉載內(nèi)容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉載,或轉載時需注明來源: 創(chuàng)新互聯(lián)

成都app開發(fā)公司