FlinkSQL怎么搭建

本篇內(nèi)容主要講解“FlinkSQL怎么搭建”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“FlinkSQL怎么搭建”吧!

讓客戶滿意是我們工作的目標(biāo),不斷超越客戶的期望值來自于我們對(duì)這個(gè)行業(yè)的熱愛。我們立志把好的技術(shù)通過有效、簡單的方式提供給客戶,將通過不懈努力成為客戶在信息化領(lǐng)域值得信任、有價(jià)值的長期合作伙伴,公司提供的服務(wù)項(xiàng)目有:域名注冊(cè)、雅安服務(wù)器托管、營銷軟件、網(wǎng)站建設(shè)、玉溪網(wǎng)站維護(hù)、網(wǎng)站推廣。

1.背景

由于公司內(nèi)部需求較多,并不想每次都寫一個(gè) streaming 程序,故而開始搭建 flinksql 平臺(tái),基于 jdk1.8,flink1.12.x

2.效果

傳一個(gè) sql 文件給 jar 包,然后 sql 文件內(nèi)的 sql 將自動(dòng)執(zhí)行

3. jar 包 vs web 界面

調(diào)研了基于 web 的 zeppline

  1. zeppline 設(shè)計(jì)的初衷其實(shí)是為了交互式分析

  2. 基于 zeppline rest api 與現(xiàn)有的監(jiān)控不兼容,需要修改現(xiàn)有監(jiān)控的代碼

  3. 雖然帶有 web 界面的對(duì)用戶很是友好,對(duì)于分析人員來說,是一個(gè)不錯(cuò)的選擇,但對(duì)于開發(fā)人員來說,真正的線上長時(shí)間的運(yùn)行程序,開發(fā)成 HA 的 server 還是有必要的

基于以上 3 點(diǎn)最終選擇 jar 作為最終的方式

4. 使用

  1. 將 sql 寫入 xxx.sql 文件中,如

CREATE TEMPORARY FUNCTION MillisecondsToDateStr AS 'io.github.shengjk.udf.MillisecondsToDateStr' LANGUAGE JAVA;-- ExecutionCheckpointingOptionsset execution.checkpointing.mode=EXACTLY_ONCE;set execution.checkpointing.timeout=30 min;--  30minset execution.checkpointing.interval=1 min ; -- 1minset execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;-- ExecutionConfigOptionsset table.exec.state.ttl=1 day;  -- 1 dayset table.exec.mini-batch.enabled=true; -- enable mini-batch optimizationset table.exec.mini-batch.allow-latency=1 s; -- 1sset table.exec.mini-batch.size=1000;set table.exec.sink.not-null-enforcer=drop;-- -- dadadadadadaCREATE TABLE orders(
   status      int,
   courier_id  bigint,
   id          bigint,
   finish_time BIGINT)WITH (
   'connector' = 'kafka','topic' = 'canal_monitor_order',
   'properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'testGroup',
   'format' = 'ss-canal-json','ss-canal-json.table.include' = 'orders','scan.startup.mode' = 'earliest-offset');-- flink.partition-discovery.interval-millis;CREATE TABLE infos(
   info_index int,
   order_id   bigint)WITH (
   'connector' = 'kafka','topic' = 'canal_monitor_order',
   'properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'testGroup',
   'format' = 'ss-canal-json','ss-canal-json.table.include' = 'infos','scan.startup.mode' = 'earliest-offset');CREATE TABLE redisCache(
   finishOrders BIGINT,
   courier_id   BIGINT,
   dayStr       String)WITH (
   'connector' = 'redis',
   'hostPort'='localhost:6400',
   'keyType'='hash',
   'keyTemplate'='test2_${courier_id}',
   'fieldTemplate'='${dayStr}',
   'valueNames'='finishOrders',
   'expireTime'='259200');create view temp asselect o.courier_id,  (CASE   WHEN sum(infosMaxIndex.info_index) is null then 0   else sum(infosMaxIndex.info_index) end) finishOrders,  o.status,  dayStrfrom ((select courier_id, id, last_value(status)                             status, MillisecondsToDateStr(finish_time, 'yyyyMMdd') dayStr      from orders      where status = 60  group by courier_id, id, MillisecondsToDateStr(finish_time, 'yyyyMMdd'))) oleft join (select max(info_index) info_index, order_id                   from infos                   group by order_id) infosMaxIndex on o.id = infosMaxIndex.order_idgroup by o.courier_id, o.status, dayStr;INSERT INTO redisCache SELECT finishOrders,courier_id,dayStr FROM temp;
  1. 將 flinksql-platform 打包并上傳至服務(wù)器

  2. 將必要的 connector jar 放入到相應(yīng)的目錄下

  3. 執(zhí)行,如

flink-1.12.0/bin/flink  run -p 3 -yt ./flinkjar/  -C file:///home/shengjk/flinkjar/test-udf.jar -C file:///home/shengjk/flinkjar/jedis-2.10.2.jar  -m yarn-cluster -ynm sqlDemo  -c io.github.shengjk.Main ./flinksql-platform-1.0-SNAPSHOT.jar --sqlPath ./xxx.sql

其中
-C 添加 udfJar 等第三方 jar 包 -C 參數(shù)apply到了client端生成的JobGraph里,然后提交JobGraph來運(yùn)行的
-yt 目錄 將 udfJar 等第三方 jar 包提交到 TaskManager 上

到此,相信大家對(duì)“FlinkSQL怎么搭建”有了更深的了解,不妨來實(shí)際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

標(biāo)題名稱:FlinkSQL怎么搭建
分享網(wǎng)址:http://muchs.cn/article24/pphhje.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供建站公司、靜態(tài)網(wǎng)站、網(wǎng)站建設(shè)網(wǎng)站設(shè)計(jì)公司、自適應(yīng)網(wǎng)站、

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

外貿(mào)網(wǎng)站制作