This article explains how to build a real-time data warehouse with SQL on Flink 1.11. The approach is straightforward and practical, so feel free to follow along.
This article uses an e-commerce business as an example to show the data processing flow of a real-time data warehouse. Since the goal is to illustrate how such a warehouse is built, it does not involve overly complex computations; to keep the case complete and reproducible, detailed steps are given for every operation. For ease of demonstration, everything is done in the Flink SQL CLI.
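Because everything runs in the SQL CLI, the required connector and format jars must be on the CLI classpath first. As a sketch, a plausible flink/lib layout looks like this (artifact versions here are assumptions for a Flink 1.11 / Scala 2.11 build; the canal-json format ships with the bundled flink-json jar, while changelog-json comes from the flink-cdc-connectors project):
flink/lib/
  flink-sql-connector-kafka_2.11-1.11.0.jar    (Kafka source/sink connector)
  flink-connector-jdbc_2.11-1.11.0.jar         (JDBC sink and lookup source for MySQL)
  flink-format-changelog-json-1.0.0.jar        (changelog-json format)
  mysql-connector-java-5.1.47.jar              (MySQL JDBC driver)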
The overall architecture is shown in the figure: canal parses the MySQL binlog and stores the change data in Kafka. Flink SQL then cleans and joins the raw data and writes the resulting detail wide table back to Kafka. Dimension data lives in MySQL; Flink SQL joins the wide table against the dimension tables and writes the aggregated results to MySQL, which FineBI finally visualizes. The business tables involved are defined in MySQL as follows:
CREATE TABLE `order_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
`consignee` varchar(100) DEFAULT NULL COMMENT 'consignee',
`consignee_tel` varchar(20) DEFAULT NULL COMMENT 'consignee phone',
`total_amount` decimal(10,2) DEFAULT NULL COMMENT 'total amount',
`order_status` varchar(20) DEFAULT NULL COMMENT 'order status',
`user_id` bigint(20) DEFAULT NULL COMMENT 'user id',
`payment_way` varchar(20) DEFAULT NULL COMMENT 'payment method',
`delivery_address` varchar(1000) DEFAULT NULL COMMENT 'delivery address',
`order_comment` varchar(200) DEFAULT NULL COMMENT 'order comment',
`out_trade_no` varchar(50) DEFAULT NULL COMMENT 'trade number (for third-party payment)',
`trade_body` varchar(200) DEFAULT NULL COMMENT 'order description (for third-party payment)',
`create_time` datetime DEFAULT NULL COMMENT 'create time',
`operate_time` datetime DEFAULT NULL COMMENT 'operate time',
`expire_time` datetime DEFAULT NULL COMMENT 'expire time',
`tracking_no` varchar(100) DEFAULT NULL COMMENT 'tracking number',
`parent_order_id` bigint(20) DEFAULT NULL COMMENT 'parent order id',
`img_url` varchar(200) DEFAULT NULL COMMENT 'image URL',
`province_id` int(20) DEFAULT NULL COMMENT 'province id',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='order table';
CREATE TABLE `order_detail` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
`order_id` bigint(20) DEFAULT NULL COMMENT 'order id',
`sku_id` bigint(20) DEFAULT NULL COMMENT 'sku id',
`sku_name` varchar(200) DEFAULT NULL COMMENT 'sku name (denormalized)',
`img_url` varchar(200) DEFAULT NULL COMMENT 'image name (denormalized)',
`order_price` decimal(10,2) DEFAULT NULL COMMENT 'purchase price (sku price when ordered)',
`sku_num` varchar(200) DEFAULT NULL COMMENT 'purchase quantity',
`create_time` datetime DEFAULT NULL COMMENT 'create time',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='order detail table';
CREATE TABLE `sku_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'sku id (item id)',
`spu_id` bigint(20) DEFAULT NULL COMMENT 'spu id',
`price` decimal(10,0) DEFAULT NULL COMMENT 'price',
`sku_name` varchar(200) DEFAULT NULL COMMENT 'sku name',
`sku_desc` varchar(2000) DEFAULT NULL COMMENT 'sku description',
`weight` decimal(10,2) DEFAULT NULL COMMENT 'weight',
`tm_id` bigint(20) DEFAULT NULL COMMENT 'brand id (denormalized)',
`category3_id` bigint(20) DEFAULT NULL COMMENT 'level-3 category id (denormalized)',
`sku_default_img` varchar(200) DEFAULT NULL COMMENT 'default image (denormalized)',
`create_time` datetime DEFAULT NULL COMMENT 'create time',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='sku table';
CREATE TABLE `base_category1` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
`name` varchar(10) NOT NULL COMMENT 'category name',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='level-1 category table';
CREATE TABLE `base_category2` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
`name` varchar(200) NOT NULL COMMENT 'level-2 category name',
`category1_id` bigint(20) DEFAULT NULL COMMENT 'level-1 category id',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='level-2 category table';
CREATE TABLE `base_category3` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
`name` varchar(200) NOT NULL COMMENT 'level-3 category name',
`category2_id` bigint(20) DEFAULT NULL COMMENT 'level-2 category id',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='level-3 category table';
CREATE TABLE `base_province` (
`id` int(20) DEFAULT NULL COMMENT 'id',
`name` varchar(20) DEFAULT NULL COMMENT 'province name',
`region_id` int(20) DEFAULT NULL COMMENT 'region id',
`area_code` varchar(20) DEFAULT NULL COMMENT 'area code'
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE `base_region` (
`id` int(20) NOT NULL COMMENT 'region id',
`region_name` varchar(20) DEFAULT NULL COMMENT 'region name',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
For the ODS-layer data synchronization, see my other article, "Real-time incremental data synchronization with Canal and Flink (part 1)". In short, canal parses the MySQL binlog and writes the changes to the corresponding Kafka topics; the details are omitted here for brevity. After synchronization, each topic holds canal-json change records.
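For orientation, a canal-json record on one of these topics looks roughly like the following (an illustrative sketch, not captured output; canal emits additional metadata fields that Flink's canal-json format ignores):
{
  "data": [
    {"id": "1", "name": "北京", "region_id": "1", "area_code": "110000"}
  ],
  "database": "mydw",
  "table": "base_province",
  "type": "INSERT",
  "es": 1596416700000,
  "ts": 1596416700456
}
Flink's canal-json format mainly reads the data, old, and type fields to turn each message into insert, update, or delete rows.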
In this case the dimension tables are stored in MySQL; in production, HBase is more commonly used for dimension data. We mainly need two dimension tables: a region dimension table and a product (sku) dimension table. The processing goes as follows: first, extract the data from the mydw.base_province and mydw.base_region topics into MySQL, using Flink SQL Kafka sources with the canal-json format. Note that before running the load, the corresponding tables must already exist in MySQL; this article uses a MySQL database named dim to hold the dimension data.
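A minimal sketch of those MySQL-side tables (an assumption mirroring the source schemas; a primary key is added so the JDBC sink can upsert):
CREATE DATABASE IF NOT EXISTS dim;
CREATE TABLE dim.base_province (
`id` INT NOT NULL COMMENT 'province id',
`name` VARCHAR(20) DEFAULT NULL COMMENT 'province name',
`region_id` INT DEFAULT NULL COMMENT 'region id',
`area_code` VARCHAR(20) DEFAULT NULL COMMENT 'area code',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE dim.base_region (
`id` INT NOT NULL COMMENT 'region id',
`region_name` VARCHAR(20) DEFAULT NULL COMMENT 'region name',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
The Flink SQL statements for the extraction are as follows: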
-- -------------------------
-- Province
-- Kafka source
-- -------------------------
DROP TABLE IF EXISTS `ods_base_province`;
CREATE TABLE `ods_base_province` (
`id` INT,
`name` STRING,
`region_id` INT,
`area_code` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'mydw.base_province',
'properties.bootstrap.servers' = 'kms-3:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json',
'scan.startup.mode' = 'earliest-offset'
);
-- -------------------------
-- Province
-- MySQL sink
-- -------------------------
DROP TABLE IF EXISTS `base_province`;
CREATE TABLE `base_province` (
`id` INT,
`name` STRING,
`region_id` INT,
`area_code` STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://kms-1:3306/dim',
'table-name' = 'base_province', -- target table in MySQL
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'root',
'password' = '123qwe',
'sink.buffer-flush.interval' = '1s'
);
-- -------------------------
-- Province
-- Load data into the MySQL sink
-- -------------------------
INSERT INTO base_province
SELECT *
FROM ods_base_province;
-- -------------------------
-- Region
-- Kafka source
-- -------------------------
DROP TABLE IF EXISTS `ods_base_region`;
CREATE TABLE `ods_base_region` (
`id` INT,
`region_name` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'mydw.base_region',
'properties.bootstrap.servers' = 'kms-3:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json',
'scan.startup.mode' = 'earliest-offset'
);
-- -------------------------
-- Region
-- MySQL sink
-- -------------------------
DROP TABLE IF EXISTS `base_region`;
CREATE TABLE `base_region` (
`id` INT,
`region_name` STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://kms-1:3306/dim',
'table-name' = 'base_region', -- target table in MySQL
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'root',
'password' = '123qwe',
'sink.buffer-flush.interval' = '1s'
);
-- -------------------------
-- Region
-- Load data into the MySQL sink
-- -------------------------
INSERT INTO base_region
SELECT *
FROM ods_base_region;
With the steps above, the raw data needed for the dimension tables is now stored in MySQL. Next we create the dimension table itself: using the two tables above, we create a view in MySQL, dim_province, to serve as the dimension table:
-- ---------------------------------
-- DIM layer: province dimension table
-- Create a view in MySQL
-- ---------------------------------
DROP VIEW IF EXISTS dim_province;
CREATE VIEW dim_province AS
SELECT
bp.id AS province_id,
bp.name AS province_name,
br.id AS region_id,
br.region_name AS region_name,
bp.area_code AS area_code
FROM base_region br
JOIN base_province bp ON br.id = bp.region_id;
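A quick sanity check of the view in a MySQL client (a sketch):
-- Expect one row per province, enriched with its region
SELECT * FROM dim.dim_province LIMIT 5;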
The dimension table dim_province is now ready; to use it in a dimension join, we only need to declare a JDBC source for it in Flink SQL. We build the product dimension table the same way, as follows:
-- -------------------------
-- Level-1 category table
-- Kafka source
-- -------------------------
DROP TABLE IF EXISTS `ods_base_category1`;
CREATE TABLE `ods_base_category1` (
`id` BIGINT,
`name` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'mydw.base_category1',
'properties.bootstrap.servers' = 'kms-3:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json',
'scan.startup.mode' = 'earliest-offset'
);
-- -------------------------
-- Level-1 category table
-- MySQL sink
-- -------------------------
DROP TABLE IF EXISTS `base_category1`;
CREATE TABLE `base_category1` (
`id` BIGINT,
`name` STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://kms-1:3306/dim',
'table-name' = 'base_category1', -- target table in MySQL
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'root',
'password' = '123qwe',
'sink.buffer-flush.interval' = '1s'
);
-- -------------------------
-- Level-1 category table
-- Load data into the MySQL sink
-- -------------------------
INSERT INTO base_category1
SELECT *
FROM ods_base_category1;
-- -------------------------
-- Level-2 category table
-- Kafka source
-- -------------------------
DROP TABLE IF EXISTS `ods_base_category2`;
CREATE TABLE `ods_base_category2` (
`id` BIGINT,
`name` STRING,
`category1_id` BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'mydw.base_category2',
'properties.bootstrap.servers' = 'kms-3:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json',
'scan.startup.mode' = 'earliest-offset'
);
-- -------------------------
-- Level-2 category table
-- MySQL sink
-- -------------------------
DROP TABLE IF EXISTS `base_category2`;
CREATE TABLE `base_category2` (
`id` BIGINT,
`name` STRING,
`category1_id` BIGINT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://kms-1:3306/dim',
'table-name' = 'base_category2', -- target table in MySQL
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'root',
'password' = '123qwe',
'sink.buffer-flush.interval' = '1s'
);
-- -------------------------
-- Level-2 category table
-- Load data into the MySQL sink
-- -------------------------
INSERT INTO base_category2
SELECT *
FROM ods_base_category2;
-- -------------------------
-- Level-3 category table
-- Kafka source
-- -------------------------
DROP TABLE IF EXISTS `ods_base_category3`;
CREATE TABLE `ods_base_category3` (
`id` BIGINT,
`name` STRING,
`category2_id` BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'mydw.base_category3',
'properties.bootstrap.servers' = 'kms-3:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json',
'scan.startup.mode' = 'earliest-offset'
);
-- -------------------------
-- Level-3 category table
-- MySQL sink
-- -------------------------
DROP TABLE IF EXISTS `base_category3`;
CREATE TABLE `base_category3` (
`id` BIGINT,
`name` STRING,
`category2_id` BIGINT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://kms-1:3306/dim',
'table-name' = 'base_category3', -- target table in MySQL
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'root',
'password' = '123qwe',
'sink.buffer-flush.interval' = '1s'
);
-- -------------------------
-- Level-3 category table
-- Load data into the MySQL sink
-- -------------------------
INSERT INTO base_category3
SELECT *
FROM ods_base_category3;
-- -------------------------
-- Sku (product) table
-- Kafka source
-- -------------------------
DROP TABLE IF EXISTS `ods_sku_info`;
CREATE TABLE `ods_sku_info` (
`id` BIGINT,
`spu_id` BIGINT,
`price` DECIMAL(10,0),
`sku_name` STRING,
`sku_desc` STRING,
`weight` DECIMAL(10,2),
`tm_id` BIGINT,
`category3_id` BIGINT,
`sku_default_img` STRING,
`create_time` TIMESTAMP(0)
) WITH (
'connector' = 'kafka',
'topic' = 'mydw.sku_info',
'properties.bootstrap.servers' = 'kms-3:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json',
'scan.startup.mode' = 'earliest-offset'
);
-- -------------------------
-- Sku (product) table
-- MySQL sink
-- -------------------------
DROP TABLE IF EXISTS `sku_info`;
CREATE TABLE `sku_info` (
`id` BIGINT,
`spu_id` BIGINT,
`price` DECIMAL(10,0),
`sku_name` STRING,
`sku_desc` STRING,
`weight` DECIMAL(10,2),
`tm_id` BIGINT,
`category3_id` BIGINT,
`sku_default_img` STRING,
`create_time` TIMESTAMP(0),
PRIMARY KEY (id) NOT ENFORCED -- keyed by the sku id (not tm_id) so upserts land on the right row
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://kms-1:3306/dim',
'table-name' = 'sku_info', -- target table in MySQL
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'root',
'password' = '123qwe',
'sink.buffer-flush.interval' = '1s'
);
-- -------------------------
-- Sku (product) table
-- Load data into the MySQL sink
-- -------------------------
INSERT INTO sku_info
SELECT *
FROM ods_sku_info;
With the steps above, the base tables behind the product dimension are synchronized into MySQL; the corresponding MySQL-side tables must again be created in advance, as sketched below. We then use those base tables to create a view, dim_sku_info, in MySQL's dim database to serve as the dimension table later on.
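As with the region tables, a sketch of the MySQL-side base tables to pre-create (base_category2, base_category3, and sku_info follow the same pattern as their source schemas, each with a primary key so the JDBC sink can upsert):
CREATE TABLE dim.base_category1 (
`id` BIGINT NOT NULL COMMENT 'id',
`name` VARCHAR(10) NOT NULL COMMENT 'level-1 category name',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
The view DDL is then: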
-- ---------------------------------
-- DIM layer: product (sku) dimension table
-- Create a view in MySQL
-- ---------------------------------
CREATE VIEW dim_sku_info AS
SELECT
si.id AS id,
si.sku_name AS sku_name,
si.category3_id AS c3_id,
si.weight AS weight,
si.tm_id AS tm_id,
si.price AS price,
si.spu_id AS spu_id,
c3.name AS c3_name,
c2.id AS c2_id,
c2.name AS c2_name,
c1.id AS c1_id,
c1.name AS c1_name
FROM
(
sku_info si
JOIN base_category3 c3 ON si.category3_id = c3.id
JOIN base_category2 c2 ON c3.category2_id = c2.id
JOIN base_category1 c1 ON c2.category1_id = c1.id
);
At this point, all the dimension tables we need are ready. Next we process the raw ODS data into the DWD-layer detail wide table, as follows:
-- -------------------------
-- Order detail
-- Kafka source
-- -------------------------
DROP TABLE IF EXISTS `ods_order_detail`;
CREATE TABLE `ods_order_detail`(
`id` BIGINT,
`order_id` BIGINT,
`sku_id` BIGINT,
`sku_name` STRING,
`img_url` STRING,
`order_price` DECIMAL(10,2),
`sku_num` INT,
`create_time` TIMESTAMP(0)
) WITH (
'connector' = 'kafka',
'topic' = 'mydw.order_detail',
'properties.bootstrap.servers' = 'kms-3:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json',
'scan.startup.mode' = 'earliest-offset'
);
-- -------------------------
-- Order info
-- Kafka source
-- -------------------------
DROP TABLE IF EXISTS `ods_order_info`;
CREATE TABLE `ods_order_info` (
`id` BIGINT,
`consignee` STRING,
`consignee_tel` STRING,
`total_amount` DECIMAL(10,2),
`order_status` STRING,
`user_id` BIGINT,
`payment_way` STRING,
`delivery_address` STRING,
`order_comment` STRING,
`out_trade_no` STRING,
`trade_body` STRING,
`create_time` TIMESTAMP(0),
`operate_time` TIMESTAMP(0),
`expire_time` TIMESTAMP(0),
`tracking_no` STRING,
`parent_order_id` BIGINT,
`img_url` STRING,
`province_id` INT
) WITH (
'connector' = 'kafka',
'topic' = 'mydw.order_info',
'properties.bootstrap.servers' = 'kms-3:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json',
'scan.startup.mode' = 'earliest-offset'
);
-- ---------------------------------
-- DWD layer: paid order detail table dwd_paid_order_detail
-- ---------------------------------
DROP TABLE IF EXISTS dwd_paid_order_detail;
CREATE TABLE dwd_paid_order_detail
(
detail_id BIGINT,
order_id BIGINT,
user_id BIGINT,
province_id INT,
sku_id BIGINT,
sku_name STRING,
sku_num INT,
order_price DECIMAL(10,2),
create_time STRING,
pay_time STRING
) WITH (
'connector' = 'kafka',
'topic' = 'dwd_paid_order_detail',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'kms-3:9092',
'format' = 'changelog-json'
);
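The changelog-json format (from the flink-cdc-connectors project) is used here because the join below consumes two canal-json changelog sources, so its result contains updates and retractions, which the plain json Kafka format in Flink 1.11 cannot encode. Each record written to the topic carries the row plus a change flag, roughly of this shape (an illustrative sketch, not captured output):
{"data": {"detail_id": 1, "order_id": 100, "user_id": 3, "province_id": 1, "sku_id": 10, "sku_name": "example sku", "sku_num": 2, "order_price": 49.90, "create_time": "2020-08-04 10:00:00", "pay_time": "2020-08-04 10:05:00"}, "op": "+I"}
where op is one of +I, -U, +U, or -D.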
-- ---------------------------------
-- DWD layer: paid order detail table
-- Load data into dwd_paid_order_detail
-- ---------------------------------
INSERT INTO dwd_paid_order_detail
SELECT
od.id,
oi.id order_id,
oi.user_id,
oi.province_id,
od.sku_id,
od.sku_name,
od.sku_num,
od.order_price,
oi.create_time,
oi.operate_time -- operate_time serves as the payment time once the order is paid
FROM
(
SELECT *
FROM ods_order_info
WHERE order_status = '2' -- paid
) oi JOIN
(
SELECT *
FROM ods_order_detail
) od
ON oi.id = od.order_id;
With the steps above, we have created the dwd_paid_order_detail wide table and stored it in Kafka. Next we join this wide table against the dimension tables to produce the ADS application-layer data.
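Before moving on, the wide table can be previewed directly in the SQL CLI as a quick sanity check:
-- Streams the changelog of the wide table to the CLI for inspection
SELECT * FROM dwd_paid_order_detail;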
First, create the corresponding ADS target table in MySQL: ads_province_index
CREATE TABLE ads.ads_province_index(
province_id INT(10),
area_code VARCHAR(100),
province_name VARCHAR(100),
region_id INT(10),
region_name VARCHAR(100),
order_amount DECIMAL(10,2),
order_count BIGINT(10),
dt VARCHAR(100),
PRIMARY KEY (province_id, dt)
) ;
Load data into the MySQL ADS-layer target table:
-- Executed in the Flink SQL CLI
-- ---------------------------------
-- Declare the MySQL ADS-layer table with DDL
-- Metrics: 1. daily order count per province
--          2. daily order amount per province
-- ---------------------------------
CREATE TABLE ads_province_index(
province_id INT,
area_code STRING,
province_name STRING,
region_id INT,
region_name STRING,
order_amount DECIMAL(10,2),
order_count BIGINT,
dt STRING,
PRIMARY KEY (province_id, dt) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://kms-1:3306/ads',
'table-name' = 'ads_province_index',
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'root',
'password' = '123qwe'
);
-- ---------------------------------
-- dwd_paid_order_detail: paid order detail wide table
-- ---------------------------------
CREATE TABLE dwd_paid_order_detail
(
detail_id BIGINT,
order_id BIGINT,
user_id BIGINT,
province_id INT,
sku_id BIGINT,
sku_name STRING,
sku_num INT,
order_price DECIMAL(10,2),
create_time STRING,
pay_time STRING
) WITH (
'connector' = 'kafka',
'topic' = 'dwd_paid_order_detail',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'kms-3:9092',
'format' = 'changelog-json'
);
-- ---------------------------------
-- tmp_province_index
-- Temporary order summary table
-- ---------------------------------
CREATE TABLE tmp_province_index(
province_id INT,
order_count BIGINT, -- order count
order_amount DECIMAL(10,2), -- order amount
pay_date DATE
) WITH (
'connector' = 'kafka',
'topic' = 'tmp_province_index',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'kms-3:9092',
'format' = 'changelog-json'
);
-- ---------------------------------
-- tmp_province_index
-- Load data into the temporary summary table
-- ---------------------------------
INSERT INTO tmp_province_index
SELECT
province_id,
count(distinct order_id) order_count, -- order count
sum(order_price * sku_num) order_amount, -- order amount
TO_DATE(pay_time,'yyyy-MM-dd') pay_date
FROM dwd_paid_order_detail
GROUP BY province_id, TO_DATE(pay_time,'yyyy-MM-dd')
;
-- ---------------------------------
-- tmp_province_index_source
-- Use the temporary summary table as a source
-- ---------------------------------
CREATE TABLE tmp_province_index_source(
province_id INT,
order_count BIGINT, -- order count
order_amount DECIMAL(10,2), -- order amount
pay_date DATE,
proctime as PROCTIME() -- computed column providing a processing-time attribute
) WITH (
'connector' = 'kafka',
'topic' = 'tmp_province_index',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'kms-3:9092',
'format' = 'changelog-json'
);
-- ---------------------------------
-- DIM layer: province dimension table
-- Declare the dimension table source
-- ---------------------------------
DROP TABLE IF EXISTS `dim_province`;
CREATE TABLE dim_province (
province_id INT,
province_name STRING,
area_code STRING,
region_id INT,
region_name STRING ,
PRIMARY KEY (province_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://kms-1:3306/dim',
'table-name' = 'dim_province',
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'root',
'password' = '123qwe',
'scan.fetch-size' = '100'
);
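Each lookup in the temporal join below otherwise hits MySQL once per probe row. To take pressure off MySQL, the JDBC connector's lookup cache can be enabled by extending the DDL above; a sketch with the cache turned on (the cache sizes are assumptions to tune per workload, and cached rows can be up to the TTL stale):
DROP TABLE IF EXISTS `dim_province`;
CREATE TABLE dim_province (
province_id INT,
province_name STRING,
area_code STRING,
region_id INT,
region_name STRING,
PRIMARY KEY (province_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://kms-1:3306/dim',
'table-name' = 'dim_province',
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'root',
'password' = '123qwe',
'scan.fetch-size' = '100',
'lookup.cache.max-rows' = '5000', -- max rows held in the lookup cache
'lookup.cache.ttl' = '10min' -- how long a cached row stays valid
);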
-- ---------------------------------
-- Load data into ads_province_index
-- Dimension table (temporal) join
-- ---------------------------------
INSERT INTO ads_province_index
SELECT
pc.province_id,
dp.area_code,
dp.province_name,
dp.region_id,
dp.region_name,
pc.order_amount,
pc.order_count,
cast(pc.pay_date as VARCHAR)
FROM
tmp_province_index_source pc
JOIN dim_province FOR SYSTEM_TIME AS OF pc.proctime as dp
ON dp.province_id = pc.province_id;
After the job is submitted, observe it in the Flink Web UI, then check the data in the ADS-layer table ads_province_index.
Next, create the corresponding ADS target table in MySQL: ads_sku_index
CREATE TABLE ads_sku_index
(
sku_id BIGINT(10),
sku_name VARCHAR(100),
weight DOUBLE,
tm_id BIGINT(10),
price DOUBLE,
spu_id BIGINT(10),
c3_id BIGINT(10),
c3_name VARCHAR(100) ,
c2_id BIGINT(10),
c2_name VARCHAR(100),
c1_id BIGINT(10),
c1_name VARCHAR(100),
order_amount DOUBLE,
order_count BIGINT(10),
sku_count BIGINT(10),
dt varchar(100),
PRIMARY KEY (sku_id,dt)
);
Load data into the MySQL ADS-layer target table:
-- ---------------------------------
-- Declare the MySQL ADS-layer table with DDL
-- Metrics: 1. daily order count per sku
--          2. daily order amount per sku
--          3. daily quantity sold per sku
-- ---------------------------------
CREATE TABLE ads_sku_index
(
sku_id BIGINT,
sku_name VARCHAR,
weight DOUBLE,
tm_id BIGINT,
price DOUBLE,
spu_id BIGINT,
c3_id BIGINT,
c3_name VARCHAR ,
c2_id BIGINT,
c2_name VARCHAR,
c1_id BIGINT,
c1_name VARCHAR,
order_amount DOUBLE,
order_count BIGINT,
sku_count BIGINT,
dt varchar,
PRIMARY KEY (sku_id,dt) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://kms-1:3306/ads',
'table-name' = 'ads_sku_index',
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'root',
'password' = '123qwe'
);
-- ---------------------------------
-- dwd_paid_order_detail: paid order detail wide table
-- ---------------------------------
CREATE TABLE dwd_paid_order_detail
(
detail_id BIGINT,
order_id BIGINT,
user_id BIGINT,
province_id INT,
sku_id BIGINT,
sku_name STRING,
sku_num INT,
order_price DECIMAL(10,2),
create_time STRING,
pay_time STRING
) WITH (
'connector' = 'kafka',
'topic' = 'dwd_paid_order_detail',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'kms-3:9092',
'format' = 'changelog-json'
);
-- ---------------------------------
-- tmp_sku_index
-- Sku metric aggregation
-- ---------------------------------
CREATE TABLE tmp_sku_index(
sku_id BIGINT,
order_count BIGINT, -- order count
order_amount DECIMAL(10,2), -- order amount
order_sku_num BIGINT,
pay_date DATE
) WITH (
'connector' = 'kafka',
'topic' = 'tmp_sku_index',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'kms-3:9092',
'format' = 'changelog-json'
);
-- ---------------------------------
-- tmp_sku_index
-- Load data
-- ---------------------------------
INSERT INTO tmp_sku_index
SELECT
sku_id,
count(distinct order_id) order_count, -- order count
sum(order_price * sku_num) order_amount, -- order amount
sum(sku_num) order_sku_num,
TO_DATE(pay_time,'yyyy-MM-dd') pay_date
FROM dwd_paid_order_detail
GROUP BY sku_id, TO_DATE(pay_time,'yyyy-MM-dd')
;
-- ---------------------------------
-- tmp_sku_index_source
-- Use the temporary summary table as a source
-- ---------------------------------
CREATE TABLE tmp_sku_index_source(
sku_id BIGINT,
order_count BIGINT, -- order count
order_amount DECIMAL(10,2), -- order amount
order_sku_num BIGINT,
pay_date DATE,
proctime as PROCTIME() -- computed column providing a processing-time attribute
) WITH (
'connector' = 'kafka',
'topic' = 'tmp_sku_index',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'kms-3:9092',
'format' = 'changelog-json'
);
-- ---------------------------------
-- DIM layer: sku dimension table
-- Declare the dimension table source
-- ---------------------------------
DROP TABLE IF EXISTS `dim_sku_info`;
CREATE TABLE dim_sku_info (
id BIGINT,
sku_name STRING,
c3_id BIGINT,
weight DECIMAL(10,2),
tm_id BIGINT,
price DECIMAL(10,2),
spu_id BIGINT,
c3_name STRING,
c2_id BIGINT,
c2_name STRING,
c1_id BIGINT,
c1_name STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://kms-1:3306/dim',
'table-name' = 'dim_sku_info',
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'root',
'password' = '123qwe',
'scan.fetch-size' = '100'
);
-- ---------------------------------
-- Load data into ads_sku_index
-- Dimension table (temporal) join
-- ---------------------------------
INSERT INTO ads_sku_index
SELECT
sku_id,
sku_name,
weight,
tm_id,
price,
spu_id,
c3_id,
c3_name,
c2_id,
c2_name,
c1_id,
c1_name,
sc.order_amount,
sc.order_count,
sc.order_sku_num,
cast(sc.pay_date as VARCHAR)
FROM
tmp_sku_index_source sc
JOIN dim_sku_info FOR SYSTEM_TIME AS OF sc.proctime as ds
ON ds.id = sc.sku_id
;
After the job is submitted, observe it in the Flink Web UI, then check the data in the ADS-layer table ads_sku_index.
A note on versions: with Flink 1.11.0, inserting a changelog source into an upsert sink fails with the following exception:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Provided trait [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue.
Current node is TableSourceScan(table=[[default_catalog, default_database, t_pick_order]], fields=[order_no, status])
This bug has since been fixed, and the fix ships in Flink 1.11.1.
That covers building a real-time data warehouse with SQL on Flink 1.11; the best way to consolidate it is to work through the steps yourself.