Spark廣播變量分析以及如何動態(tài)更新廣播變量

本篇文章給大家分享的是有關(guān)Spark廣播變量分析以及如何動態(tài)更新廣播變量,小編覺得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

你所需要的網(wǎng)站建設(shè)服務(wù),我們均能行業(yè)靠前的水平為你提供.標(biāo)準(zhǔn)是產(chǎn)品質(zhì)量的保證,主要從事成都網(wǎng)站設(shè)計(jì)、做網(wǎng)站、企業(yè)網(wǎng)站建設(shè)、手機(jī)網(wǎng)站制作設(shè)計(jì)、網(wǎng)頁設(shè)計(jì)、成都品牌網(wǎng)站建設(shè)、網(wǎng)頁制作、做網(wǎng)站、建網(wǎng)站。創(chuàng)新互聯(lián)擁有實(shí)力堅(jiān)強(qiáng)的技術(shù)研發(fā)團(tuán)隊(duì)及素養(yǎng)的視覺設(shè)計(jì)專才。

今天主要介紹一下基于Spark2.4版本的廣播變量。先前的版本比如Spark2.1之前的廣播變量有兩種實(shí)現(xiàn):HttpBroadcast和TorrentBroadcast,但是鑒于HttpBroadcast有各種弊端,目前已經(jīng)舍棄這種實(shí)現(xiàn),小編主要闡述TorrentBroadcast
廣播變量概述
廣播變量是一個只讀變量,通過它我們可以將一些共享數(shù)據(jù)集或者大變量緩存在Spark集群中的各個機(jī)器上而不用每個task都需要copy一個副本,后續(xù)計(jì)算可以重復(fù)使用,減少了數(shù)據(jù)傳輸時網(wǎng)絡(luò)帶寬的使用,提高效率。相比于Hadoop的分布式緩存,廣播的內(nèi)容可以跨作業(yè)共享。
廣播變量要求廣播的數(shù)據(jù)不可變、不能太大但也不能太小(一般幾十M以上)、可被序列化和反序列化、并且必須在driver端聲明廣播變量,適用于廣播多個stage公用的數(shù)據(jù),存儲級別目前是MEMORY_AND_DISK。

廣播變量存儲目前基于Spark實(shí)現(xiàn)的BlockManager分布式存儲系統(tǒng),Spark中的shuffle數(shù)據(jù)、加載HDFS數(shù)據(jù)時切分過來的block塊都存儲在BlockManager中,不是今天的討論點(diǎn),這里先不做詳述了。

廣播變量的創(chuàng)建方式和獲取

//創(chuàng)建廣播變量
val broadcastVar = sparkSession.sparkContext.broadcast(Array(1, 2, 3))

//獲取廣播變量
broadcastVar.value
 廣播變量實(shí)例化過程

1.首先調(diào)用val broadcastVar = sparkSession.sparkContext.broadcast(Array(1, 2, 3))

2.調(diào)用BroadcastManager的newBroadcast方法
val bc = env.broadcastManager.newBroadcast[T](value, isLocal)

3.通過廣播工廠的newBroadcast方法進(jìn)行創(chuàng)建

broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())

在調(diào)用BroadcastManager的newBroadcast方法時已完成對廣播工廠的初始化(initialize方法),我們只需看BroadcastFactory的實(shí)現(xiàn)TorrentBroadcastFactory中對TorrentBroadcast的實(shí)例化過程:

new TorrentBroadcast[T](value_, id)
4.在構(gòu)建TorrentBroadcast時,將廣播的數(shù)據(jù)寫入BlockManager
1)首先會將廣播變量序列化后的對象劃分為多個block塊,存儲在driver端的BlockManager,這樣運(yùn)行在driver端的task就不用創(chuàng)建廣播變量的副本了(具體可以查看TorrentBroadcast的writeBlocks方法) 
2)每個executor在獲取廣播變量時首先從本地的BlockManager獲取。獲取不到就會從driver或者其他的executor上獲取,獲取之后,會將獲取到的數(shù)據(jù)保存在自己的BlockManager中
3)塊的大小默認(rèn)4M
conf.getSizeAsKb("spark.broadcast.blockSize", "4m").toInt * 1024

廣播變量初始化過程

1.首先調(diào)用broadcastVar.value
2.TorrentBroadcast中l(wèi)azy變量_value進(jìn)行初始化,調(diào)用readBroadcastBlock() 
3.先從緩存中讀取,對結(jié)果進(jìn)行模式匹配,匹配成功的直接返回
4.讀取不到通過readBlocks()進(jìn)行讀取  

從driver端或者其他的executor中讀取,將讀取的對象存儲到本地,并存于緩存中

new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)

Spark兩種廣播變量對比

正如【前言】中所說,HttpBroadcast在Spark后續(xù)的版本中已經(jīng)被廢棄,但考慮到部分公司用的Spark版本較低,面試中仍有可能問到兩種實(shí)現(xiàn)的相關(guān)問題,這里簡單介紹一下:
HttpBroadcast會在driver端的BlockManager里面存儲廣播變量對象,并且將該廣播變量序列化寫入文件中去。  所有獲取廣播數(shù)據(jù)請求都在driver端,所以存在單點(diǎn)故障和網(wǎng)絡(luò)IO性能問題。
TorrentBroadcast會在driver端的BlockManager里面存儲廣播變量對象,并將廣播對象分割成若干序列化block塊(默認(rèn)4M),存儲于BlockManager。小的block存儲位置信息,存儲于Driver端的BlockManagerMaster。數(shù)據(jù)請求并非集中于driver端,避免了單點(diǎn)故障和driver端網(wǎng)絡(luò)磁盤IO過高。

TorrentBroadcast在executor端存儲一個對象的同時會將獲取的block存儲于BlockManager,并向driver端的BlockManager匯報(bào)block的存儲信息。

請求數(shù)據(jù)的時候會先獲取block的所有存儲位置信息,并且是隨機(jī)的在所有存儲了該executor的BlockManager去獲取,避免了數(shù)據(jù)請求服務(wù)集中于一點(diǎn)。

總之就是HttpBroadcast導(dǎo)致獲取廣播變量的請求集中于driver端,容易引起driver端單點(diǎn)故障,網(wǎng)絡(luò)IO過高影響性能等問題,而TorrentBroadcast獲取廣播變量的請求服務(wù)即可以請求到driver端也可以在executor,避免了上述問題,當(dāng)然這只是主要的優(yōu)化點(diǎn)。

動態(tài)更新廣播變量
通過上面的介紹,大家都知道廣播變量是只讀的,那么在Spark流式處理中如何進(jìn)行動態(tài)更新廣播變量?

既然無法更新,那么只能動態(tài)生成,應(yīng)用場景有實(shí)時風(fēng)控中根據(jù)業(yè)務(wù)情況調(diào)整規(guī)則庫、實(shí)時日志ETL服務(wù)中獲取最新的日志格式以及字段變更等。

@volatile private var instance: Broadcast[Array[Int]] = null

//獲取廣播變量單例對象
def getInstance(sc: SparkContext, ctime: Long): Broadcast[Array[Int]] = {
 if (instance == null) {
   synchronized {
     if (instance == null) {
       instance = sc.broadcast(fetchLastestData())
     }
   }
 }
 instance
}

//加載要廣播的數(shù)據(jù),并更新廣播變量
def updateBroadCastVar(sc: SparkContext, blocking: Boolean = false): Unit = {
 if (instance != null) {
   //刪除緩存在executors上的廣播副本,并可選擇是否在刪除完成后進(jìn)行block等待
   //底層可選擇是否將driver端的廣播副本也刪除
   instance.unpersist(blocking)
   
   instance = sc.broadcast(fetchLastestData())
 }
}

def fetchLastestData() = {
 //動態(tài)獲取需要更新的數(shù)據(jù)
 //這里是偽代碼
 Array(1, 2, 3)
}
val dataFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

...
...

stream.foreachRDD { rdd =>
 val current_time = dataFormat.format(new Date())
 val new_time = current_time.substring(14, 16).toLong
 //每10分鐘更新一次
 if (new_time % 10 == 0) {
   updateBroadCastVar(rdd.sparkContext, true)
 }

 rdd.foreachPartition { records =>
   instance.value
   ...
 }
}
注意:上述是給出了一個實(shí)現(xiàn)思路的偽代碼,實(shí)際生產(chǎn)中還需要進(jìn)行一定的優(yōu)化。
此外,這種方式有一定的弊端,就是廣播的數(shù)據(jù)因?yàn)槭侵芷谛愿?,所以存在一定的滯后性。廣播的周期不能太短,要考慮外部存儲要廣播數(shù)據(jù)的存儲系統(tǒng)的壓力。具體的還要看具體的業(yè)務(wù)場景,如果對實(shí)時性要求不是特別高的話,可以采取這種,當(dāng)然也可以參考Flink是如何實(shí)現(xiàn)動態(tài)廣播的。  

Spark流式程序中為何使用單例模式

1.廣播變量是只讀的,使用單例模式可以減少Spark流式程序中每次job生成執(zhí)行,頻繁創(chuàng)建廣播變量帶來的開銷

2.廣播變量單例模式也需要做同步處理。在FIFO調(diào)度模式下,基本不會發(fā)生并發(fā)問題。但是如果你改變了調(diào)度模式,如采用公平調(diào)度模式,同時設(shè)置Spark流式程序并行執(zhí)行的job數(shù)大于1,如設(shè)置參數(shù)spark.streaming.concurrentJobs=4,則必須加上同步代碼

3.在多個輸出流共享廣播變量的情況下,同時配置了公平調(diào)度模式,也會產(chǎn)生并發(fā)問題。建議在foreachRDD或者transform中使用局部變量進(jìn)行廣播,避免在公平調(diào)度模式下不同job之間產(chǎn)生影響。

除了廣播變量,累加器也是一樣。在Spark流式組件如Spark Streaming底層,每個輸出流都會產(chǎn)生一個job,形成一個job集合提交到線程池里并發(fā)執(zhí)行。

以上就是Spark廣播變量分析以及如何動態(tài)更新廣播變量,小編相信有部分知識點(diǎn)可能是我們?nèi)粘9ぷ鲿姷交蛴玫降?。希望你能通過這篇文章學(xué)到更多知識。更多詳情敬請關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。

網(wǎng)站名稱:Spark廣播變量分析以及如何動態(tài)更新廣播變量
網(wǎng)頁網(wǎng)址:http://muchs.cn/article0/ihgcio.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站內(nèi)鏈動態(tài)網(wǎng)站、搜索引擎優(yōu)化網(wǎng)站制作、網(wǎng)站收錄、

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

成都網(wǎng)頁設(shè)計(jì)公司