9.sparkcore之共享變量

簡(jiǎn)介

??spark執(zhí)行操作時(shí),可以使用驅(qū)動(dòng)器程序Driver中定義的變量,但有時(shí)這種默認(rèn)的使用方式卻并不理想。

創(chuàng)新互聯(lián)一直秉承“誠(chéng)信做人,踏實(shí)做事”的原則,不欺瞞客戶,是我們最起碼的底線! 以服務(wù)為基礎(chǔ),以質(zhì)量求生存,以技術(shù)求發(fā)展,成交一個(gè)客戶多一個(gè)朋友!為您提供成都網(wǎng)站設(shè)計(jì)、成都網(wǎng)站建設(shè)、成都網(wǎng)頁(yè)設(shè)計(jì)、微信小程序開(kāi)發(fā)、成都網(wǎng)站開(kāi)發(fā)、成都網(wǎng)站制作、成都軟件開(kāi)發(fā)、重慶APP開(kāi)發(fā)是成都本地專業(yè)的網(wǎng)站建設(shè)和網(wǎng)站設(shè)計(jì)公司,等你一起來(lái)見(jiàn)證!

  • 集群中運(yùn)行的每個(gè)任務(wù)都會(huì)連接驅(qū)動(dòng)器獲取變量。如果獲取的變量比較大,執(zhí)行效率會(huì)非常低下。
  • 每個(gè)任務(wù)都會(huì)得到這些變量的一份新的副本,更新這些副本的值不會(huì)影響驅(qū)動(dòng)器中的對(duì)應(yīng)變量。如果驅(qū)動(dòng)器需要獲取變量的結(jié)果值,這種方式是不可行的。

??spark為了解決這兩個(gè)問(wèn)題,提供了兩種類(lèi)型的共享變量:廣播變量(broadcast variable)和累加器(accumulator)。

  • 廣播變量用于高效分發(fā)較大的對(duì)象。會(huì)在每個(gè)執(zhí)行器本地緩存一份大對(duì)象,而避免每次都連接驅(qū)動(dòng)器獲取。
  • 累加器用于在驅(qū)動(dòng)器中對(duì)數(shù)據(jù)結(jié)果進(jìn)行聚合。

廣播變量

原理

9.spark core之共享變量

  • 廣播變量只能在Driver端定義,不能在Executor端定義。
  • 在Driver端可以修改廣播變量的值,在Executor端無(wú)法修改廣播變量的值。
  • 如果不使用廣播變量在Executor有多少task就有多少Driver端的變量副本;如果使用廣播變量在每個(gè)Executor中只有一份Driver端的變量副本。

用法

  • 通過(guò)對(duì)一個(gè)類(lèi)型T的對(duì)象調(diào)用SparkContext.broadcast創(chuàng)建出一個(gè)BroadCast[T]對(duì)象,任何可序列化的類(lèi)型都可以這么實(shí)現(xiàn)。
  • 通過(guò)value屬性訪問(wèn)該對(duì)象的值
  • 變量只會(huì)被發(fā)到各個(gè)節(jié)點(diǎn)一次,應(yīng)作為只讀值處理。(修改這個(gè)值不會(huì)影響到別的節(jié)點(diǎn))

    實(shí)例

    ??查詢每個(gè)國(guó)家的呼號(hào)個(gè)數(shù)

    python
# 將呼號(hào)前綴(國(guó)家代碼)作為廣播變量
signPrefixes = sc.broadcast(loadCallSignTable())

def processSignCount(sign_count, signPrefixes):
    country = lookupCountry(sign_count[0], signPrefixes.value)
    count = sign_count[1]
    return (country, count)

countryContactCounts = (contactCounts.map(processSignCount).reduceByKey((lambda x, y: x+y)))

countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")
scala
// 將呼號(hào)前綴(國(guó)家代碼)作為廣播變量
val signPrefixes = sc.broadcast(loadCallSignTable())

def processSignCount(sign_count, signPrefixes):
    country = lookupCountry(sign_count[0], signPrefixes.value)
    count = sign_count[1]
    return (country, count)

val countryContactCounts = contactCounts.map{case (sign, count) => {
    val country = lookupInArray(sign, signPrefixes.value)
    (country, count)
    }}.reduceByKey((x, y) => x+y)

countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")
java
// 將呼號(hào)前綴(國(guó)家代碼)作為廣播變量
final Broadcast<String[]> signPrefixes = sc.broadcast(loadCallSignTable());

JavaPairRDD<String, Integer> countryContactCounts = contactCounts.mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {
    public Tuple2<String, Integer> call(Tuple2<String, Integer> callSignCount) {
        String sign = callSignCount._1();
        String country = lookupCountry(sign, signPrefixes.value());
        return new Tuple2(country, callSignCount._2()); 
    }
}).reduceByKey(new SumInts());

countryContactCounts.saveAsTextFile(outputDir + "/countries.txt");

累加器

原理

9.spark core之共享變量

  • 累加器在Driver端定義賦初始值。
  • 累加器只能在Driver端讀取最后的值,在Excutor端更新。

用法

  • 通過(guò)調(diào)用sc.accumulator(initivalValue)方法,創(chuàng)建出存有初始值的累加器。返回值為org.apache.spark.Accumulator[T]對(duì)象,其中T是初始值initialValue的類(lèi)型。
  • Spark閉包里的執(zhí)行器代碼可以使用累加器的+=方法增加累加器的值
  • 驅(qū)動(dòng)器程序可以調(diào)用累加器的value屬性來(lái)訪問(wèn)累加器的值

實(shí)例

??累加空行

python
file = sc.textFile(inputFile)
# 創(chuàng)建Accumulator[Int]并初始化為0
blankLines = sc.accumulator(0)

def extractCallSigns(line):
    global blankLines # 訪問(wèn)全局變量
    if (line == ""):
        blankLines += 1
    return line.split(" ")

callSigns = file.flatMap(extractCallSigns)
callSigns.saveAsTextFile(outputDir + "/callsigns")
print "Blank lines: %d" % blankLines.value
scala
val file = sc.textFile("file.txt")
val blankLines = sc.accumulator(0) //創(chuàng)建Accumulator[Int]并初始化為0

val callSigns = file.flatMap(line => {
    if (line == "") {
        blankLines += 1 //累加器加1
    }
    line.split(" ")
})

callSigns.saveAsTextFile("output.txt")
println("Blank lines:" + blankLines.value)
java
JavaRDD<String> rdd = sc.textFile(args[1]);

final Accumulator<Integer> blankLines = sc.accumulator(0);

JavaRDD<String> callSigns = rdd.flatMap(new FlatMapFunction<String, String>() {
    public Iterable<String> call(String line) {
        if ("".equals(line)) {
            blankLines.add(1);
        }
        return Arrays.asList(line.split(" "));
    }
});

callSigns.saveAsTextFile("output.text");
System.out.println("Blank lines:" + blankLines.value());

忠于技術(shù),熱愛(ài)分享。歡迎關(guān)注公眾號(hào):java大數(shù)據(jù)編程,了解更多技術(shù)內(nèi)容。

9.spark core之共享變量

本文名稱:9.sparkcore之共享變量
當(dāng)前路徑:http://muchs.cn/article14/jpjode.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站營(yíng)銷(xiāo)、品牌網(wǎng)站設(shè)計(jì)、靜態(tài)網(wǎng)站網(wǎng)站制作、標(biāo)簽優(yōu)化微信公眾號(hào)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

外貿(mào)網(wǎng)站制作