這篇文章主要介紹了apache spark中怎么實現(xiàn)端對端的 exactly once,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
創(chuàng)新互聯(lián)建站專注于婁煩網(wǎng)站建設(shè)服務(wù)及定制,我們擁有豐富的企業(yè)做網(wǎng)站經(jīng)驗。 熱誠為您提供婁煩營銷型網(wǎng)站建設(shè),婁煩網(wǎng)站制作、婁煩網(wǎng)頁設(shè)計、婁煩網(wǎng)站官網(wǎng)定制、小程序定制開發(fā)服務(wù),打造婁煩網(wǎng)絡(luò)公司原創(chuàng)品牌,更為您提供婁煩網(wǎng)站排名全網(wǎng)營銷落地服務(wù)。
Exactly Once的實現(xiàn)
如果實時作業(yè)要實現(xiàn)端對端的 exactly once 則需要數(shù)據(jù)源、數(shù)據(jù)處理與數(shù)據(jù)存儲的三個階段都保證 exactly once 的語義。目前基 于Kafka Direct API加上Spark RDD 算子精確一次的保證能夠?qū)崿F(xiàn)端對端的 exactly once 的語義。
在數(shù)據(jù)存儲階段一般實現(xiàn) exactly once 需要保證存儲的過程是冪等操作或事務(wù)操作。很多系統(tǒng)本身就支持了冪等操作,比如相同數(shù)據(jù)寫 hdfs 同一個文件,這本身就是冪等操作,保證了多次操作最終獲取的值還是相同;HBase、ElasticSearch 與 redis 等都能夠?qū)崿F(xiàn)冪等操作。對于關(guān)系型數(shù)據(jù)庫的操作一般都是能夠支持事務(wù)性操作。
官方在創(chuàng)建 DirectKafkaInputStream 時只需要輸入消費 Kafka 的 From Offset,然后其自行獲取本次消費的 End Offset,也就是當(dāng)前最新的 Offset。保存的 Offset 是本批次的 End Offset,下次消費從上次的 End Offset 開始消費。
當(dāng)程序宕機(jī)或重啟任務(wù)后,這其中存在一些問題。如果在數(shù)據(jù)處理完成前存儲 Offset,則可能存在作業(yè)處理數(shù)據(jù)失敗與作業(yè)宕機(jī)等情況,重啟后會無法追溯上次處理的數(shù)據(jù)導(dǎo)致數(shù)據(jù)出現(xiàn)丟失。如果在數(shù)據(jù)處理完成后存儲 Offset,但是存儲 Offset 過程中發(fā)生失敗或作業(yè)宕機(jī)等情況,則在重啟后會重復(fù)消費上次已經(jīng)消費過的數(shù)據(jù)。
而且此時又無法保證重啟后消費的數(shù)據(jù)與宕機(jī)前的數(shù)據(jù)量相同數(shù)據(jù)相當(dāng),這又會引入另外一個問題,如果是基于聚合統(tǒng)計指標(biāo)作更新操作,這會帶來無法判斷上次數(shù)據(jù)是否已經(jīng)更新成功。
所以在 muise spark core 中我們加入了自己的實現(xiàn)用以保證 Exactly once 的語義。具體的實現(xiàn)是我們對 Spark 源碼進(jìn)行了改造,保證在創(chuàng)建 DirectKafkaInputStream 可以同時輸入 From Offset 與 End Offset,并且我們在存儲 Kafka Offset 的時候保存了每個批次的起始Offset 與結(jié)束 Offset,具體格式如下:
如此做的用意在于能夠確保無論是宕機(jī)還是人為重啟,重啟后的第一個批次與重啟前的最后一個批次數(shù)據(jù)一模一樣。這樣的設(shè)計使得后面用戶在后面對于第一個批次的數(shù)據(jù)處理非常靈活可變,如果用戶直接忽略第一個批次的數(shù)據(jù),那此時保證的是 at most once 的語義,因為我們無法獲知重啟前的最后一個批次數(shù)據(jù)操作是否有成功完成。
如果用戶依照原有邏輯處理第一個批次的數(shù)據(jù),不對其做去重操作,那此時保證的是 at least once 的語義,最終結(jié)果中可能存在重復(fù)數(shù)據(jù);最后如果用戶想要實現(xiàn) exactly once,muise spark core 提供了根據(jù)topic、partition 與 offset 生成 UID 的功能。
只要確保兩個批次消費的 Offset 相同,則最終生成的 UID 也相同,用戶可以根據(jù)此 UID 作為判斷上個批次數(shù)據(jù)是否有存儲成功的依據(jù)。下面簡單的給出了重啟后第一個批次操作的行為。
002
Metrics系統(tǒng)
Musie spark core 基于 Spark 本身的 metrics 系統(tǒng)進(jìn)行了改造,添加了許多定制的 metrics,并且向用戶暴露了 metrics 注冊接口,用戶可以非常方便的注冊自己的 metrics 并在程序中更新 metrics 的數(shù)值。最后所有的 metrics 會根據(jù)作業(yè)設(shè)定的批次間隔寫入 Graphite,基于公司定制的預(yù)警系統(tǒng)進(jìn)行報警,前端可以通過 Grafana 展現(xiàn)各項 metrics 指標(biāo)。
Muise spark core本身定制的metrics包含以下三種:
Fail 批次時間內(nèi) spark task 失敗次數(shù)超過4次便報警,用于監(jiān)控程序的運行狀態(tài)。
Ack 批次時間內(nèi) spark streaming 處理的數(shù)據(jù)量小0便報警,用于監(jiān)控程序是否在正常消費數(shù)據(jù)。
Lag 批次時間內(nèi)數(shù)據(jù)消費延遲大于設(shè)定值便報警。
其中由于我們大部分作業(yè)開啟了 Back Pressure 功能,這就導(dǎo)致在Spark UI 中看到每個批次數(shù)據(jù)都能在正常時間內(nèi)消費完成,然而可能此時 kafka 中已經(jīng)積壓了大量數(shù)據(jù),故每個批次我們都會計算當(dāng)前消費時間與數(shù)據(jù)本身時間的一個平均差值,如果這個差值大于批次時間,說明本身數(shù)據(jù)消費就已經(jīng)存在了延遲。
下圖展現(xiàn)了預(yù)警系統(tǒng)中,基于用戶自定義注冊的Metrics以及系統(tǒng)定制的Metrics進(jìn)行預(yù)警。
003
容錯
其實在上面 Exactly Once 一章中已經(jīng)詳細(xì)的描述了 muise spark core如何在程序宕機(jī)后能夠保證數(shù)據(jù)正確的處理。但是為了能夠讓 Spark Sreaming 能夠長時間穩(wěn)定的運行在Yarn集群上,還需要添加許多配置,感興趣的朋友可以查看:Long running Spark Streaming Jobs on Yarn Cluster。
除了上述容錯保證之外,Muise Portal(后面會講)也提供了對 Spark Streaming 作業(yè)定時檢測的功能。目前每過5分鐘對當(dāng)前所有數(shù)據(jù)庫中狀態(tài)標(biāo)記為 Running 的 Spark Streaming 作業(yè)進(jìn)行狀態(tài)檢測,通過Yarn提供的REST APIs可以根據(jù)每個作業(yè)的Application Id查詢作業(yè)在Yarn上的狀態(tài),如果狀態(tài)處于非運行狀態(tài),則會嘗試重啟作業(yè)。
感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“apache spark中怎么實現(xiàn)端對端的 exactly once”這篇文章對大家有幫助,同時也希望大家多多支持創(chuàng)新互聯(lián),關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,更多相關(guān)知識等著你來學(xué)習(xí)!
分享名稱:apachespark中怎么實現(xiàn)端對端的exactlyonce
分享地址:http://muchs.cn/article24/jcjdce.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供ChatGPT、定制網(wǎng)站、做網(wǎng)站、建站公司、網(wǎng)站設(shè)計公司、
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)