本篇文章給大家分享的是有關(guān)Spark廣播變量分析以及如何動態(tài)更新廣播變量,小編覺得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
你所需要的網(wǎng)站建設(shè)服務(wù),我們均能行業(yè)靠前的水平為你提供.標(biāo)準(zhǔn)是產(chǎn)品質(zhì)量的保證,主要從事成都網(wǎng)站設(shè)計(jì)、做網(wǎng)站、企業(yè)網(wǎng)站建設(shè)、手機(jī)網(wǎng)站制作設(shè)計(jì)、網(wǎng)頁設(shè)計(jì)、成都品牌網(wǎng)站建設(shè)、網(wǎng)頁制作、做網(wǎng)站、建網(wǎng)站。創(chuàng)新互聯(lián)擁有實(shí)力堅(jiān)強(qiáng)的技術(shù)研發(fā)團(tuán)隊(duì)及素養(yǎng)的視覺設(shè)計(jì)專才。
廣播變量存儲目前基于Spark實(shí)現(xiàn)的BlockManager分布式存儲系統(tǒng),Spark中的shuffle數(shù)據(jù)、加載HDFS數(shù)據(jù)時切分過來的block塊都存儲在BlockManager中,不是今天的討論點(diǎn),這里先不做詳述了。
廣播變量的創(chuàng)建方式和獲取
//創(chuàng)建廣播變量
val broadcastVar = sparkSession.sparkContext.broadcast(Array(1, 2, 3))
//獲取廣播變量
broadcastVar.value
1.首先調(diào)用val broadcastVar = sparkSession.sparkContext.broadcast(Array(1, 2, 3))
val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
3.通過廣播工廠的newBroadcast方法進(jìn)行創(chuàng)建
broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
在調(diào)用BroadcastManager的newBroadcast方法時已完成對廣播工廠的初始化(initialize方法),我們只需看BroadcastFactory的實(shí)現(xiàn)TorrentBroadcastFactory中對TorrentBroadcast的實(shí)例化過程:
new TorrentBroadcast[T](value_, id)
conf.getSizeAsKb("spark.broadcast.blockSize", "4m").toInt * 1024
廣播變量初始化過程
從driver端或者其他的executor中讀取,將讀取的對象存儲到本地,并存于緩存中
new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
Spark兩種廣播變量對比
TorrentBroadcast在executor端存儲一個對象的同時會將獲取的block存儲于BlockManager,并向driver端的BlockManager匯報(bào)block的存儲信息。
總之就是HttpBroadcast導(dǎo)致獲取廣播變量的請求集中于driver端,容易引起driver端單點(diǎn)故障,網(wǎng)絡(luò)IO過高影響性能等問題,而TorrentBroadcast獲取廣播變量的請求服務(wù)即可以請求到driver端也可以在executor,避免了上述問題,當(dāng)然這只是主要的優(yōu)化點(diǎn)。
既然無法更新,那么只能動態(tài)生成,應(yīng)用場景有實(shí)時風(fēng)控中根據(jù)業(yè)務(wù)情況調(diào)整規(guī)則庫、實(shí)時日志ETL服務(wù)中獲取最新的日志格式以及字段變更等。
@volatile private var instance: Broadcast[Array[Int]] = null
//獲取廣播變量單例對象
def getInstance(sc: SparkContext, ctime: Long): Broadcast[Array[Int]] = {
if (instance == null) {
synchronized {
if (instance == null) {
instance = sc.broadcast(fetchLastestData())
}
}
}
instance
}
//加載要廣播的數(shù)據(jù),并更新廣播變量
def updateBroadCastVar(sc: SparkContext, blocking: Boolean = false): Unit = {
if (instance != null) {
//刪除緩存在executors上的廣播副本,并可選擇是否在刪除完成后進(jìn)行block等待
//底層可選擇是否將driver端的廣播副本也刪除
instance.unpersist(blocking)
instance = sc.broadcast(fetchLastestData())
}
}
def fetchLastestData() = {
//動態(tài)獲取需要更新的數(shù)據(jù)
//這里是偽代碼
Array(1, 2, 3)
}
val dataFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
...
...
stream.foreachRDD { rdd =>
val current_time = dataFormat.format(new Date())
val new_time = current_time.substring(14, 16).toLong
//每10分鐘更新一次
if (new_time % 10 == 0) {
updateBroadCastVar(rdd.sparkContext, true)
}
rdd.foreachPartition { records =>
instance.value
...
}
}
Spark流式程序中為何使用單例模式
1.廣播變量是只讀的,使用單例模式可以減少Spark流式程序中每次job生成執(zhí)行,頻繁創(chuàng)建廣播變量帶來的開銷
2.廣播變量單例模式也需要做同步處理。在FIFO調(diào)度模式下,基本不會發(fā)生并發(fā)問題。但是如果你改變了調(diào)度模式,如采用公平調(diào)度模式,同時設(shè)置Spark流式程序并行執(zhí)行的job數(shù)大于1,如設(shè)置參數(shù)spark.streaming.concurrentJobs=4,則必須加上同步代碼
3.在多個輸出流共享廣播變量的情況下,同時配置了公平調(diào)度模式,也會產(chǎn)生并發(fā)問題。建議在foreachRDD或者transform中使用局部變量進(jìn)行廣播,避免在公平調(diào)度模式下不同job之間產(chǎn)生影響。
除了廣播變量,累加器也是一樣。在Spark流式組件如Spark Streaming底層,每個輸出流都會產(chǎn)生一個job,形成一個job集合提交到線程池里并發(fā)執(zhí)行。
以上就是Spark廣播變量分析以及如何動態(tài)更新廣播變量,小編相信有部分知識點(diǎn)可能是我們?nèi)粘9ぷ鲿姷交蛴玫降?。希望你能通過這篇文章學(xué)到更多知識。更多詳情敬請關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。
網(wǎng)站名稱:Spark廣播變量分析以及如何動態(tài)更新廣播變量
網(wǎng)頁網(wǎng)址:http://muchs.cn/article0/ihgcio.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站內(nèi)鏈、動態(tài)網(wǎng)站、搜索引擎優(yōu)化、網(wǎng)站制作、網(wǎng)站收錄、
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)