Spark2.x中共享變量的累加器是怎樣的

這期內(nèi)容當(dāng)中小編將會給大家?guī)碛嘘P(guān)Spark2.x中共享變量的累加器是怎樣的,文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

成都創(chuàng)新互聯(lián)2013年開創(chuàng)至今,是專業(yè)互聯(lián)網(wǎng)技術(shù)服務(wù)公司,擁有項(xiàng)目成都網(wǎng)站制作、成都網(wǎng)站設(shè)計(jì)、外貿(mào)營銷網(wǎng)站建設(shè)網(wǎng)站策劃,項(xiàng)目實(shí)施與項(xiàng)目整合能力。我們以讓每一個(gè)夢想脫穎而出為使命,1280元瓊海做網(wǎng)站,已為上家服務(wù),為瓊海各地企業(yè)和個(gè)人服務(wù),聯(lián)系電話:028-86922220

為什么要定義累加器?

    在 Spark 應(yīng)用程序中,我們經(jīng)常會有這樣的需求,如要需要統(tǒng)計(jì)符合某種特性數(shù)據(jù)的總數(shù),這種需求都需要用到計(jì)數(shù)器。如果一個(gè)變量不被聲明為一個(gè)累加器,那么它將在被改變時(shí)不會在 driver 端進(jìn)行全局匯總,即在分布式運(yùn)行時(shí)每個(gè) task 運(yùn)行的只是原始變量的 一個(gè)副本,并不能改變原始變量的值,但是當(dāng)這個(gè)變量被聲明為累加器后,該變量就會有分布式計(jì)數(shù)的功能。

  定義了一個(gè)累加器sum,而不是普通變量,實(shí)例實(shí)例代碼如下:

package com.hadoop.ljs.spark220.studyimport org.apache.spark.{SparkConf, SparkContext}/**  * @author: Created By lujisen  * @company ChinaUnicom Software JiNan  * @date: 2020-02-20 19:36  * @version: v1.0  * @description: com.hadoop.ljs.spark220.study  */object AccumlatorTest {  def main(args: Array[String]): Unit = {    val sparkConf=new SparkConf().setMaster("local[*]").setAppName("AccumlatorTest")    val sc=new SparkContext(sparkConf)    /*定義一個(gè)共享變量:累加器*/    val sum=sc.accumulator(0)    /*輸入數(shù)據(jù)*/    val rdd1=sc.parallelize(List(1,2,3,4,5))    /*求和 ,然后各個(gè)元素加1*/    val rdd2=rdd1.map(x=>{      sum+=x      x    })    /*這里是個(gè)action操作 沒有這個(gè)操作,程序不會執(zhí)行*/    rdd2.collect()    println("求和:"+sum)    sc.stop()  }}

運(yùn)行結(jié)果如下,sum=15,符合我們的期望值:

Spark2.x中共享變量的累加器是怎樣的

結(jié)合上面的代碼說一下累加器的執(zhí)行過程:

 1).Accumulator需要在Driver進(jìn)行定義和并初始化,并進(jìn)行注冊,同時(shí)Accumulator首先需要在Driver進(jìn)行序列化,然后發(fā)送到Executor端;另外,Driver接收到Task任務(wù)完成的狀態(tài)更新后,會去更新Value的值,然后在Action操作執(zhí)行后就可以獲取到Accumulator的值了。

  2).Executor接收到Task之后會進(jìn)行反序列化操作,反序列化得到RDD和function,同時(shí)在反序列化的同時(shí)也去反序列化Accumulator,同時(shí)也會向TaskContext完成注冊,完成任務(wù)計(jì)算之后,隨著Task結(jié)果一起返回給Driver端進(jìn)行處理。

    這里有執(zhí)行過程圖可以參考下:

Spark2.x中共享變量的累加器是怎樣的

累加器特性:

    1.累加器也是也具有懶加載屬性,只有在action操作執(zhí)行時(shí),才會強(qiáng)制觸發(fā)計(jì)算求值;

    2.累加器的值只可以在Driver端定義初始化,在Executor端更新,不能在Executor端進(jìn)行定義初始化,不能在Executor端通過[.value]獲取值,任何工作節(jié)點(diǎn)上的Task都不能訪問累加器的值;

    3.閉包里的執(zhí)行器代碼可以使用累加器的 += 方法(在Java中是 add )增加累加器的值。


特別提醒:

    累加器在Driver端定義賦初始值,累加器只能在Driver端讀取最后的值,在Excutor端更新。

上述就是小編為大家分享的Spark2.x中共享變量的累加器是怎樣的了,如果剛好有類似的疑惑,不妨參照上述分析進(jìn)行理解。如果想知道更多相關(guān)知識,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。

網(wǎng)站題目:Spark2.x中共享變量的累加器是怎樣的
文章轉(zhuǎn)載:http://muchs.cn/article42/ppjgec.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)頁設(shè)計(jì)公司營銷型網(wǎng)站建設(shè)、微信小程序域名注冊、用戶體驗(yàn)、定制網(wǎng)站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

網(wǎng)站建設(shè)網(wǎng)站維護(hù)公司