apachespark中怎么實現(xiàn)端對端的exactlyonce

這篇文章主要介紹了apache spark中怎么實現(xiàn)端對端的 exactly once,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。

創(chuàng)新互聯(lián)建站專注于婁煩網(wǎng)站建設(shè)服務(wù)及定制,我們擁有豐富的企業(yè)做網(wǎng)站經(jīng)驗。 熱誠為您提供婁煩營銷型網(wǎng)站建設(shè),婁煩網(wǎng)站制作、婁煩網(wǎng)頁設(shè)計、婁煩網(wǎng)站官網(wǎng)定制、小程序定制開發(fā)服務(wù),打造婁煩網(wǎng)絡(luò)公司原創(chuàng)品牌,更為您提供婁煩網(wǎng)站排名全網(wǎng)營銷落地服務(wù)。

Exactly Once的實現(xiàn)

如果實時作業(yè)要實現(xiàn)端對端的 exactly once 則需要數(shù)據(jù)源、數(shù)據(jù)處理與數(shù)據(jù)存儲的三個階段都保證 exactly once 的語義。目前基 于Kafka Direct API加上Spark RDD 算子精確一次的保證能夠?qū)崿F(xiàn)端對端的 exactly once 的語義。

在數(shù)據(jù)存儲階段一般實現(xiàn) exactly once 需要保證存儲的過程是冪等操作或事務(wù)操作。很多系統(tǒng)本身就支持了冪等操作,比如相同數(shù)據(jù)寫 hdfs 同一個文件,這本身就是冪等操作,保證了多次操作最終獲取的值還是相同;HBase、ElasticSearch 與 redis 等都能夠?qū)崿F(xiàn)冪等操作。對于關(guān)系型數(shù)據(jù)庫的操作一般都是能夠支持事務(wù)性操作。

官方在創(chuàng)建 DirectKafkaInputStream 時只需要輸入消費 Kafka 的 From Offset,然后其自行獲取本次消費的 End Offset,也就是當(dāng)前最新的 Offset。保存的 Offset 是本批次的 End Offset,下次消費從上次的 End Offset 開始消費。

當(dāng)程序宕機(jī)或重啟任務(wù)后,這其中存在一些問題。如果在數(shù)據(jù)處理完成前存儲 Offset,則可能存在作業(yè)處理數(shù)據(jù)失敗與作業(yè)宕機(jī)等情況,重啟后會無法追溯上次處理的數(shù)據(jù)導(dǎo)致數(shù)據(jù)出現(xiàn)丟失。如果在數(shù)據(jù)處理完成后存儲 Offset,但是存儲 Offset 過程中發(fā)生失敗或作業(yè)宕機(jī)等情況,則在重啟后會重復(fù)消費上次已經(jīng)消費過的數(shù)據(jù)。

而且此時又無法保證重啟后消費的數(shù)據(jù)與宕機(jī)前的數(shù)據(jù)量相同數(shù)據(jù)相當(dāng),這又會引入另外一個問題,如果是基于聚合統(tǒng)計指標(biāo)作更新操作,這會帶來無法判斷上次數(shù)據(jù)是否已經(jīng)更新成功。

所以在 muise spark core 中我們加入了自己的實現(xiàn)用以保證 Exactly once 的語義。具體的實現(xiàn)是我們對 Spark 源碼進(jìn)行了改造,保證在創(chuàng)建 DirectKafkaInputStream 可以同時輸入 From Offset 與 End Offset,并且我們在存儲 Kafka Offset 的時候保存了每個批次的起始Offset 與結(jié)束 Offset,具體格式如下:

apache spark中怎么實現(xiàn)端對端的 exactly once

如此做的用意在于能夠確保無論是宕機(jī)還是人為重啟,重啟后的第一個批次與重啟前的最后一個批次數(shù)據(jù)一模一樣。這樣的設(shè)計使得后面用戶在后面對于第一個批次的數(shù)據(jù)處理非常靈活可變,如果用戶直接忽略第一個批次的數(shù)據(jù),那此時保證的是 at most once 的語義,因為我們無法獲知重啟前的最后一個批次數(shù)據(jù)操作是否有成功完成。

如果用戶依照原有邏輯處理第一個批次的數(shù)據(jù),不對其做去重操作,那此時保證的是 at least once 的語義,最終結(jié)果中可能存在重復(fù)數(shù)據(jù);最后如果用戶想要實現(xiàn) exactly once,muise spark core 提供了根據(jù)topic、partition 與 offset 生成 UID 的功能。

只要確保兩個批次消費的 Offset 相同,則最終生成的 UID 也相同,用戶可以根據(jù)此 UID 作為判斷上個批次數(shù)據(jù)是否有存儲成功的依據(jù)。下面簡單的給出了重啟后第一個批次操作的行為。

apache spark中怎么實現(xiàn)端對端的 exactly once

002

Metrics系統(tǒng)

Musie spark core 基于 Spark 本身的 metrics 系統(tǒng)進(jìn)行了改造,添加了許多定制的 metrics,并且向用戶暴露了 metrics 注冊接口,用戶可以非常方便的注冊自己的 metrics 并在程序中更新 metrics 的數(shù)值。最后所有的 metrics 會根據(jù)作業(yè)設(shè)定的批次間隔寫入 Graphite,基于公司定制的預(yù)警系統(tǒng)進(jìn)行報警,前端可以通過 Grafana 展現(xiàn)各項 metrics 指標(biāo)。

Muise spark core本身定制的metrics包含以下三種:

  • Fail 批次時間內(nèi) spark task 失敗次數(shù)超過4次便報警,用于監(jiān)控程序的運行狀態(tài)。

  • Ack 批次時間內(nèi) spark streaming 處理的數(shù)據(jù)量小0便報警,用于監(jiān)控程序是否在正常消費數(shù)據(jù)。

  • Lag 批次時間內(nèi)數(shù)據(jù)消費延遲大于設(shè)定值便報警。

其中由于我們大部分作業(yè)開啟了 Back Pressure 功能,這就導(dǎo)致在Spark UI 中看到每個批次數(shù)據(jù)都能在正常時間內(nèi)消費完成,然而可能此時 kafka 中已經(jīng)積壓了大量數(shù)據(jù),故每個批次我們都會計算當(dāng)前消費時間與數(shù)據(jù)本身時間的一個平均差值,如果這個差值大于批次時間,說明本身數(shù)據(jù)消費就已經(jīng)存在了延遲。

下圖展現(xiàn)了預(yù)警系統(tǒng)中,基于用戶自定義注冊的Metrics以及系統(tǒng)定制的Metrics進(jìn)行預(yù)警。

apache spark中怎么實現(xiàn)端對端的 exactly once

003

容錯

其實在上面 Exactly Once 一章中已經(jīng)詳細(xì)的描述了 muise spark core如何在程序宕機(jī)后能夠保證數(shù)據(jù)正確的處理。但是為了能夠讓 Spark Sreaming 能夠長時間穩(wěn)定的運行在Yarn集群上,還需要添加許多配置,感興趣的朋友可以查看:Long running Spark Streaming Jobs on Yarn Cluster。

除了上述容錯保證之外,Muise Portal(后面會講)也提供了對 Spark Streaming 作業(yè)定時檢測的功能。目前每過5分鐘對當(dāng)前所有數(shù)據(jù)庫中狀態(tài)標(biāo)記為 Running 的 Spark Streaming 作業(yè)進(jìn)行狀態(tài)檢測,通過Yarn提供的REST APIs可以根據(jù)每個作業(yè)的Application Id查詢作業(yè)在Yarn上的狀態(tài),如果狀態(tài)處于非運行狀態(tài),則會嘗試重啟作業(yè)。

感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“apache spark中怎么實現(xiàn)端對端的 exactly once”這篇文章對大家有幫助,同時也希望大家多多支持創(chuàng)新互聯(lián),關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,更多相關(guān)知識等著你來學(xué)習(xí)!

分享名稱:apachespark中怎么實現(xiàn)端對端的exactlyonce
分享地址:http://muchs.cn/article24/jcjdce.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供ChatGPT、定制網(wǎng)站做網(wǎng)站、建站公司網(wǎng)站設(shè)計公司、

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

商城網(wǎng)站建設(shè)