flinksqlcdc怎么使用

本篇內(nèi)容主要講解“flink sql cdc怎么使用”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“flink sql cdc怎么使用”吧!

讓客戶滿意是我們工作的目標(biāo),不斷超越客戶的期望值來自于我們對這個行業(yè)的熱愛。我們立志把好的技術(shù)通過有效、簡單的方式提供給客戶,將通過不懈努力成為客戶在信息化領(lǐng)域值得信任、有價值的長期合作伙伴,公司提供的服務(wù)項(xiàng)目有:域名注冊、雅安服務(wù)器托管、營銷軟件、網(wǎng)站建設(shè)、隴南網(wǎng)站維護(hù)、網(wǎng)站推廣。

前言

CDC,Change Data Capture,變更數(shù)據(jù)獲取的簡稱,使用CDC我們可以從數(shù)據(jù)庫中獲取已提交的更改并將這些更改發(fā)送到下游,供下游使用。這些變更可以包括INSERT,DELETE,UPDATE等.

用戶可以在如下的場景使用cdc:

  • 實(shí)時數(shù)據(jù)同步:比如我們將MySQL庫中的數(shù)據(jù)同步到我們的數(shù)倉中。

  • 數(shù)據(jù)庫的實(shí)時物化視圖。

flink消費(fèi)cdc數(shù)據(jù)

在以前的數(shù)據(jù)同步中,比如我們想實(shí)時獲取數(shù)據(jù)庫的數(shù)據(jù),一般采用的架構(gòu)就是采用第三方工具,比如canal、debezium等,實(shí)時采集數(shù)據(jù)庫的變更日志,然后將數(shù)據(jù)發(fā)送到kafka等消息隊(duì)列。然后再通過其他的組件,比如flink、spark等等來消費(fèi)kafka的數(shù)據(jù),計算之后發(fā)送到下游系統(tǒng)。整體的架構(gòu)如下所示:

flink sql cdc怎么使用

對于上面的這種架構(gòu),flink承擔(dān)的角色是計算層,目前flink提供的format有兩種格式:canal-json和debezium-json,下面我們簡單的介紹下。

canal format

在國內(nèi),用的比較多的是阿里巴巴開源的canal,我們可以使用canal訂閱mysql的binlog日志,canal會將mysql庫的變更數(shù)據(jù)組織成它固定的JSON或protobuf 格式發(fā)到kafka,以供下游使用。

canal解析后的json數(shù)據(jù)格式如下:

{
 "data": [
   {
     "id": "111",
     "name": "scooter",
     "description": "Big 2-wheel scooter",
     "weight": "5.18"
   }
 ],
 "database": "inventory",
 "es": 1589373560000,
 "id": 9,
 "isDdl": false,
 "mysqlType": {
   "id": "INTEGER",
   "name": "VARCHAR(255)",
   "description": "VARCHAR(512)",
   "weight": "FLOAT"
 },
 "old": [
   {
     "weight": "5.15"
   }
 ],
 "pkNames": [
   "id"
 ],
 "sql": "",
 "sqlType": {
   "id": 4,
   "name": 12,
   "description": 12,
   "weight": 7
 },
 "table": "products",
 "ts": 1589373560798,
 "type": "UPDATE"
}

簡單講下幾個核心的字段:

  • type : 描述操作的類型,包括‘UPDATE’, 'INSERT', 'DELETE'。

  • data : 代表操作的數(shù)據(jù)。如果為'INSERT',則表示行的內(nèi)容;如果為'UPDATE',則表示行的更新后的狀態(tài);如果為'DELETE',則表示刪除前的狀態(tài)。

  • old :可選字段,如果存在,則表示更新之前的內(nèi)容,如果不是update操作,則為 null。

完整的語義如下;

    private String                    destination;                            // 對應(yīng)canal的實(shí)例或者M(jìn)Q的topic
   private String                    groupId;                                // 對應(yīng)mq的group id
   private String                    database;                               // 數(shù)據(jù)庫或schema
   private String                    table;                                  // 表名
   private List<String>              pkNames;
   private Boolean                   isDdl;
   private String                    type;                                   // 類型: INSERT UPDATE DELETE
   // binlog executeTime
   private Long                      es;                                     // 執(zhí)行耗時
   // dml build timeStamp
   private Long                      ts;                                     // 同步時間
   private String                    sql;                                    // 執(zhí)行的sql, dml sql為空
   private List<Map<String, Object>> data;                                   // 數(shù)據(jù)列表
   private List<Map<String, Object>> old;                                    // 舊數(shù)據(jù)列表, 用于update, size和data的size一一對應(yīng)

在flink sql中,消費(fèi)這個數(shù)據(jù)的sql如下:



CREATE TABLE topic_products (
 id BIGINT,
 name STRING,
 description STRING,
 weight DECIMAL(10, 2)
) WITH (
'connector' = 'kafka',
'topic' = 'products_binlog',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json'  -- using canal-json as the format
)

其中DDL中的表的字段和類型要和mysql中的字段及類型能匹配的上,接下來我們就可以寫flink sql來查詢我們定義的topic_products了。

debezium format

在國外,比較有名的類似canal的開源工具有debezium,它的功能較canal更加強(qiáng)大一些,不僅僅支持mysql。還支持其他的數(shù)據(jù)庫的同步,比如 PostgreSQL、Oracle等,目前debezium支持的序列化格式為 JSON 和 Apache Avro 。

debezium提供的格式如下:

{
"before": {
  "id": 111,
  "name": "scooter",
  "description": "Big 2-wheel scooter",
  "weight": 5.18
},
"after": {
  "id": 111,
  "name": "scooter",
  "description": "Big 2-wheel scooter",
  "weight": 5.15
},
"source": {...},
"op": "u",
"ts_ms": 1589362330904,
"transaction": null
}

同樣,使用flink sql來消費(fèi)的時候,sql和上面使用canal類似,只需要把foramt改成debezium-json即可。

CanalJson反序列化源碼解析

接下來我們看下flink的源碼中canal-json格式的實(shí)現(xiàn)。canal 格式作為一種flink的格式,而且是source,所以也就是涉及到讀取數(shù)據(jù)的時候進(jìn)行反序列化,我們接下來就簡單看看CanalJson的反序列化的實(shí)現(xiàn)。具體的實(shí)現(xiàn)類是CanalJsonDeserializationSchema。

我們看下這個最核心的反序列化方法:

	@Override
public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
try {
//使用json反序列化器將message反序列化成RowData
RowData row = jsonDeserializer.deserialize(message);

//獲取type字段,用于下面的判斷
String type = row.getString(2).toString();
if (OP_INSERT.equals(type)) {
// 如果操作類型是insert,則data數(shù)組表示的是要插入的數(shù)據(jù),則循環(huán)遍歷data,然后添加一個標(biāo)識INSERT,構(gòu)造RowData對象,發(fā)送下游。
ArrayData data = row.getArray(0);
for (int i = 0; i < data.size(); i++) {
RowData insert = data.getRow(i, fieldCount);
insert.setRowKind(RowKind.INSERT);
out.collect(insert);
}
} else if (OP_UPDATE.equals(type)) {
// 如果是update操作,從data字段里獲取更新后的數(shù)據(jù)、
ArrayData data = row.getArray(0);
// old字段獲取更新之前的數(shù)據(jù)
ArrayData old = row.getArray(1);
for (int i = 0; i < data.size(); i++) {
// the underlying JSON deserialization schema always produce GenericRowData.
GenericRowData after = (GenericRowData) data.getRow(i, fieldCount);
GenericRowData before = (GenericRowData) old.getRow(i, fieldCount);
for (int f = 0; f < fieldCount; f++) {
if (before.isNullAt(f)) {
//如果old字段非空,則說明進(jìn)行了數(shù)據(jù)的更新,如果old字段是null,則說明更新前后數(shù)據(jù)一樣,這個時候把before的數(shù)據(jù)也設(shè)置成after的,也就是發(fā)送給下游的before和after數(shù)據(jù)一樣。
before.setField(f, after.getField(f));
}
}
before.setRowKind(RowKind.UPDATE_BEFORE);
after.setRowKind(RowKind.UPDATE_AFTER);
//把更新前后的數(shù)據(jù)都發(fā)送下游
out.collect(before);
out.collect(after);
}
} else if (OP_DELETE.equals(type)) {
// 如果是刪除操作,data字段里包含將要被刪除的數(shù)據(jù),把這些數(shù)據(jù)組織起來發(fā)送給下游
ArrayData data = row.getArray(0);
for (int i = 0; i < data.size(); i++) {
RowData insert = data.getRow(i, fieldCount);
insert.setRowKind(RowKind.DELETE);
out.collect(insert);
}
} else {
if (!ignoreParseErrors) {
throw new IOException(format(
"Unknown \"type\" value \"%s\". The Canal JSON message is '%s'", type, new String(message)));
}
}
} catch (Throwable t) {
// a big try catch to protect the processing.
if (!ignoreParseErrors) {
throw new IOException(format(
"Corrupt Canal JSON message '%s'.", new String(message)), t);
}
}
}

flink cdc connector

背景

對于上面的架構(gòu),我們需要部署canal(debezium)+ kafka,然后flink再從kafka消費(fèi)數(shù)據(jù),這種架構(gòu)下我們需要部署多個組件,并且數(shù)據(jù)也需要落地到kafka,有沒有更好的方案來精簡下這個流程呢?我們接下來講講flink提供的cdc connector。

這個connector并沒有包含在flink的代碼里,具體的地址是在https://github.com/ververica/flink-cdc-connectors里,詳情大家可以看下這里面的內(nèi)容。

這種架構(gòu)下,flink直接消費(fèi)數(shù)據(jù)庫的增量日志,替代了原來作為數(shù)據(jù)采集層的canal(debezium),然后直接進(jìn)行計算,經(jīng)過計算之后,將計算結(jié)果 發(fā)送到下游。整體架構(gòu)如下:

flink sql cdc怎么使用

使用這種架構(gòu)是好處有:

  • 減少canal和kafka的維護(hù)成本,鏈路更短,延遲更低

  • flink提供了exactly once語義

  • 可以從指定position讀取

  • 去掉了kafka,減少了消息的存儲成本

mysql-cdc

目前flink支持兩種內(nèi)置的connector,PostgreSQL和mysql,接下來我們以mysql為例簡單講講。

在使用之前,我們需要引入相應(yīng)的pom,mysql的pom如下:

<dependency>
 <groupId>com.alibaba.ververica</groupId>
 <!-- add the dependency matching your database -->
 <artifactId>flink-connector-mysql-cdc</artifactId>
 <version>1.1.0</version>
</dependency>

如果是sql客戶端使用,需要下載 flink-sql-connector-mysql-cdc-1.1.0.jar 并且放到<FLINK_HOME>/lib/下面

連接mysql數(shù)據(jù)庫的示例sql如下:

CREATE TABLE mysql_binlog (
id INT NOT NULL,
name STRING,
description STRING,
weight DECIMAL(10,3)
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database-name' = 'inventory',
'table-name' = 'products'
)

如果訂閱的是postgres數(shù)據(jù)庫,我們需要把connector替換成postgres-cdc,DDL中表的schema和數(shù)據(jù)庫一一對應(yīng)。

更加詳細(xì)的配置參見:

https://github.com/ververica/flink-cdc-connectors/wiki/MySQL-CDC-Connector

mysql-cdc connector源碼解析

接下來我們以mysql-cdc為例,看看源碼層級是怎么實(shí)現(xiàn)的。既然作為一個sql的connector,那么就首先會有一個對應(yīng)的TableFactory,然后在工廠類里面構(gòu)造相應(yīng)的source,最后將消費(fèi)下來的數(shù)據(jù)轉(zhuǎn)成flink認(rèn)識的RowData格式,發(fā)送到下游。

我們按照這個思路來看看flink cdc源碼的實(shí)現(xiàn)。

在flink-connector-mysql-cdc module中,找到其對應(yīng)的工廠類:MySQLTableSourceFactory,進(jìn)入createDynamicTableSource(Context context)方法,在這個方法里,使用從ddl中的屬性里獲取的host、dbname等信息構(gòu)造了一個MySQLTableSource類。

MySQLTableSource

在MySQLTableSource#getScanRuntimeProvider方法里,我們看到,首先構(gòu)造了一個用于序列化的對象RowDataDebeziumDeserializeSchema,這個對象主要是用于將Debezium獲取的SourceRecord格式的數(shù)據(jù)轉(zhuǎn)化為flink認(rèn)識的RowData對象。 我們看下RowDataDebeziumDeserializeSchem#deserialize方法,這里的操作主要就是先判斷下進(jìn)來的數(shù)據(jù)類型(insert 、update、delete),然后針對不同的類型(short、int等)分別進(jìn)行轉(zhuǎn)換,

最后我們看到用于flink用于獲取數(shù)據(jù)庫變更日志的Source函數(shù)是DebeziumSourceFunction,且最終返回的類型是RowData。

也就是說flink底層是采用了Debezium工具從mysql、postgres等數(shù)據(jù)庫中獲取的變更數(shù)據(jù)。

	@SuppressWarnings("unchecked")
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
TypeInformation<RowData> typeInfo = (TypeInformation<RowData>) scanContext.createTypeInformation(physicalSchema.toRowDataType());
DebeziumDeserializationSchema<RowData> deserializer = new RowDataDebeziumDeserializeSchema(
rowType,
typeInfo,
((rowData, rowKind) -> {}),
serverTimeZone);
MySQLSource.Builder<RowData> builder = MySQLSource.<RowData>builder()
.hostname(hostname)
..........
DebeziumSourceFunction<RowData> sourceFunction = builder.build();

return SourceFunctionProvider.of(sourceFunction, false);
}

DebeziumSourceFunction

我們接下來看看DebeziumSourceFunction類

@PublicEvolving
public class DebeziumSourceFunction<T> extends RichSourceFunction<T> implements
CheckpointedFunction,
ResultTypeQueryable<T> {
.............
}

我們看到DebeziumSourceFunction類繼承了RichSourceFunction,并且實(shí)現(xiàn)了CheckpointedFunction接口,也就是說這個類是flink的一個SourceFunction,會從源端(run方法)獲取數(shù)據(jù),發(fā)送給下游。此外這個類還實(shí)現(xiàn)了CheckpointedFunction接口,也就是會通過checkpoint的機(jī)制來保證exactly once語義。

接下來我們進(jìn)入run方法,看看是如何獲取數(shù)據(jù)庫的變更數(shù)據(jù)的。



@Override
public void run(SourceContext<T> sourceContext) throws Exception {
       ...........................
// DO NOT include schema change, e.g. DDL
properties.setProperty("include.schema.changes", "false");
        ...........................
       //將所有的屬性信息打印出來,以便排查。
// dump the properties
String propsString = properties.entrySet().stream()
.map(t -> "\t" + t.getKey().toString() + " = " + t.getValue().toString() + "\n")
.collect(Collectors.joining());
LOG.info("Debezium Properties:\n{}", propsString);

//用于具體的處理數(shù)據(jù)的邏輯
this.debeziumConsumer = new DebeziumChangeConsumer<>(
sourceContext,
deserializer,
restoredOffsetState == null, // DB snapshot phase if restore state is null
this::reportError);

// create the engine with this configuration ...
this.engine = DebeziumEngine.create(Connect.class)
.using(properties)
.notifying(debeziumConsumer)  // 數(shù)據(jù)發(fā)給上面的debeziumConsumer
.using((success, message, error) -> {
if (!success && error != null) {
this.reportError(error);
}
})
.build();

if (!running) {
return;
}

// run the engine asynchronously
executor.execute(engine);

       //循環(huán)判斷,當(dāng)程序被打斷,或者有錯誤的時候,打斷engine,并且拋出異常
// on a clean exit, wait for the runner thread
try {
while (running) {
if (executor.awaitTermination(5, TimeUnit.SECONDS)) {
break;
}
if (error != null) {
running = false;
shutdownEngine();
// rethrow the error from Debezium consumer
ExceptionUtils.rethrow(error);
}
}
}
catch (InterruptedException e) {
// may be the result of a wake-up interruption after an exception.
// we ignore this here and only restore the interruption state
Thread.currentThread().interrupt();
}
}

在函數(shù)的開始,設(shè)置了很多的properties,比如include.schema.changes 設(shè)置為false,也就是不包含表的DDL操作,表結(jié)構(gòu)的變更是不捕獲的。我們這里只關(guān)注數(shù)據(jù)的增刪改。

接下來構(gòu)造了一個DebeziumChangeConsumer對象,這個類實(shí)現(xiàn)了DebeziumEngine.ChangeConsumer接口,主要就是將獲取到的一批數(shù)據(jù)進(jìn)行一條條的加工處理。

接下來定一個DebeziumEngine對象,這個對象是真正用來干活的,它的底層使用了kafka的connect-api來進(jìn)行獲取數(shù)據(jù),得到的是一個org.apache.kafka.connect.source.SourceRecord對象。通過notifying方法將得到的數(shù)據(jù)交給上面定義的DebeziumChangeConsumer來來覆蓋缺省實(shí)現(xiàn)以進(jìn)行復(fù)雜的操作。

接下來通過一個線程池ExecutorService來異步的啟動這個engine。

最后,做了一個循環(huán)判斷,當(dāng)程序被打斷,或者有錯誤的時候,打斷engine,并且拋出異常。

總結(jié)一下,就是在Flink的source函數(shù)里,使用Debezium 引擎獲取對應(yīng)的數(shù)據(jù)庫變更數(shù)據(jù)(SourceRecord),經(jīng)過一系列的反序列化操作,最終轉(zhuǎn)成了flink中的RowData對象,發(fā)送給下游。

changelog format

使用場景

當(dāng)我們從mysql-cdc獲取數(shù)據(jù)庫的變更數(shù)據(jù),或者寫了一個group by的查詢的時候,這種結(jié)果數(shù)據(jù)都是不斷變化的,我們?nèi)绾螌⑦@些變化的數(shù)據(jù)發(fā)到只支持append mode的kafka隊(duì)列呢?

于是flink提供了一種changelog format,其實(shí)我們非常簡單的理解為,flink對進(jìn)來的RowData數(shù)據(jù)進(jìn)行了一層包裝,然后加了一個數(shù)據(jù)的操作類型,包括以下幾種 INSERT,DELETE, UPDATE_BEFORE,UPDATE_AFTER。這樣當(dāng)下游獲取到這個數(shù)據(jù)的時候,就可以根據(jù)數(shù)據(jù)的類型來判斷下如何對數(shù)據(jù)進(jìn)行操作了。

比如我們的原始數(shù)據(jù)格式是這樣的。

{"day":"2020-06-18","gmv":100}

經(jīng)過changelog格式的加工之后,成為了下面的格式:

{"data":{"day":"2020-06-18","gmv":100},"op":"+I"}

也就是說changelog format對原生的格式進(jìn)行了包裝,添加了一個op字段,表示數(shù)據(jù)的操作類型,目前有以下幾種:

  • +I:插入操作。

  • -U :更新之前的數(shù)據(jù)內(nèi)容:

  • +U :更新之后的數(shù)據(jù)內(nèi)容。

  • -D :刪除操作。

示例

使用的時候需要引入相應(yīng)的pom

<dependency>
 <groupId>com.alibaba.ververica</groupId>
 <artifactId>flink-format-changelog-json</artifactId>
 <version>1.1.0</version>
</dependency>

使用flink sql操作的方式如下:

CREATE TABLE kafka_gmv (
 day_str STRING,
 gmv DECIMAL(10, 5)
) WITH (
   'connector' = 'kafka',
   'topic' = 'kafka_gmv',
   'scan.startup.mode' = 'earliest-offset',
   'properties.bootstrap.servers' = 'localhost:9092',
   'format' = 'changelog-json'
);

我們定義了一個 format 為 changelog-json 的kafka connector,之后我們就可以對其進(jìn)行寫入和查詢了。

完整的代碼和配置請參考:
https://github.com/ververica/flink-cdc-connectors/wiki/Changelog-JSON-Format

源碼淺析

作為一種flink的format ,我們主要看下其序列化和發(fā)序列化方法,changelog-json 使用了flink-json包進(jìn)行json的處理。

反序列化

反序列化用的是ChangelogJsonDeserializationSchema類,在其構(gòu)造方法里,我們看到主要是構(gòu)造了一個json的序列化器jsonDeserializer用于對數(shù)據(jù)進(jìn)行處理。

public ChangelogJsonDeserializationSchema(
RowType rowType,
TypeInformation<RowData> resultTypeInfo,
boolean ignoreParseErrors,
TimestampFormat timestampFormatOption) {
this.resultTypeInfo = resultTypeInfo;
this.ignoreParseErrors = ignoreParseErrors;
this.jsonDeserializer = new JsonRowDataDeserializationSchema(
createJsonRowType(fromLogicalToDataType(rowType)),
// the result type is never used, so it's fine to pass in Debezium's result type
resultTypeInfo,
false, // ignoreParseErrors already contains the functionality of failOnMissingField
ignoreParseErrors,
timestampFormatOption);
}

其中createJsonRowType方法指定了changelog的format是一種Row類型的格式,我們看下代碼:

private static RowType createJsonRowType(DataType databaseSchema) {
DataType payload = DataTypes.ROW(
DataTypes.FIELD("data", databaseSchema),
DataTypes.FIELD("op", DataTypes.STRING()));
return (RowType) payload.getLogicalType();
}

在這里,指定了這個row格式有兩個字段,一個是data,表示數(shù)據(jù)的內(nèi)容,一個是op,表示操作的類型。

最后看下最核心的ChangelogJsonDeserializationSchema#deserialize(byte[] bytes, Collector<RowData> out>)

@Override
public void deserialize(byte[] bytes, Collector<RowData> out) throws IOException {
try {
GenericRowData row = (GenericRowData) jsonDeserializer.deserialize(bytes);
GenericRowData data = (GenericRowData) row.getField(0);
String op = row.getString(1).toString();
RowKind rowKind = parseRowKind(op);
data.setRowKind(rowKind);
out.collect(data);
} catch (Throwable t) {
// a big try catch to protect the processing.
if (!ignoreParseErrors) {
throw new IOException(format(
"Corrupt Debezium JSON message '%s'.", new String(bytes)), t);
}
}
}

使用jsonDeserializer對數(shù)據(jù)進(jìn)行處理,然后對第二個字段op進(jìn)行判斷,添加對應(yīng)的RowKind。

序列化

序列化的方法我們看下方法:ChangelogJsonSerializationSchema#serialize

	@Override
public byte[] serialize(RowData rowData) {
reuse.setField(0, rowData);
reuse.setField(1, stringifyRowKind(rowData.getRowKind()));
return jsonSerializer.serialize(reuse);
}

這塊沒有什么難度,就是將flink的RowData使用jsonSerializer序列化成字節(jié)數(shù)組。

到此,相信大家對“flink sql cdc怎么使用”有了更深的了解,不妨來實(shí)際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

網(wǎng)頁名稱:flinksqlcdc怎么使用
轉(zhuǎn)載來于:http://muchs.cn/article32/isjipc.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供App開發(fā)靜態(tài)網(wǎng)站、微信小程序ChatGPT、關(guān)鍵詞優(yōu)化、云服務(wù)器

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

h5響應(yīng)式網(wǎng)站建設(shè)