處理鍵值對RDD

保存Key/Value對的RDD叫做Pair RDD。

公司專注于為企業(yè)提供成都做網(wǎng)站、成都網(wǎng)站建設(shè)、成都外貿(mào)網(wǎng)站建設(shè)、微信公眾號開發(fā)、商城開發(fā),小程序開發(fā),軟件定制網(wǎng)站等一站式互聯(lián)網(wǎng)企業(yè)服務(wù)。憑借多年豐富的經(jīng)驗(yàn),我們會(huì)仔細(xì)了解各客戶的需求而做出多方面的分析、設(shè)計(jì)、整合,為客戶設(shè)計(jì)出具風(fēng)格及創(chuàng)意性的商業(yè)解決方案,成都創(chuàng)新互聯(lián)公司更提供一系列網(wǎng)站制作和網(wǎng)站推廣的服務(wù)。

1.創(chuàng)建Pair RDD:

1.1 創(chuàng)建Pair RDD的方式:

很多數(shù)據(jù)格式在導(dǎo)入RDD時(shí),會(huì)直接生成Pair RDD。我們也可以使用map()來將之前講到的普通RDD轉(zhuǎn)化為Pair RDD。

1.2 Pair RDD轉(zhuǎn)化實(shí)例:

下面例子中,把原始RDD,修改成首單詞做Key,整行做Value的Pair RDD。

Java中沒有tuple類型,所以使用scala的scala.Tuple2類來創(chuàng)建tuple。創(chuàng)建tuple:  new Tuple2(elem1,elem2)  ; 訪問tuple的元素:  使用._1()和._2()方法來訪問。

而且,在Python和Scala實(shí)現(xiàn)中使用基本的map()函數(shù)即可,java需要使用函數(shù)mapToPair():

	/**
	 * 將普通的基本RDD轉(zhuǎn)化成一個(gè)Pair RDD,業(yè)務(wù)邏輯: 將每一行的首單詞作為Key,整個(gè)句子作為Value 返回Key/Value PairRDD。
	 * @param JavaRDD<String>
	 * @return JavaPairRDD<String,String>
	 */
	public JavaPairRDD<String,String> firstWordKeyRdd(JavaRDD<String> input){	
		JavaPairRDD<String,String> pair_rdd = input.mapToPair(
				new PairFunction<String,String,String>(){
					@Override
					public Tuple2<String, String> call(String arg0) throws Exception {
						// TODO Auto-generated method stub
						return new Tuple2<String,String>(arg0.split(" ")[0],arg0);
					}
				}
		);
		return pair_rdd;	
	}

當(dāng)從內(nèi)存中的集合創(chuàng)建PairRDD時(shí),Python和Scala需要使用函數(shù)SparkContext.parallelize();而Java使用函數(shù)SparkContext.parallelizePairs()。

2.Pair RDD的轉(zhuǎn)化操作:

2.1 Pair RDD常見的轉(zhuǎn)化操作列表:

基礎(chǔ)RDD使用的轉(zhuǎn)化操作也可以在Pair RDD中使用。因?yàn)镻air RDD中使用tuple,所以需要傳遞操作tuple的函數(shù)給Pair RDD.

下表列出Pair RDD常用的轉(zhuǎn)化操作(事例RDD內(nèi)容:{(1, 2), (3, 4), (3, 6)})

函數(shù)名作用調(diào)用例子返回結(jié)果
reduceByKey(func)Combine values with the same key.rdd.reduceByKey((x, y) => x + y){(1,2),(3,10)}
groupByKey()Group values with the same key.rdd.groupByKey(){(1,[2]),(3,[4,6])}
combineByKey(createCombiner,mergeValue, mergeCombiners,partitioner)Combine values with the same key using a different result type.

mapValues(func)Apply a function to each value of a pair RDD without changing the key.rdd.mapValues(x =>x+1){(1,3),(3,5),(3,7)}
flatMapValues(func)

Apply a function that returns an iterator to each value of a pair RDD, and for each element returned, produce a key/value entry with the old key. Often used for tokenization.

rdd.flatMapValues(x=> (x to 5){(1,2),(1,3),(1,4),(1,5),(3,4),(3,5)}
keys()Return an RDD of just the keys.rdd.keys(){1, 3, 3}
values()Return an RDD of just the values.rdd.values(){2, 4, 6}
sortByKey()Return an RDD sorted by the key.rdd.sortByKey(){(1,2),(3,4),(3,6)}

下表列舉2個(gè)RDD之間的轉(zhuǎn)化操作(rdd = {(1, 2), (3, 4), (3, 6)} other = {(3,9)}):

函數(shù)名作用調(diào)用例子返回結(jié)果
subtractByKeyRemove elements with a key present in the other RDD.rdd.subtractByKey(other){(1, 2)}
joinPerform an inner join between two RDDs.rdd.join(other){(3, (4, 9)),(3, (6, 9))}
rightOuterJoinPerform a join between two RDDs where the key must be present in the first RDD.rdd.rightOuterJoin(other){(3,(Some(4),9)), (3,(Some(6),9))}
leftOuterJoinPerform a join between two RDDs where the key must be present in the other RDD.rdd.leftOuterJoin(other){(1,(2,None)),(3,(4,Some(9))),(3,(6,Some(9)))}
cogroupGroup data from both RDDs sharing the same key.rdd.cogroup(other){(1,([2],[])),(3,([4, 6],[9]))}

2.2 Pair RDD篩選操作:

Pair RDD也還是RDD,所以之前介紹的操作(例如filter)也同樣適用于PairRDD。下面程序,篩選長度大于20的行:

	/**
	 * PairRDD篩選長度大于20的行。
	 * @param JavaPairRDD<String,String>
	 * @return JavaPairRDD<String,String>
	 */
	public JavaPairRDD<String,String> filterMoreThanTwentyLines
			(JavaPairRDD<String,String> input){	
		JavaPairRDD<String,String> filter_rdd = input.filter(
				new Function<Tuple2<String, String>,Boolean>(){
					@Override
					public Boolean call(Tuple2<String, String> arg0) throws Exception {
						// TODO Auto-generated method stub
						return (arg0._2.length()>20);
					}					
				}
				);
		return filter_rdd;	
	}

2.3 聚合操作:


./spark-shell --jars /spark/alluxio-1.2.0/core/client/target/alluxio-core-client-1.2.0-jar-with-dependencies.jar

Pair RDD提供下面方法:

1. reduceByKey()方法:可以分別歸約每個(gè)鍵對應(yīng)的數(shù)據(jù);

2. join()方法:可以把兩個(gè)RDD中鍵相同的元素組合在一起,合并為一個(gè)RDD。

一、創(chuàng)建Pair RDD:

當(dāng)需要將一個(gè)普通RDD轉(zhuǎn)化成一個(gè)Pair RDD時(shí),可以使用map()函數(shù)來實(shí)現(xiàn)。

程序4-1:  使用第一個(gè)單詞作為鍵建出一個(gè)Pair RDD:

val text1 = sc.textFile("file:///spark/spark/README.md")

val pair1 = text1.map(x=>( x.split(" ")(0),x))

println(pair1.collect().mkString("  "))

程序4-2:  對Pair RDD的第2個(gè)元素篩選:

val text2 = sc.textFile("file:///spark/spark/README.md")

val pair_base2 = text2.map(x=>( x.split(" ")(0),x))

val pair2 = pair_base2 .filter{case(key,value)=>value.length<20}

println(pair2.collect().mkString("  "))

對于二元組數(shù)據(jù),有時(shí)我們只想訪問Pair RDD的值的部分,這時(shí)操作二元組很麻煩。可以使用mapValues(func)函數(shù),單操作value,不操作key,功能類似于map{case(x,y):(x,func(y))}

reduceByKey()與reduce()類似,reduceByKey()會(huì)為數(shù)據(jù)集中的每個(gè)鍵進(jìn)行并行的歸約操作,每個(gè)歸約操作會(huì)將鍵相同的值合并起來。返回一個(gè)由各鍵和對應(yīng)鍵歸約出來的結(jié)果值組成的新的RDD。

foldByKey()使用一個(gè)與RDD和合并函數(shù)中的數(shù)據(jù)類型相同的零值作為初始值。

用scala從一個(gè)內(nèi)存中的數(shù)據(jù)集創(chuàng)建PairRDD時(shí),只需要對這個(gè)由二元組組成的集合調(diào)用SparkContext.parallelize()方法。

程序4-3:  計(jì)算每個(gè)鍵對應(yīng)的平均值:

val text3 = sc.parallelize(List(("panda",0),("pink",3),("pirate",3),("panda",1),("pink",4)))

val text3_final = text3.mapValues(x=>(x,1)).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2))

println(text3_final.collect().mkString(" "))

調(diào)用reduceByKey()和foldByKey()會(huì)在每個(gè)鍵計(jì)算全局的總結(jié)果之前先自動(dòng)在每臺機(jī)器上進(jìn)行本地合并。

其中,mapValues(x=>(x,1))得到的輸出結(jié)果是:"panda",(0,1) ;"pink",(3,1) ;"pirate",(3,1) ;"panda",(1,1) ;"pink",(4,1) ;

下一步,reduceByKey((x,y)=>(x._1+y._1,x._2+y._2));自動(dòng)合并同一個(gè)key的數(shù)據(jù)。例如對于panda,(0,1) ,(1,1)   => (0+1 , 1+1) {解釋:(0,1)的第一個(gè)數(shù)據(jù)加上(1,1)的第一個(gè)數(shù)據(jù)作為第一個(gè)數(shù)據(jù),(0,1)的第二個(gè)數(shù)據(jù)加上(1,1)的第二個(gè)數(shù)據(jù)作為第二個(gè)數(shù)據(jù)}也就是 (1,2)。

程序4-3的輸出結(jié)果類似于: (pink,(7,2)) (pirate,(3,1)) (panda,(1,2)) ;將每個(gè)key的總和得到、并將每個(gè)key出現(xiàn)的次數(shù)得到。

程序4-4:  實(shí)現(xiàn)單詞計(jì)數(shù):

val text4 = sc.textFile("file:///spark/spark/README.md")

val words = text4.flatMap(x=>x.split(" "))

words.map(x=>(x,1)).reduceByKey((x,y)=>x+y)

combineByKey()是最為常用的基于鍵進(jìn)行聚合的函數(shù)。combineByKey()可以讓用戶返回與輸入數(shù)據(jù)的類型不同的返回值。

如果第一次出現(xiàn)一個(gè)新的元素(鍵)(在每一個(gè)分區(qū)中第一次出現(xiàn),而不是整個(gè)RDD中第一次出現(xiàn)),會(huì)使用createCombiner()函數(shù)來創(chuàng)建那個(gè)鍵對應(yīng)的累加器的初始值。

如果是一個(gè)在處理當(dāng)前分區(qū)之前已經(jīng)遇到的鍵,則使用mergeValue()方法將該鍵的累加器對應(yīng)的當(dāng)前值與這個(gè)新的值進(jìn)行合并。

combineByKey()有多個(gè)參數(shù)分別對應(yīng)聚合操作的各個(gè)階段,因而非常適合用來解釋聚合操作各個(gè)階段的功能劃分。

程序4-5:  使用combineByKey()來實(shí)現(xiàn)計(jì)算每個(gè)key的平均值:

val input = sc.parallelize(List(("panda",0),("pink",3),("pirate",3),("panda",1),("pink",4)))

val result = input.combineByKey( (v) => (v,1),

(acc:(Int,Int),v) => (acc._1+v,acc._2+1),

(acc1:(Int,Int),acc2:(Int,Int)) => (acc1._1+acc2._1,acc1._2+acc2._2)

).map{case(key,value)=>(key,value._1/value._2.toFloat)}

result.collectAsMap().map(println(_))

其中combineByKey()接收3個(gè)函數(shù):

第一個(gè)函數(shù)是createCombiner(),也就是在某一個(gè)分區(qū)第一次碰到一個(gè)新的key,比如panda是第一次出現(xiàn),則調(diào)用createCombiner函數(shù)。

第二個(gè)函數(shù)是mergeValue(),在同一個(gè)分區(qū)中,如果出現(xiàn)了之前出現(xiàn)過的一個(gè)key,例如是panda,這時(shí)候調(diào)用這個(gè)函數(shù),

第三個(gè)函數(shù)是mergeCombiners(),


網(wǎng)站題目:處理鍵值對RDD
文章路徑:http://muchs.cn/article36/jpegpg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站策劃、域名注冊、網(wǎng)站收錄、電子商務(wù)網(wǎng)站設(shè)計(jì)、定制網(wǎng)站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

網(wǎng)站建設(shè)網(wǎng)站維護(hù)公司