Canal1.1.4中怎么使用RocketMQ將MySQL同步到Redis

今天就跟大家聊聊有關(guān)Canal1.1.4中怎么使用RocketMQ將MySQL同步到redis,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。

為蒙自等地區(qū)用戶提供了全套網(wǎng)頁設(shè)計制作服務(wù),及蒙自網(wǎng)站建設(shè)行業(yè)解決方案。主營業(yè)務(wù)為成都網(wǎng)站設(shè)計、做網(wǎng)站、蒙自網(wǎng)站設(shè)計,以傳統(tǒng)方式定制建設(shè)網(wǎng)站,并提供域名空間備案等一條龍服務(wù),秉承以專業(yè)、用心的態(tài)度為用戶提供真誠的服務(wù)。我們深信只要達(dá)到每一位用戶的要求,就會得到認(rèn)可,從而選擇與我們長期合作。這樣,我們也可以走得更遠(yuǎn)!

一、Canal使用RocketMQ同步MySQL

Canal結(jié)合RocketMQ同步MySQL

二、 同步數(shù)據(jù)到Redis

2.1 安裝Redis

2.2 Redis配置

2.3 SpringBoot配置

2.3.1 引入依賴
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.2</version>
</dependency>

<!-- 根據(jù)個人需要依賴 -->
<dependency>
    <groupId>javax.persistence</groupId>
    <artifactId>persistence-api</artifactId>
</dependency>
2.3.2 通用代碼

SQLType.java

import lombok.AccessLevel;
import lombok.NoArgsConstructor;

/**
 * Canal監(jiān)聽SQL類型
 *
 * @author Yu
 * @date 2019/09/08 00:18
 **/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class SQLType {

    /**插入*/
    public static final String INSERT = "INSERT";
    /**更新*/
    public static final String UPDATE = "UPDATE";
    /**刪除*/
    public static final String DELETE = "DELETE";

}

User.java

import lombok.Data;
import javax.persistence.Id;
import java.io.Serializable;

/**
 * UserPo對象
 *
 * @author Yu
 * @date 2019/09/08 14:13
 **/

@Data
public class User implements Serializable {

    private static final long serialVersionUID = -6845801275112259322L;

    @Id
    private Integer uid;

    private String username;

    private String password;

    private String sex;

}

CanalSynService.java

import com.alibaba.otter.canal.protocol.FlatMessage;
import java.util.Collection;

/**
 * Canal同步服務(wù)
 *
 * @author Yu
 * @date 2019/09/08 00:00
 **/

public interface CanalSynService<T> {

    /**
     * 處理數(shù)據(jù)
     *
     * @param flatMessage CanalMQ數(shù)據(jù)
     */
    void process(FlatMessage flatMessage);

    /**
     * DDL語句處理
     *
     * @param flatMessage CanalMQ數(shù)據(jù)
     */
    void ddl(FlatMessage flatMessage);

    /**
     * 插入
     *
     * @param list 新增數(shù)據(jù)
     */
    void insert(Collection<T> list);

    /**
     * 更新
     *
     * @param list 更新數(shù)據(jù)
     */
    void update(Collection<T> list);

    /**
     * 刪除
     *
     * @param list 刪除數(shù)據(jù)
     */
    void delete(Collection<T> list);

}

AbstractCanalMQ2RedisService.java

import com.alibaba.otter.canal.protocol.FlatMessage;
import com.google.common.collect.Sets;
import com.taco.springcloud.canal.constant.SQLType;
import com.taco.springcloud.core.component.ApplicationContextHolder;
import com.taco.springcloud.core.exception.BizException;
import com.taco.springcloud.core.exception.constants.BaseApiCodeEnum;
import com.taco.springcloud.core.utils.JsonUtil;
import com.taco.springcloud.redis.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.util.ReflectionUtils;
import javax.annotation.Resource;
import javax.persistence.Id;
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.util.*;


/**
 * 抽象CanalMQ通用處理服務(wù)
 *
 * @author Yu
 * @date 2019/09/08 00:05
 **/

@Slf4j
public abstract class AbstractCanalMQ2RedisService<T> implements CanalSynService<T> {

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    @Resource
    private RedisUtils redisUtils;

    private Class<T> cache;

    /**
     * 獲取Model名稱
     *
     * @return Model名稱
     */
    protected abstract String getModelName();

    @Override
    public void process(FlatMessage flatMessage) {

        if(flatMessage.getIsDdl()) {
            ddl(flatMessage);
            return;
        }

        Set<T> data = getData(flatMessage);

        if(SQLType.INSERT.equals(flatMessage.getType())) {
            insert(data);
        }

        if(SQLType.UPDATE.equals(flatMessage.getType())) {
            update(data);
        }

        if(SQLType.DELETE.equals(flatMessage.getType())) {
            delete(data);
        }

    }

    @Override
    public void ddl(FlatMessage flatMessage) {
        //TODO : DDL需要同步,刪庫清空,更新字段處理

    }

    @Override
    public void insert(Collection<T> list) {
        insertOrUpdate(list);
    }

    @Override
    public void update(Collection<T> list) {
        insertOrUpdate(list);
    }

    private void insertOrUpdate(Collection<T> list) {
        redisTemplate.executePipelined( (RedisConnection redisConnection) -> {
            for (T data : list) {
                String key = getWrapRedisKey(data);
                RedisSerializer keySerializer = redisTemplate.getKeySerializer();
                RedisSerializer valueSerializer = redisTemplate.getValueSerializer();
                redisConnection.set(keySerializer.serialize(key), valueSerializer.serialize(data));
            }
            return null;
        });
    }

    @Override
    public void delete(Collection<T> list) {

        Set<String> keys = Sets.newHashSetWithExpectedSize(list.size());

        for (T data : list) {
            keys.add(getWrapRedisKey(data));
        }

        //Set<String> keys = list.stream().map(this::getWrapRedisKey).collect(Collectors.toSet());
        redisUtils.delAll(keys);
    }

    /**
     * 封裝redis的key
     *
     * @param t 原對象
     * @return  key
     */
    protected String getWrapRedisKey(T t) {
        return new StringBuilder()
                        .append(ApplicationContextHolder.getApplicationName())
                        .append(":")
                        .append(getModelName())
                        .append(":")
                        .append(getIdValue(t))
                        .toString();

    }

    /**
     * 獲取類泛型
     *
     * @return 泛型Class
     */
    protected Class<T> getTypeArguement() {
        if(cache == null) {
            cache = (Class<T>) ((ParameterizedType) this.getClass().getGenericSuperclass()).getActualTypeArguments()[0];
        }
        return cache;
    }

    /**
     * 獲取Object標(biāo)有@Id注解的字段值
     *
     * @param t 對象
     * @return  id值
     */
    protected Object getIdValue(T t) {
        Field fieldOfId = getIdField();
        ReflectionUtils.makeAccessible(fieldOfId);
        return ReflectionUtils.getField(fieldOfId, t);
    }

    /**
     * 獲取Class標(biāo)有@Id注解的字段名稱
     *
     * @return id字段名稱
     */
    protected Field getIdField() {

        Class<T> clz = getTypeArguement();
        Field[] fields = clz.getDeclaredFields();
        for (Field field : fields) {
            Id annotation = field.getAnnotation(Id.class);

            if (annotation != null) {
                return field;
            }
        }

        log.error("PO類未設(shè)置@Id注解");
        throw new BizException(BaseApiCodeEnum.FAIL);
    }

    /**
     * 轉(zhuǎn)換Canal的FlatMessage中data成泛型對象
     *
     * @param flatMessage   Canal發(fā)送MQ信息
     * @return              泛型對象集合
     */
    protected Set<T> getData(FlatMessage flatMessage) {
        List<Map<String, String>> sourceData = flatMessage.getData();
        Set<T> targetData = Sets.newHashSetWithExpectedSize(sourceData.size());
        for (Map<String, String> map : sourceData) {
            T t = JsonUtil.mapConvertPojo(map, getTypeArguement());
            targetData.add(t);
        }
        return targetData;
    }

}

TestUsersConsumer.java

import com.alibaba.otter.canal.protocol.FlatMessage;
import com.taco.springcloud.canal.model.User;
import com.taco.springcloud.canal.service.AbstractCanalMQ2RedisService;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;




@Slf4j
@Service
@RocketMQMessageListener(topic = "test_users", consumerGroup = "users")
public class TestUsersConsumer extends AbstractCanalMQ2RedisService<User> implements RocketMQListener<FlatMessage> {

    @Getter
    private String modelName = "user";

    @Override
    public void onMessage(FlatMessage s) {
        process(s);
    }
}

看完上述內(nèi)容,你們對Canal1.1.4中怎么使用RocketMQ將MySQL同步到Redis有進(jìn)一步的了解嗎?如果還想了解更多知識或者相關(guān)內(nèi)容,請關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝大家的支持。

當(dāng)前標(biāo)題:Canal1.1.4中怎么使用RocketMQ將MySQL同步到Redis
文章位置:http://www.muchs.cn/article46/pieheg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供微信公眾號、微信小程序、網(wǎng)站設(shè)計、網(wǎng)站制作定制開發(fā)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

成都網(wǎng)站建設(shè)公司