如何理解微博基于Flink的實(shí)時(shí)計(jì)算平臺(tái)建設(shè)

這篇文章主要介紹“如何理解微博基于Flink的實(shí)時(shí)計(jì)算平臺(tái)建設(shè)”,在日常操作中,相信很多人在如何理解微博基于Flink的實(shí)時(shí)計(jì)算平臺(tái)建設(shè)問(wèn)題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”如何理解微博基于Flink的實(shí)時(shí)計(jì)算平臺(tái)建設(shè)”的疑惑有所幫助!接下來(lái),請(qǐng)跟著小編一起來(lái)學(xué)習(xí)吧!

大同網(wǎng)站制作公司哪家好,找創(chuàng)新互聯(lián)!從網(wǎng)頁(yè)設(shè)計(jì)、網(wǎng)站建設(shè)、微信開發(fā)、APP開發(fā)、自適應(yīng)網(wǎng)站建設(shè)等網(wǎng)站項(xiàng)目制作,到程序開發(fā),運(yùn)營(yíng)維護(hù)。創(chuàng)新互聯(lián)成立于2013年到現(xiàn)在10年的時(shí)間,我們擁有了豐富的建站經(jīng)驗(yàn)和運(yùn)維經(jīng)驗(yàn),來(lái)保證我們的工作的順利進(jìn)行。專注于網(wǎng)站建設(shè)就選創(chuàng)新互聯(lián)。

一.技術(shù)選型

相比于 Spark,目前 Spark 的生態(tài)總體更為完善一些,且在機(jī)器學(xué)習(xí)的集成和應(yīng)用性暫時(shí)領(lǐng)先。但作為下一代大數(shù)據(jù)引擎的有力競(jìng)爭(zhēng)者-Flink 在流式計(jì)算上有明顯優(yōu)勢(shì),F(xiàn)link 在流式計(jì)算里屬于真正意義上的單條處理,每一條數(shù)據(jù)都觸發(fā)計(jì)算,而不是像 Spark 一樣的 Mini Batch 作為流式處理的妥協(xié)。Flink 的容錯(cuò)機(jī)制較為輕量,對(duì)吞吐量影響較小,而且擁有圖和調(diào)度上的一些優(yōu)化,使得 Flink 可以達(dá)到很高的吞吐量。而 Strom 的容錯(cuò)機(jī)制需要對(duì)每條數(shù)據(jù)進(jìn)行 ack,因此其吞吐量瓶頸也是備受詬病。

這里引用一張圖來(lái)對(duì)常用的實(shí)時(shí)計(jì)算框架做個(gè)對(duì)比。

如何理解微博基于Flink的實(shí)時(shí)計(jì)算平臺(tái)建設(shè)cdn.com/3da0ac542030556f0def525ccf6ec7ee9eec5b1f.jpeg">

Flink 特點(diǎn)

Flink 是一個(gè)開源的分布式實(shí)時(shí)計(jì)算框架。Flink 是有狀態(tài)的和容錯(cuò)的,可以在維護(hù)一次應(yīng)用程序狀態(tài)的同時(shí)無(wú)縫地從故障中恢復(fù);它支持大規(guī)模計(jì)算能力,能夠在數(shù)千個(gè)節(jié)點(diǎn)上并發(fā)運(yùn)行;它具有很好的吞吐量和延遲特性。同時(shí),F(xiàn)link 提供了多種靈活的窗口函數(shù)。

1)狀態(tài)管理機(jī)制

Flink 檢查點(diǎn)機(jī)制能保持 exactly-once 語(yǔ)義的計(jì)算。狀態(tài)保持意味著應(yīng)用能夠保存已經(jīng)處理的數(shù)據(jù)集結(jié)果和狀態(tài)。

2)事件機(jī)制

Flink 支持流處理和窗口事件時(shí)間語(yǔ)義。事件時(shí)間可以很容易地通過(guò)事件到達(dá)的順序和事件可能的到達(dá)延遲流中計(jì)算出準(zhǔn)確的結(jié)果。

如何理解微博基于Flink的實(shí)時(shí)計(jì)算平臺(tái)建設(shè)

3)窗口機(jī)制

Flink 支持基于時(shí)間、數(shù)目以及會(huì)話的非常靈活的窗口機(jī)制(window)。可以定制 window 的觸發(fā)條件來(lái)支持更加復(fù)雜的流模式。

4)容錯(cuò)機(jī)制

Flink 高效的容錯(cuò)機(jī)制允許系統(tǒng)在高吞吐量的情況下支持 exactly-once 語(yǔ)義的計(jì)算。Flink 可以準(zhǔn)確、快速地做到從故障中以零數(shù)據(jù)丟失的效果進(jìn)行恢復(fù)。

如何理解微博基于Flink的實(shí)時(shí)計(jì)算平臺(tái)建設(shè)

5)高吞吐、低延遲

Flink 具有高吞吐量和低延遲(能快速處理大量數(shù)據(jù))特性。下圖展示了 Apache Flink 和 Apache Storm 完成分布式項(xiàng)目計(jì)數(shù)任務(wù)的性能對(duì)比。

二.架構(gòu)演變

初期架構(gòu)

初期架構(gòu)僅為計(jì)算與存儲(chǔ)兩層,新來(lái)的計(jì)算需求接入后需要新開發(fā)一個(gè)實(shí)時(shí)計(jì)算任務(wù)進(jìn)行上線。重復(fù)模塊的代碼復(fù)用率低,重復(fù)率高,計(jì)算任務(wù)間的區(qū)別主要是集中在任務(wù)的計(jì)算指標(biāo)口徑上。

在存儲(chǔ)層,各個(gè)需求方所需求的存儲(chǔ)路徑都不相同,計(jì)算指標(biāo)可能在不通的存儲(chǔ)引擎上有重復(fù),有計(jì)算資源以及存儲(chǔ)資源上的浪費(fèi)情況。并且對(duì)于指標(biāo)的計(jì)算口徑也是僅局限于單個(gè)任務(wù)需求里的,不通需求任務(wù)對(duì)于相同的指標(biāo)的計(jì)算口徑?jīng)]有進(jìn)行統(tǒng)一的限制于保障。各個(gè)業(yè)務(wù)方也是在不同的存儲(chǔ)引擎上開發(fā)數(shù)據(jù)獲取服務(wù),對(duì)于那些專注于數(shù)據(jù)應(yīng)用本身的團(tuán)隊(duì)來(lái)說(shuō),無(wú)疑當(dāng)前模式存在一些弊端。

如何理解微博基于Flink的實(shí)時(shí)計(jì)算平臺(tái)建設(shè)

后期架構(gòu)

隨著數(shù)據(jù)體量的增加以及業(yè)務(wù)線的擴(kuò)展,前期架構(gòu)模式的弊端逐步開始顯現(xiàn)。從當(dāng)初單需求單任務(wù)的模式逐步轉(zhuǎn)變?yōu)橥ㄓ玫臄?shù)據(jù)架構(gòu)模式。為此,我們開發(fā)了一些基于 Flink 框架的通用組件來(lái)支持?jǐn)?shù)據(jù)的快速接入,并保證代碼模式的統(tǒng)一性和維護(hù)性。在數(shù)據(jù)層,我們基于 Clickhouse 來(lái)作為我們數(shù)據(jù)倉(cāng)庫(kù)的計(jì)算和存儲(chǔ)引擎,利用其支持多維 OLAP 計(jì)算的特性,來(lái)處理在多維多指標(biāo)大數(shù)據(jù)量下的快速查詢需求。在數(shù)據(jù)分層上,我們參考與借鑒離線數(shù)倉(cāng)的經(jīng)驗(yàn)與方法,構(gòu)建多層實(shí)時(shí)數(shù)倉(cāng)服務(wù),并開發(fā)多種微服務(wù)來(lái)為數(shù)倉(cāng)的數(shù)據(jù)聚合,指標(biāo)提取,數(shù)據(jù)出口,數(shù)據(jù)質(zhì)量,報(bào)警監(jiān)控等提供支持。

整體架構(gòu)分為五層:

1)接入層:接入原始數(shù)據(jù)進(jìn)行處理,如 Kafka、RabbitMQ、File 等。

2)計(jì)算層:選用 Flink 作為實(shí)時(shí)計(jì)算框架,對(duì)實(shí)時(shí)數(shù)據(jù)進(jìn)行清洗,關(guān)聯(lián)等操作。

3)存儲(chǔ)層:對(duì)清洗完成的數(shù)據(jù)進(jìn)行數(shù)據(jù)存儲(chǔ),我們對(duì)此進(jìn)行了實(shí)時(shí)數(shù)倉(cāng)的模型分層與構(gòu)建,將不同應(yīng)用場(chǎng)景的數(shù)據(jù)分別存儲(chǔ)在如 Clickhouse,Hbase,redis,MySQL 等存儲(chǔ)。服務(wù)中,并抽象公共數(shù)據(jù)層與維度層數(shù)據(jù),分層處理壓縮數(shù)據(jù)并統(tǒng)一數(shù)據(jù)口徑。

4)服務(wù)層:對(duì)外提供統(tǒng)一的數(shù)據(jù)查詢服務(wù),支持從底層明細(xì)數(shù)據(jù)到聚合層數(shù)據(jù) 5min/10min/1hour 的多維計(jì)算服務(wù)。同時(shí)最上層特征指標(biāo)類數(shù)據(jù),如計(jì)算層輸入到Redis、Mysql 等也從此數(shù)據(jù)接口進(jìn)行獲取。

5)應(yīng)用層:以統(tǒng)一查詢服務(wù)為支撐對(duì)各個(gè)業(yè)務(wù)線數(shù)據(jù)場(chǎng)景進(jìn)行支撐。

  • 監(jiān)控報(bào)警:對(duì) Flink 任務(wù)的存活狀態(tài)進(jìn)行監(jiān)控,對(duì)異常的任務(wù)進(jìn)行郵件報(bào)警并根據(jù)設(shè)定的參數(shù)對(duì)任務(wù)進(jìn)行自動(dòng)拉起與恢復(fù)。根據(jù)如 Kafka 消費(fèi)的 offset 指標(biāo)對(duì)消費(fèi)處理延遲的實(shí)時(shí)任務(wù)進(jìn)行報(bào)警提醒。

  • 數(shù)據(jù)質(zhì)量:監(jiān)控實(shí)時(shí)數(shù)據(jù)指標(biāo),對(duì)歷史的實(shí)時(shí)數(shù)據(jù)與離線 hive 計(jì)算的數(shù)據(jù)定時(shí)做對(duì)比,提供實(shí)時(shí)數(shù)據(jù)的數(shù)據(jù)質(zhì)量指標(biāo),對(duì)超過(guò)閾值的指標(biāo)數(shù)據(jù)進(jìn)行報(bào)警。

三.數(shù)據(jù)處理流程

1.整體流程

整體數(shù)據(jù)從原始數(shù)據(jù)接入后經(jīng)過(guò) ETL 處理, 進(jìn)入實(shí)時(shí)數(shù)倉(cāng)底層數(shù)據(jù)表,經(jīng)過(guò)配置化聚合微服務(wù)組件向上進(jìn)行分層數(shù)據(jù)的聚合。根據(jù)不同業(yè)務(wù)的指標(biāo)需求也可通過(guò)特征抽取微服務(wù)直接配置化從數(shù)倉(cāng)中抽取到如 Redis、ES、Mysql 中進(jìn)行獲取。大部分的數(shù)據(jù)需求可通過(guò)統(tǒng)一數(shù)據(jù)服務(wù)接口進(jìn)行獲取。

如何理解微博基于Flink的實(shí)時(shí)計(jì)算平臺(tái)建設(shè)

2.問(wèn)題與挑戰(zhàn)

原始日志數(shù)據(jù)因?yàn)楦鳂I(yè)務(wù)日志的不同,所擁有的維度或指標(biāo)數(shù)據(jù)并不完整。所以需要進(jìn)行實(shí)時(shí)的日志的關(guān)聯(lián)才能獲取不同維度條件下的指標(biāo)數(shù)據(jù)查詢結(jié)果。并且關(guān)聯(lián)日志的回傳周期不同,有在 10min 之內(nèi)完成 95% 以上回傳的業(yè)務(wù)日志,也有類似于激活日志等依賴第三方回傳的有任務(wù)日志,延遲窗口可能大于1天。

并且最大日志關(guān)聯(lián)任務(wù)的日均數(shù)據(jù)量在 10 億級(jí)別以上,如何快速處理與構(gòu)建實(shí)時(shí)關(guān)聯(lián)任務(wù)的問(wèn)題首先擺在我們面前。對(duì)此我們基于 Flink 框架開發(fā)了配置化關(guān)聯(lián)組件。對(duì)于不同關(guān)聯(lián)日志的指標(biāo)抽取,我們也開發(fā)了配置化指標(biāo)抽取組件用于快速提取復(fù)雜的日志格式。以上兩個(gè)自研組件會(huì)在后面的內(nèi)容里再做詳細(xì)介紹。

1)回傳周期超過(guò)關(guān)聯(lián)窗口的日志如何處理?

對(duì)于回傳晚的日志,我們?cè)陉P(guān)聯(lián)窗口內(nèi)未取得關(guān)聯(lián)結(jié)果。我們采用實(shí)時(shí)+離線的方式進(jìn)行數(shù)據(jù)回刷補(bǔ)全。實(shí)時(shí)處理的日志我們會(huì)將未關(guān)聯(lián)的原始日志輸出到另外一個(gè)暫存地(Kafka),同時(shí)不斷消費(fèi)處理這個(gè)未關(guān)聯(lián)的日志集合,設(shè)定超時(shí)重關(guān)聯(lián)次數(shù)與超時(shí)重關(guān)聯(lián)時(shí)間,超過(guò)所設(shè)定任意閾值后,便再進(jìn)行重關(guān)聯(lián)。離線部分,我們采用 Hive 計(jì)算昨日全天日志與 N 天內(nèi)的全量被關(guān)聯(lián)日志表進(jìn)行關(guān)聯(lián),將最終的結(jié)果回寫進(jìn)去,替換實(shí)時(shí)所計(jì)算的昨日關(guān)聯(lián)數(shù)據(jù)。

2)如何提高 Flink 任務(wù)性能?

① Operator Chain

為了更高效地分布式執(zhí)行,F(xiàn)link 會(huì)盡可能地將 operator 的 subtask 鏈接(chain)在一起形成 task。每個(gè) task 在一個(gè)線程中執(zhí)行。將 operators 鏈接成 task 是非常有效的優(yōu)化:它能減少線程之間的切換,減少消息的序列化/反序列化,減少數(shù)據(jù)在緩沖區(qū)的交換,減少了延遲的同時(shí)提高整體的吞吐量。

Flink 會(huì)在生成 JobGraph 階段,將代碼中可以優(yōu)化的算子優(yōu)化成一個(gè)算子鏈(Operator Chains)以放到一個(gè) task(一個(gè)線程)中執(zhí)行,以減少線程之間的切換和緩沖的開銷,提高整體的吞吐量和延遲。下面以官網(wǎng)中的例子進(jìn)行說(shuō)明。

如何理解微博基于Flink的實(shí)時(shí)計(jì)算平臺(tái)建設(shè)

圖中棕色的長(zhǎng)條表示等待時(shí)間,可以發(fā)現(xiàn)網(wǎng)絡(luò)等待時(shí)間極大地阻礙了吞吐和延遲。為了解決同步訪問(wèn)的問(wèn)題,異步模式可以并發(fā)地處理多個(gè)請(qǐng)求和回復(fù)。也就是說(shuō),你可以連續(xù)地向數(shù)據(jù)庫(kù)發(fā)送用戶 a、b、c 等的請(qǐng)求,與此同時(shí),哪個(gè)請(qǐng)求的回復(fù)先返回了就處理哪個(gè)回復(fù),從而連續(xù)的請(qǐng)求之間不需要阻塞等待,如上圖右邊所示。這也正是 Async I/O 的實(shí)現(xiàn)原理。

③ Checkpoint 優(yōu)化

Flink 實(shí)現(xiàn)了一套強(qiáng)大的 checkpoint 機(jī)制,使它在獲取高吞吐量性能的同時(shí),也能保證 Exactly Once 級(jí)別的快速恢復(fù)。

首先提升各節(jié)點(diǎn) checkpoint 的性能考慮的就是存儲(chǔ)引擎的執(zhí)行效率。Flink 官方支持的三種 checkpoint state 存儲(chǔ)方案中,Memory 僅用于調(diào)試級(jí)別,無(wú)法做故障后的數(shù)據(jù)恢復(fù)。其次還有 Hdfs 與 Rocksdb,當(dāng)所做 Checkpoint 的數(shù)據(jù)大小較大時(shí),可以考慮采用 Rocksdb 來(lái)作為 checkpoint 的存儲(chǔ)以提升效率。

其次的思路是資源設(shè)置,我們都知道 checkpoint 機(jī)制是在每個(gè) task 上都會(huì)進(jìn)行,那么當(dāng)總的狀態(tài)數(shù)據(jù)大小不變的情況下,如何分配減少單個(gè) task 所分的 checkpoint 數(shù)據(jù)變成了提升 checkpoint 執(zhí)行效率的關(guān)鍵。

最后,增量快照. 非增量快照下,每次 checkpoint 都包含了作業(yè)所有狀態(tài)數(shù)據(jù)。而大部分場(chǎng)景下,前后 checkpoint 里,數(shù)據(jù)發(fā)生變更的部分相對(duì)很少,所以設(shè)置增量 checkpoint,僅會(huì)對(duì)上次 checkpoint 和本次 checkpoint 之間狀態(tài)的差異進(jìn)行存儲(chǔ)計(jì)算,減少了 checkpoint 的耗時(shí)。

3)如何保障任務(wù)的穩(wěn)定性?

在任務(wù)執(zhí)行過(guò)程中,會(huì)遇到各種各樣的問(wèn)題,導(dǎo)致任務(wù)異常甚至失敗。所以如何做好異常情況下的恢復(fù)工作顯得異常重要。

① 設(shè)定重啟策略

Flink 支持不同的重啟策略,以在故障發(fā)生時(shí)控制作業(yè)如何重啟。集群在啟動(dòng)時(shí)會(huì)伴隨一個(gè)默認(rèn)的重啟策略,在沒(méi)有定義具體重啟策略時(shí)會(huì)使用該默認(rèn)策略。如果在工作提交時(shí)指定了一個(gè)重啟策略,該策略會(huì)覆蓋集群的默認(rèn)策略。

默認(rèn)的重啟策略可以通過(guò) Flink 的配置文件 flink-conf.yaml 指定。配置參數(shù) restart-strategy 定義了哪個(gè)策略被使用。

常用的重啟策略:

  • 固定間隔(Fixed delay);

  • 失敗率(Failure rate);

  • 無(wú)重啟(No restart)。

② 設(shè)置 HA

Flink 在任務(wù)啟動(dòng)時(shí)指定 HA 配置主要是為了利用 Zookeeper 在所有運(yùn)行的 JobManager 實(shí)例之間進(jìn)行分布式協(xié)調(diào) .Zookeeper 通過(guò) leader 選取和輕量級(jí)一致性的狀態(tài)存儲(chǔ)來(lái)提供高可用的分布式協(xié)調(diào)服務(wù)。

③ 任務(wù)監(jiān)控報(bào)警平臺(tái)

在實(shí)際環(huán)境中,我們遇見過(guò)因?yàn)榧籂顟B(tài)不穩(wěn)定而導(dǎo)致的任務(wù)失敗。在 Flink 1.6 版本中,甚至遇見過(guò)任務(wù)出現(xiàn)假死的情況,也就是 Yarn 上的 job 資源依然存在,而 Flink 任務(wù)實(shí)際已經(jīng)死亡。為了監(jiān)測(cè)與恢復(fù)這些異常的任務(wù),并且對(duì)實(shí)時(shí)任務(wù)做統(tǒng)一的提交、報(bào)警監(jiān)控、任務(wù)恢復(fù)等管理,我們開發(fā)了任務(wù)提交與管理平臺(tái)。通過(guò) Shell 拉取 Yarn 上 Running 狀態(tài)與 Flink Job 狀態(tài)的列表進(jìn)行對(duì)比,心跳監(jiān)測(cè)平臺(tái)上的所有任務(wù),并進(jìn)行告警、自動(dòng)恢復(fù)等操作。

④ 作業(yè)指標(biāo)監(jiān)控

Flink 任務(wù)在運(yùn)行過(guò)程中,各 Operator 都會(huì)產(chǎn)生各自的指標(biāo)數(shù)據(jù),例如,Source 會(huì)產(chǎn)出 numRecordIn、numRecordsOut 等各項(xiàng)指標(biāo)信息,我們會(huì)將這些指標(biāo)信息進(jìn)行收集,并展示在我們的可視化平臺(tái)上。指標(biāo)平臺(tái)如下圖:

如何理解微博基于Flink的實(shí)時(shí)計(jì)算平臺(tái)建設(shè)

⑤ 任務(wù)運(yùn)行節(jié)點(diǎn)監(jiān)控

我們的 Flink 任務(wù)都是運(yùn)行在 Yarn 上,針對(duì)每一個(gè)運(yùn)行的作業(yè),我們需要監(jiān)控其運(yùn)行環(huán)境。會(huì)收集 JobManager 及 TaskManager 的各項(xiàng)指標(biāo)。收集的指標(biāo)有 jobmanager-fullgc-count、jobmanager-younggc-count、jobmanager-fullgc-time、jobmanager-younggc-time、taskmanager-fullgc-count、taskmanager-younggc-count、taskmanager-fullgc-time、taskmanager-younggc-time 等,用于判斷任務(wù)運(yùn)行環(huán)境的健康度,及用于排查可能出現(xiàn)的問(wèn)題。監(jiān)控界面如下:

四.數(shù)據(jù)關(guān)聯(lián)組件

1.如何選擇關(guān)聯(lián)方式?

1)Flink Table

從 Flink 的官方文檔,我們知道 Flink 的編程模型分為四層,sql 是最高層的 api, Table api 是中間層,DataSteam/DataSet Api 是核心,stateful Streaming process 層是底層實(shí)現(xiàn)。

如何理解微博基于Flink的實(shí)時(shí)計(jì)算平臺(tái)建設(shè)

剛開始我們直接使用 Flink Table 做為數(shù)據(jù)關(guān)聯(lián)的方式,直接將接入進(jìn)來(lái)的 DataStream 注冊(cè)為 Dynamic Table 后進(jìn)行兩表關(guān)聯(lián)查詢,如下圖:

但嘗試后發(fā)現(xiàn)在做那些日志數(shù)據(jù)量大的關(guān)聯(lián)查詢時(shí)往往只能在較小的時(shí)間窗口內(nèi)做查詢,否則會(huì)超過(guò) datanode 節(jié)點(diǎn)單臺(tái)內(nèi)存限制,產(chǎn)生異常。但為了滿足不同業(yè)務(wù)日志延遲到達(dá)的情況,這種實(shí)現(xiàn)方式并不通用。

2)Rocksdb

之后,我們直接在 DataStream 上進(jìn)行處理,在 CountWindow 窗口內(nèi)進(jìn)行關(guān)聯(lián)操作,將被關(guān)聯(lián)的數(shù)據(jù) Hash 打散后存儲(chǔ)在各個(gè) datanode 節(jié)點(diǎn)的 Rocksdb 中,利用 Flink State 原生支持 Rocksdb 做 Checkpoint 這一特性進(jìn)行算子內(nèi)數(shù)據(jù)的備份與恢復(fù)。這種方式是可行的,但受制于 Rocksdb 集群物理磁盤為非 SSD 的因素,這種方式在我們的實(shí)際線上場(chǎng)景中關(guān)聯(lián)耗時(shí)較高。

3)外部存儲(chǔ)關(guān)聯(lián)

如 Redis 類的 KV 存儲(chǔ)的確在查詢速度上提升不少,但類似廣告日志數(shù)據(jù)這樣單條日志大小較大的情況,會(huì)占用不少寶貴的機(jī)器內(nèi)存資源。經(jīng)過(guò)調(diào)研后,我們選取了 Hbase 作為我們?nèi)罩娟P(guān)聯(lián)組件的關(guān)聯(lián)數(shù)據(jù)存儲(chǔ)方案。

為了快速構(gòu)建關(guān)聯(lián)任務(wù),我們開發(fā)了基于 Flink 的配置化組件平臺(tái),提交配置文件即可生成數(shù)據(jù)關(guān)聯(lián)任務(wù)并自動(dòng)提交到集群。下圖是任務(wù)執(zhí)行的處理流程。

示意圖如下:

如何理解微博基于Flink的實(shí)時(shí)計(jì)算平臺(tái)建設(shè)

下圖是關(guān)聯(lián)組件內(nèi)的執(zhí)行流程圖:

2.問(wèn)題與優(yōu)化

1)加入 Interval Join

隨著日志量的增加,某些需要進(jìn)行關(guān)聯(lián)的日志數(shù)量可能達(dá)到日均十幾億甚至幾十億的量級(jí)。前期關(guān)聯(lián)組件的配置化生成任務(wù)的方式的確解決了大部分線上業(yè)務(wù)需求,但隨著進(jìn)一步的關(guān)聯(lián)需求增加,Hbase 面臨著巨大的查詢壓力。在我們對(duì) Hbase 表包括 rowkey 等一系列完成優(yōu)化之后,我們開始了對(duì)關(guān)聯(lián)組件的迭代與優(yōu)化。

第一步,減少 Hbase 的查詢。我們使用 Flink Interval Join 的方式,先將大部分關(guān)聯(lián)需求在程序內(nèi)部完成,只有少部分仍需查詢的日志會(huì)去查詢外部存儲(chǔ)(Hbase). 經(jīng)驗(yàn)證,以請(qǐng)求日志與實(shí)驗(yàn)日志關(guān)聯(lián)為例,對(duì)于設(shè)置 Interval Join 窗口在 10s 左右即可減少 80% 的 hbase 查詢請(qǐng)求。

① Interval Join 的語(yǔ)義示意圖

如何理解微博基于Flink的實(shí)時(shí)計(jì)算平臺(tái)建設(shè)

  • 數(shù)據(jù) JOIN 的區(qū)間 - 比如時(shí)間為 3 的 EXP 會(huì)在 IMP 時(shí)間為[2, 4]區(qū)間進(jìn)行JOIN;

  • WaterMark - 比如圖示 EXP 一條數(shù)據(jù)時(shí)間是 3,IMP 一條數(shù)據(jù)時(shí)間是 5,那么WaterMark是根據(jù)實(shí)際最小值減去 UpperBound 生成,即:Min(3,5)-1 = 2;

  • 過(guò)期數(shù)據(jù) - 出于性能和存儲(chǔ)的考慮,要將過(guò)期數(shù)據(jù)清除,如圖當(dāng) WaterMark 是 2 的時(shí)候時(shí)間為 2 以前的數(shù)據(jù)過(guò)期了,可以被清除。

② Interval Join 內(nèi)部實(shí)現(xiàn)邏輯

③ Interval Join 改造

因 Flink 原生的 Intervak Join 實(shí)現(xiàn)的是 Inner Join,而我們業(yè)務(wù)中所需要的是 Left Join,具體改造如下:

  • 取消右側(cè)數(shù)據(jù)流的 join 標(biāo)志位;

  • 左側(cè)數(shù)據(jù)流有 join 數(shù)據(jù)時(shí)不存 state。

2)關(guān)聯(lián)率動(dòng)態(tài)監(jiān)控

在任務(wù)執(zhí)行中,往往會(huì)出現(xiàn)意想不到的情況,比如被關(guān)聯(lián)的數(shù)據(jù)日志出現(xiàn)缺失,或者日志格式錯(cuò)誤引發(fā)的異常,造成關(guān)聯(lián)任務(wù)的關(guān)聯(lián)率下降嚴(yán)重。那么此時(shí)關(guān)聯(lián)任務(wù)雖然繼續(xù)在運(yùn)行,但對(duì)于整體數(shù)據(jù)質(zhì)量的意義不大,甚至是反向作用。在任務(wù)進(jìn)行恢復(fù)的時(shí),還需要清除異常區(qū)間內(nèi)的數(shù)據(jù),將 Kafka Offset 設(shè)置到異常前的位置再進(jìn)行處理。

故我們?cè)陉P(guān)聯(lián)組件的優(yōu)化中,加入了動(dòng)態(tài)監(jiān)控,下面示意圖:

如何理解微博基于Flink的實(shí)時(shí)計(jì)算平臺(tái)建設(shè)

  • 關(guān)聯(lián)任務(wù)中定時(shí)探測(cè)指定時(shí)間范圍 Hbase 是否有最新數(shù)據(jù)寫入,如果沒(méi)有,說(shuō)明寫 Hbase 任務(wù)出現(xiàn)問(wèn)題,則終止關(guān)聯(lián)任務(wù);

  • 當(dāng)寫 Hbase 任務(wù)出現(xiàn)堆積時(shí),相應(yīng)的會(huì)導(dǎo)致關(guān)聯(lián)率下降,當(dāng)關(guān)聯(lián)率低于指定閾值時(shí)終止關(guān)聯(lián)任務(wù);

  • 當(dāng)關(guān)聯(lián)任務(wù)終止時(shí)會(huì)發(fā)出告警,修復(fù)上游任務(wù)后可重新恢復(fù)關(guān)聯(lián)任務(wù),保證關(guān)聯(lián)數(shù)據(jù)不丟失。

五.數(shù)據(jù)清洗組件

為了快速進(jìn)行日志數(shù)據(jù)的指標(biāo)抽取,我們開發(fā)了基于 Flink 計(jì)算平臺(tái)的指標(biāo)抽取組件Logwash。封裝了基于 Freemaker 的模板引擎做為日志格式的解析模塊,對(duì)日志進(jìn)行提取,算術(shù)運(yùn)算,條件判斷,替換,循環(huán)遍歷等操作。

下圖是 Logwash 組件的處理流程:

組件支持文本與 Json 兩種類型日志進(jìn)行解析提取,目前該清洗組件已支持微博廣告近百個(gè)實(shí)時(shí)清洗需求,提供給運(yùn)維組等第三方非實(shí)時(shí)計(jì)算方向人員快速進(jìn)行提取日志的能力。

配置文件部分示例:

如何理解微博基于Flink的實(shí)時(shí)計(jì)算平臺(tái)建設(shè)

六.FlinkStream 組件庫(kù)

Flink 中 DataStream 的開發(fā),對(duì)于通用的邏輯及相同的代碼進(jìn)行了抽取,生成了我們的通用組件庫(kù) FlinkStream。FlinkStream 包括了對(duì) Topology 的抽象及默認(rèn)實(shí)現(xiàn)、對(duì) Stream 的抽象及默認(rèn)實(shí)現(xiàn)、對(duì) Source 的抽象和某些實(shí)現(xiàn)、對(duì) Operator 的抽象及某些實(shí)現(xiàn)、Sink 的抽象及某些實(shí)現(xiàn)。任務(wù)提交統(tǒng)一使用可執(zhí)行 Jar 和配置文件,Jar 會(huì)讀取配置文件構(gòu)建對(duì)應(yīng)的拓?fù)鋱D。

1.Source 抽象

對(duì)于 Source 進(jìn)行抽象,創(chuàng)建抽象類及對(duì)應(yīng)接口,對(duì)于 Flink Connector 中已有的實(shí)現(xiàn),例如 kafka,Elasticsearch 等,直接創(chuàng)建新 class 并繼承接口,實(shí)現(xiàn)對(duì)應(yīng)的方法即可。對(duì)于需要自己去實(shí)現(xiàn)的 connector,直接繼承抽象類及對(duì)應(yīng)接口,實(shí)現(xiàn)方法即可。目前只實(shí)現(xiàn)了 KafkaSource。

2.Operator 抽象

與 Source 抽象類似,我們實(shí)現(xiàn)了基于 Stream 到 Stream 級(jí)別的 Operator 抽象。創(chuàng)建抽象 Operate 類,抽象 Transform 方法。對(duì)于要實(shí)現(xiàn)的 Transform 操作,直接繼承抽象類,實(shí)現(xiàn)其抽象方法即可。目前實(shí)現(xiàn)的 Operator,直接按照文檔使用。如下:

25

3.Sink 抽象

針對(duì) Sink,我們同樣創(chuàng)建了抽象類及接口。對(duì) Flink Connector 中已有的 Sink 進(jìn)行封裝。目前可通過(guò)配置進(jìn)行數(shù)據(jù)輸出的 Sink。目前以實(shí)現(xiàn)和封裝的 Sink 組件有:Kafka、Stdout、Elasticsearch、Clickhouse、Hbase、Redis、MySQL。

4.Stream 抽象

創(chuàng)建 Stream 抽象類及抽象方法 buildStream,用于構(gòu)建 StreamGraph。我們實(shí)現(xiàn)了默認(rèn)的 Stream,buildStream 方法讀取 Source 配置生成 DataStream,通過(guò) Operator 配置列表按順序生成拓?fù)鋱D,通過(guò) Sink 配置生成數(shù)據(jù)寫出組件。

5.Topology 抽象

對(duì)于單 Stream,要處理的邏輯可能比較簡(jiǎn)單,主要讀取一個(gè) Source 進(jìn)行數(shù)據(jù)的各種操作并輸出。對(duì)于復(fù)雜的多 Stream 業(yè)務(wù)需求,比如多流 Join,多流 Union、Split 流等,因此我們多流業(yè)務(wù)進(jìn)行了抽象,產(chǎn)生了 Topology。在 Topology 這一層可以對(duì)多流進(jìn)行配置化操作。對(duì)于通用的操作,我們實(shí)現(xiàn)了默認(rèn) Topology,直接通過(guò)配置文件就可以實(shí)現(xiàn)業(yè)務(wù)需求。對(duì)于比較復(fù)雜的業(yè)務(wù)場(chǎng)景,用戶可以自己實(shí)現(xiàn) Topology。

6.配置化

我們對(duì)抽象的組件都是可配置化的,直接通過(guò)編寫配置文件,構(gòu)造任務(wù)的運(yùn)行拓?fù)浣Y(jié)構(gòu),啟動(dòng)任務(wù)時(shí)指定配置文件。

  • 正文文本框 Flink Environment 配置化,包括時(shí)間處理類型、重啟策略,checkpoint 等;

  • Topology 配置化,可配置不同 Stream 之間的處理邏輯與 Sink;

  • Stream 配置化,可配置 Source,Operator 列表,Sink。

配置示例如下:

run_env:

  timeCharacteristic: "ProcessingTime" #ProcessingTime\IngestionTime\EventTime
  restart: # 重啟策略配置
    type: # noRestart, fixedDelayRestart, fallBackRestart, failureRateRestart
  checkpoint: # 開啟checkpoint
    type: "rocksdb" # 


streams:
  impStream:  #粉絲經(jīng)濟(jì)曝光日志
    type: "DefaultStream"
    config:
      source:
        type: "Kafka011" # 源是kafka011版本
        config:
        parallelism: 20
      operates:
        -
          type: "StringToMap"
          config:
        -
          type: "SplitElement"
          config:
        ...
        -
          type: "SelectElement"
          config:


transforms:
  -
    type: "KeyBy"
    config:
  -
    type: "CountWindowWithTimeOut"  #Window需要和KeyBy組合使用
    config:
  -
    type: "SplitStream"
    config:
  -
    type: "SelectStream"
    config:
sink:
  -
    type: Kafka
    config:
  -
    type: Kafka
    config:

7.部署

在實(shí)時(shí)任務(wù)管理平臺(tái),新建任務(wù),填寫任務(wù)名稱,選擇任務(wù)類型(Flink)及版本,上傳可執(zhí)行 Jar 文件,導(dǎo)入配置或者手動(dòng)編寫配置,填寫 JobManager 及 TaskManager 內(nèi)存配置,填寫并行度配置,選擇是否重試,選擇是否從 checkpoint 恢復(fù)等選項(xiàng),保存后即可在任務(wù)列表中啟動(dòng)任務(wù),并觀察啟動(dòng)日志用于排查啟動(dòng)錯(cuò)誤。

如何理解微博基于Flink的實(shí)時(shí)計(jì)算平臺(tái)建設(shè)

七.FlinkSQL 擴(kuò)展

SQL 語(yǔ)言是一門聲明式的,簡(jiǎn)單的,靈活的語(yǔ)言,F(xiàn)link 本身提供了對(duì) SQL 的支持。Flink 1.6 版本和 1.8 版本對(duì) SQL 語(yǔ)言的支持有限,不支持建表語(yǔ)句,不支持對(duì)外部數(shù)據(jù)的關(guān)聯(lián)操作。因此我們通過(guò) Apache Calcite 對(duì) Flink SQL API 進(jìn)行了擴(kuò)展,用戶只需要關(guān)心業(yè)務(wù)需求怎么用 SQL 語(yǔ)言來(lái)表達(dá)即可。

1.支持創(chuàng)建源表

擴(kuò)展了支持創(chuàng)建源表 SQL,通過(guò)解析 SQL 語(yǔ)句,獲取數(shù)據(jù)源配置信息,創(chuàng)建對(duì)應(yīng)的 TableSource 實(shí)例,并將其注冊(cè)到 Flink environment。示例如下:

2.支持創(chuàng)建維表

使用 Apache Calcite 對(duì) SQL 進(jìn)行解析,通過(guò)維表關(guān)鍵字識(shí)別維表,使用 RichAsyncFunction 算子異步讀取維表數(shù)據(jù),并通過(guò) flatMap 操作生成關(guān)聯(lián)后的 DataStream,然后轉(zhuǎn)換為 Table 注冊(cè)到 Flink Environment。示例如下:

如何理解微博基于Flink的實(shí)時(shí)計(jì)算平臺(tái)建設(shè)

3.支持創(chuàng)建視圖

使用 SQLQuery 方法,支持從上一層表或者視圖中創(chuàng)建視圖表,并將新的視圖表注冊(cè)到 Flink Environment。創(chuàng)建語(yǔ)句需要按照順序?qū)?,比?myView2 是從視圖 myView1 中創(chuàng)建的,則 myView1 創(chuàng)建語(yǔ)句要在myView2語(yǔ)句前面。如下:

4.支持創(chuàng)建結(jié)果表

支持創(chuàng)建結(jié)果表,通過(guò)解析 SQL 語(yǔ)句,獲取配置信息,創(chuàng)建對(duì)應(yīng)的 AppendStreamTableSink 或者 UpsertStreamTableSink 實(shí)例,并將其注冊(cè)到 Flink Environment。示例如下:

如何理解微博基于Flink的實(shí)時(shí)計(jì)算平臺(tái)建設(shè)

5.支持自定義UDF

支持自定義 UDF 函數(shù),繼承 ScalarFunction 或者 TableFunction。在 resources 目錄下有相應(yīng)的 UDF 資源配置文件,默認(rèn)會(huì)注冊(cè)全部可執(zhí)行 Jar 包中配置的 UDF。直接按照使用方法使用即可。

6.部署

部署方式同 Flink Stream 組件。

八.實(shí)時(shí)數(shù)據(jù)倉(cāng)庫(kù)的構(gòu)建

為了保證實(shí)時(shí)數(shù)據(jù)的統(tǒng)一對(duì)外出口以及保證數(shù)據(jù)指標(biāo)的統(tǒng)一口徑,我們根據(jù)業(yè)界離線數(shù)倉(cāng)的經(jīng)驗(yàn)來(lái)設(shè)計(jì)與構(gòu)架微博廣告實(shí)時(shí)數(shù)倉(cāng)。

1.分層概覽

數(shù)據(jù)倉(cāng)庫(kù)分為三層,自下而上為:數(shù)據(jù)引入層(ODS,Operation Data Store)、數(shù)據(jù)公共層(CDM,Common Data Model)和數(shù)據(jù)應(yīng)用層(ADS,Application Data Service)。

  • 數(shù)據(jù)引入層(ODS,Operation Data Store):將原始數(shù)據(jù)幾乎無(wú)處理的存放在數(shù)據(jù)倉(cāng)庫(kù)系統(tǒng),結(jié)構(gòu)上與源系統(tǒng)基本保持一致,是數(shù)據(jù)倉(cāng)庫(kù)的數(shù)據(jù)準(zhǔn)。

  • 數(shù)據(jù)公共層(CDM,Common Data Model,又稱通用數(shù)據(jù)模型層):包含 DIM 維度表、DWD 和 DWS,由 ODS 層數(shù)據(jù)加工而成。主要完成數(shù)據(jù)加工與整合,建立一致性的維度,構(gòu)建可復(fù)用的面向分析和統(tǒng)計(jì)的明細(xì)事實(shí)表,以及匯總公共粒度的指標(biāo)。

公共維度層(DIM):基于維度建模理念思想,建立整個(gè)企業(yè)的一致性維度。降低數(shù)據(jù)計(jì)算口徑和算法不統(tǒng)一風(fēng)險(xiǎn)。

公共維度層的表通常也被稱為邏輯維度表,維度和維度邏輯表通常一一對(duì)應(yīng)。

公共匯總粒度事實(shí)層(DWS,Data Warehouse Service):以分析的主題對(duì)象作為建模驅(qū)動(dòng),基于上層的應(yīng)用和產(chǎn)品的指標(biāo)需求,構(gòu)建公共粒度的匯總指標(biāo)事實(shí)表,以寬表化手段物理化模型。構(gòu)建命名規(guī)范、口徑一致的統(tǒng)計(jì)指標(biāo),為上層提供公共指標(biāo),建立匯總寬表、明細(xì)事實(shí)表。

公共匯總粒度事實(shí)層的表通常也被稱為匯總邏輯表,用于存放派生指標(biāo)數(shù)據(jù)。

明細(xì)粒度事實(shí)層(DWD,Data Warehouse Detail):以業(yè)務(wù)過(guò)程作為建模驅(qū)動(dòng),基于每個(gè)具體的業(yè)務(wù)過(guò)程特點(diǎn),構(gòu)建最細(xì)粒度的明細(xì)層事實(shí)表??梢越Y(jié)合企業(yè)的數(shù)據(jù)使用特點(diǎn),將明細(xì)事實(shí)表的某些重要維度屬性字段做適當(dāng)冗余,也即寬表化處理。

明細(xì)粒度事實(shí)層的表通常也被稱為邏輯事實(shí)表。

  • 數(shù)據(jù)應(yīng)用層(ADS,Application Data Service):存放數(shù)據(jù)產(chǎn)品個(gè)性化的統(tǒng)計(jì)指標(biāo)數(shù)據(jù)。根據(jù) CDM 與 ODS 層加工生成。

2.詳細(xì)分層模型

如何理解微博基于Flink的實(shí)時(shí)計(jì)算平臺(tái)建設(shè)

對(duì)于原始日志數(shù)據(jù),ODS 層幾乎是每條日志抽取字段后進(jìn)行保留,這樣便能對(duì)問(wèn)題的回溯與追蹤。在 CDM 層對(duì) ODS 的數(shù)據(jù)僅做時(shí)間粒度上的數(shù)據(jù)壓縮,也就是在指定時(shí)間切分窗口里,對(duì)所有維度下的指標(biāo)做聚合操作,而不涉及業(yè)務(wù)性的操作。在 ADS 層,我們會(huì)有配置化抽取微服務(wù),對(duì)底層數(shù)據(jù)做定制化計(jì)算和提取,輸出到用戶指定的存儲(chǔ)服務(wù)里。

到此,關(guān)于“如何理解微博基于Flink的實(shí)時(shí)計(jì)算平臺(tái)建設(shè)”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)?lái)更多實(shí)用的文章!

新聞名稱:如何理解微博基于Flink的實(shí)時(shí)計(jì)算平臺(tái)建設(shè)
路徑分享:http://muchs.cn/article24/jchije.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供、手機(jī)網(wǎng)站建設(shè)小程序開發(fā)、域名注冊(cè)、關(guān)鍵詞優(yōu)化網(wǎng)站維護(hù)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

成都網(wǎng)站建設(shè)