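When Spark runs an operation, the functions it executes can use variables defined in the driver program, but this default behavior is not always what you want. Each task running on the cluster works on its own copy of such a variable, which causes two problems: updates made on the workers are never propagated back to the driver, and a large read-only value, such as a lookup table, is copied separately into every task that uses it.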
To address these two problems, Spark provides two kinds of shared variables: broadcast variables and accumulators.
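Why a plain driver-side variable does not work as a counter can be seen in a toy, pure-Python model (a simplified illustration, not Spark's actual code path). Each simulated task receives a pickled copy of a driver-side counter, roughly the way Spark serializes the variables a function captures and gives each task its own deserialized copy. The names `run_task`, `driver_counter`, and the sample partitions are hypothetical.

```python
import pickle
from typing import Dict, List


def run_task(shipped: bytes, lines: List[str]) -> None:
    # Each simulated task deserializes its own copy of the driver's variable.
    counter: Dict[str, int] = pickle.loads(shipped)
    for line in lines:
        if line == "":
            counter["blank"] += 1  # updates only this task-local copy


driver_counter = {"blank": 0}
partitions = [["K6ABC", "", "W1XYZ"], ["", ""]]  # hypothetical input: 3 blank lines
shipped = pickle.dumps(driver_counter)            # serialized once on the "driver"

for part in partitions:
    run_task(shipped, part)

print(driver_counter["blank"])  # 0: the blank lines were counted only in the copies
```

An accumulator closes exactly this gap: task-side updates are sent back and merged into the driver's value.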
Example: counting call signs per country with a broadcast variable
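```python
# Broadcast the call-sign prefix table (country codes) to the executors.
# loadCallSignTable() and lookupCountry() are helpers not shown in this article.
signPrefixes = sc.broadcast(loadCallSignTable())

def processSignCount(sign_count, signPrefixes):
    country = lookupCountry(sign_count[0], signPrefixes.value)  # read the table via .value
    count = sign_count[1]
    return (country, count)

# map() passes a single argument, so bind the broadcast variable with a lambda
countryContactCounts = (contactCounts
                        .map(lambda sign_count: processSignCount(sign_count, signPrefixes))
                        .reduceByKey(lambda x, y: x + y))
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")
```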
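The example relies on `loadCallSignTable()` and `lookupCountry()`, which the article does not define. Below is one hypothetical way to write them in plain Python, assuming the table is a dict from call-sign prefix to country and that the longest matching prefix wins. The prefixes listed are a small illustrative sample, not a complete allocation table.

```python
from typing import Dict


def loadCallSignTable() -> Dict[str, str]:
    # Hypothetical stand-in: a tiny prefix -> country table for illustration only.
    return {"K": "USA", "W": "USA", "VE": "Canada", "JA": "Japan", "G": "UK"}


def lookupCountry(sign: str, table: Dict[str, str]) -> str:
    # Longest-prefix match: try the whole call sign, then ever shorter prefixes.
    for end in range(len(sign), 0, -1):
        country = table.get(sign[:end])
        if country is not None:
            return country
    return "Unknown"


table = loadCallSignTable()
print(lookupCountry("VE3QRS", table))  # Canada
print(lookupCountry("K6ABC", table))   # USA
print(lookupCountry("ZZ9", table))     # Unknown
```

Inside the Spark job, this dict is what `signPrefixes.value` would return on each executor.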
```scala
// Broadcast the call-sign prefix table (country codes) to the executors
val signPrefixes = sc.broadcast(loadCallSignTable())
val countryContactCounts = contactCounts.map { case (sign, count) =>
  // Read the broadcast table through .value on the executor
  val country = lookupInArray(sign, signPrefixes.value)
  (country, count)
}.reduceByKey((x, y) => x + y)
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")
```
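```java
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import scala.Tuple2;

// Broadcast the call-sign prefix table (country codes) to the executors
final Broadcast<String[]> signPrefixes = sc.broadcast(loadCallSignTable());
JavaPairRDD<String, Integer> countryContactCounts = contactCounts.mapToPair(
    new PairFunction<Tuple2<String, Integer>, String, Integer>() {
      public Tuple2<String, Integer> call(Tuple2<String, Integer> callSignCount) {
        String sign = callSignCount._1();
        String country = lookupCountry(sign, signPrefixes.value()); // read via value()
        return new Tuple2<String, Integer>(country, callSignCount._2());
      }
    }).reduceByKey(new SumInts()); // SumInts (not shown): a Function2<Integer, Integer, Integer> that adds counts
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt");
```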
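The Spark programming guide describes broadcast variables as read-only values cached on each machine rather than shipped with every task. A simplified pure-Python model of that caching (not Spark's implementation; `ToyBroadcast`, `value_on`, and the executor ids are hypothetical) counts how many deserialized copies of the table exist when 8 tasks run on 2 executors.

```python
import pickle
from typing import Any, Dict, List


class ToyBroadcast:
    """Simplified model of a broadcast variable (not Spark's implementation)."""

    def __init__(self, value: Any) -> None:
        self._blob = pickle.dumps(value)   # serialized once on the "driver"
        self._cache: Dict[str, Any] = {}   # executor id -> cached local copy
        self.copies = 0                    # deserialized copies created so far

    def value_on(self, executor_id: str) -> Any:
        # The first task on an executor materializes the copy; later tasks reuse it.
        if executor_id not in self._cache:
            self._cache[executor_id] = pickle.loads(self._blob)
            self.copies += 1
        return self._cache[executor_id]


bc = ToyBroadcast({"K": "USA", "VE": "Canada", "JA": "Japan"})
tasks: List[str] = [f"exec-{i % 2}" for i in range(8)]  # 8 tasks, 2 executors

for executor in tasks:
    country = bc.value_on(executor)["VE"]  # tasks only read the table

print(bc.copies)  # 2, versus 8 if every task received its own copy
```

Because all tasks on an executor share one cached object, the value should be treated as read-only; the Spark programming guide likewise says a broadcast object should not be modified after it is broadcast.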
Example: counting blank lines with an accumulator
```python
file = sc.textFile(inputFile)
# Create an Accumulator[int] initialized to 0
blankLines = sc.accumulator(0)

def extractCallSigns(line):
    global blankLines  # refer to the module-level accumulator
    if line == "":
        blankLines += 1
    return line.split(" ")

callSigns = file.flatMap(extractCallSigns)
callSigns.saveAsTextFile(outputDir + "/callsigns")
# The value is only filled in after an action (here saveAsTextFile) has run
print("Blank lines: %d" % blankLines.value)
```
```scala
val file = sc.textFile("file.txt")
// Create an Accumulator[Int] initialized to 0
// (sc.accumulator is deprecated since Spark 2.0 in favor of sc.longAccumulator)
val blankLines = sc.accumulator(0)
val callSigns = file.flatMap { line =>
  if (line == "") {
    blankLines += 1 // add 1 to the accumulator
  }
  line.split(" ")
}
callSigns.saveAsTextFile("output.txt")
println("Blank lines: " + blankLines.value)
```
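```java
import java.util.Arrays;
import org.apache.spark.Accumulator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;

JavaRDD<String> rdd = sc.textFile(args[1]);
final Accumulator<Integer> blankLines = sc.accumulator(0); // initialized to 0
JavaRDD<String> callSigns = rdd.flatMap(new FlatMapFunction<String, String>() {
  // Spark 1.x signature; in Spark 2.x call() returns Iterator<String> instead
  public Iterable<String> call(String line) {
    if ("".equals(line)) {
      blankLines.add(1);
    }
    return Arrays.asList(line.split(" "));
  }
});
callSigns.saveAsTextFile("output.text");
System.out.println("Blank lines: " + blankLines.value());
```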
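What the accumulator adds can be shown with a toy, pure-Python model (not Spark's implementation): each task counts into a local copy that starts at zero, and the driver merges each task's partial result when that task finishes. `ToyAccumulator`, `run_flat_map`, and the sample data are hypothetical; `extract_call_signs` mirrors the `extractCallSigns` function above.

```python
from typing import List


class ToyAccumulator:
    """Toy model of an accumulator (not Spark's implementation)."""

    def __init__(self, zero: int = 0) -> None:
        self.value = zero  # driver-side total

    def merge(self, partial: int) -> None:
        # Called on the "driver" with one finished task's partial count.
        self.value += partial


def extract_call_signs(line: str, local: List[int]) -> List[str]:
    # Mirrors extractCallSigns: count blank lines, then split on spaces.
    if line == "":
        local[0] += 1
    return line.split(" ")


def run_flat_map(partitions: List[List[str]], acc: ToyAccumulator) -> List[str]:
    out: List[str] = []
    for part in partitions:   # one simulated task per partition
        local = [0]           # task-local copy, starts at zero
        for line in part:
            out.extend(extract_call_signs(line, local))
        acc.merge(local[0])   # partial result sent back and merged
    return out


blank_lines = ToyAccumulator(0)
partitions = [["K6ABC W1XYZ", "", "VE3QRS"], ["", "JA1AAA"]]  # hypothetical input
signs = run_flat_map(partitions, blank_lines)
print(blank_lines.value)  # 2
print(len(signs))         # 6: each blank line yields one empty string, as in the original
```

One difference the toy model hides: Spark only guarantees that each task's accumulator update is applied once for updates made inside actions. Updates made inside transformations such as `flatMap` may be applied more than once if a task or stage is re-executed.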
Article title: 9. Spark Core: Shared Variables