Spark中的RDD簡單算子如何理解,針對這個問題,這篇文章詳細介紹了相對應的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。
太和網站制作公司哪家好,找創(chuàng)新互聯(lián)!從網頁設計、網站建設、微信開發(fā)、APP開發(fā)、響應式網站建設等網站項目制作,到程序開發(fā),運營維護。創(chuàng)新互聯(lián)于2013年創(chuàng)立到現(xiàn)在10年的時間,我們擁有了豐富的建站經驗和運維經驗,來保證我們的工作的順利進行。專注于網站建設就選創(chuàng)新互聯(lián)。
collect
返回RDD的所有元素
scala> var input=sc.parallelize(Array(-1,0,1,2,2)) input: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at parallelize at <console>:27 scala> var result=input.collect result: Array[Int] = Array(-1, 0, 1, 2, 2)
count,coutByValue
count返回RDD的元素數量,countByValue返回每個值的出現(xiàn)次數
scala> var input=sc.parallelize(Array(-1,0,1,2,2)) scala> var result=input.count result: Long = 5 scala> var result=input.countByValue result: scala.collection.Map[Int,Long] = Map(0 -> 1, 1 -> 1, 2 -> 2, -1 -> 1)
take,top,takeOrdered
take返回RDD的前N個元素 takeOrdered默認返回升序排序的前N個元素,可以指定排序算法 Top返回降序排序的前N個元素
var input=sc.parallelize(Array(1,2,3,4,9,8,7,5,6)) scala> var result=input.take(6) result: Array[Int] = Array(1, 2, 3, 4, 9, 8) scala> var result=input.take(20) result: Array[Int] = Array(1, 2, 3, 4, 9, 8, 7, 5, 6) scala> var result=input.takeOrdered(6) result: Array[Int] = Array(1, 2, 3, 4, 5, 6) scala> var result=input.takeOrdered(6)(Ordering[Int].reverse) result: Array[Int] = Array(9, 8, 7, 6, 5, 4) scala> var result=input.top(6) result: Array[Int] = Array(9, 8, 7, 6, 5, 4 )
Filter
傳入返回值為boolean的函數,返回改函數結果為true的RDD
scala> var input=sc.parallelize(Array(-1,0,1,2)) scala> var result=input.filter(_>0).collect() result: Array[Int] = Array(1, 2)
map,flatmap
map對每個元素執(zhí)行函數,轉換為新的RDD,flatMap和map類似,但會把map的返回結果做flat處理,就是把多個Seq的結果拼接成一個Seq輸出
scala> var input=sc.parallelize(Array(-1,0,1,2)) scala> var result=input.map(_+1).collect result: Array[Int] = Array(0, 1, 2, 3) scala>var result=input.map(x=>x.to(3)).collect result: Array[scala.collection.immutable.Range.Inclusive] = Array(Range(-1, 0, 1, 2, 3), Range(0, 1, 2, 3), Range(1, 2, 3), Range(2, 3)) scala>var result=input.flatMap(x=>x.to(3)).collect result: Array[Int] = Array(-1, 0, 1, 2, 3, 0, 1, 2, 3, 1, 2, 3, 2, 3)
distinct
RDD去重
scala>var input=sc.parallelize(Array(-1,0,1,2,2)) scala>var result=input.distinct.collect result: Array[Int] = Array(0, 1, 2, -1)
Reduce
通過函數聚集RDD中的所有元素
scala> var input=sc.parallelize(Array(-1,0,1,2)) scala> var result=input.reduce((x,y)=>{println(x,y);x+y}) (-1,1) //處理-1,1,結果為0,RDD剩余元素為{0,2} (0,2) //上面的結果為0,在處理0,2,結果為2,RDD剩余元素為{0} (2,0) //上面結果為2,再處理(2,0),結果為2,RDD剩余元素為{} result: Int = 2
sample,takeSample
sample就是從RDD中抽樣,***個參數withReplacement是指是否有放回的抽樣,true為放回,為false為不放回,放回就是抽樣結果可能重復,第二個參數是fraction,0到1之間的小數,表明抽樣的百分比 takeSample類似,但返回類型是Array,***個參數是withReplacement,第二個參數是樣本個數
var rdd=sc.parallelize(1 to 20) scala> rdd.sample(true,0.5).collect res33: Array[Int] = Array(6, 8, 13, 15, 17, 17, 17, 18, 20) scala> rdd.sample(false,0.5).collect res35: Array[Int] = Array(1, 3, 10, 11, 12, 13, 14, 17, 18) scala> rdd.sample(true,1).collect res44: Array[Int] = Array(2, 2, 3, 5, 6, 6, 8, 9, 9, 10, 10, 10, 14, 15, 16, 17, 17, 18, 19, 19, 20, 20) scala> rdd.sample(false,1).collect res46: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20) scala> rdd.takeSample(true,3) res1: Array[Int] = Array(1, 15, 19) scala> rdd.takeSample(false,3) res2: Array[Int] = Array(7, 16, 6)
collectAsMap,countByKey,lookup
collectAsMap把PairRDD轉為Map,如果存在相同的key,后面的會覆蓋前面的。 countByKey統(tǒng)計每個key出現(xiàn)的次數 Lookup返回給定key的所有value
scala> var input=sc.parallelize(List((1,"1"),(1,"one"),(2,"two"),(3,"three"),(4,"four"))) scala> var result=input.collectAsMap result: scala.collection.Map[Int,String] = Map(2 -> two, 4 -> four, 1 -> one, 3 -> three) scala> var result=input.countByKey result: scala.collection.Map[Int,Long] = Map(1 -> 2, 2 -> 1, 3 -> 1, 4 -> 1) scala> var result=input.lookup(1) result: Seq[String] = WrappedArray(1, one) scala> var result=input.lookup(2) result: Seq[String] = WrappedArray(two)
groupBy,keyBy
groupBy根據傳入的函數產生的key,形成元素為K-V形式的RDD,然后對key相同的元素分組 keyBy對每個value,為它加上key
scala> var rdd=sc.parallelize(List("A1","A2","B1","B2","C")) scala> var result=rdd.groupBy(_.substring(0,1)).collect result: Array[(String, Iterable[String])] = Array((A,CompactBuffer(A1, A2)), (B,CompactBuffer(B1, B2)), (C,CompactBuffer(C))) scala> var rdd=sc.parallelize(List("hello","world","spark","is","fun")) scala> var result=rdd.keyBy(_.length).collect result: Array[(Int, String)] = Array((5,hello), (5,world), (5,spark), (2,is), (3,fun))
keys,values
scala> var input=sc.parallelize(List((1,"1"),(1,"one"),(2,"two"),(3,"three"),(4,"four"))) scala> var result=input.keys.collect result: Array[Int] = Array(1, 1, 2, 3, 4) scala> var result=input.values.collect result: Array[String] = Array(1, one, two, three, four) mapvalues mapvalues對K-V形式的RDD的每個Value進行操作 scala> var input=sc.parallelize(List((1,"1"),(1,"one"),(2,"two"),(3,"three"),(4,"four"))) scala> var result=input.mapValues(_*2).collect result: Array[(Int, String)] = Array((1,11), (1,oneone), (2,twotwo), (3,threethree), (4,fourfour))
union,intersection,subtract,cartesian
union合并2個集合,不去重 subtract將***個集合中的同時存在于第二個集合的元素去掉 intersection返回2個集合的交集 cartesian返回2個集合的笛卡兒積
scala> var rdd1=sc.parallelize(Array(-1,1,1,2,3)) scala> var rdd2=sc.parallelize(Array(0,1,2,3,4)) scala> var result=rdd1.union(rdd2).collect result: Array[Int] = Array(-1, 1, 1, 2, 3, 0, 1, 2, 3, 4) scala> var result=rdd1.intersection(rdd2).collect result: Array[Int] = Array(1, 2, 3) scala> var result=rdd1.subtract(rdd2).collect result: Array[Int] = Array(-1) scala> var result=rdd1.cartesian(rdd2).collect result: Array[(Int, Int)] = Array((-1,0), (-1,1), (-1,2), (-1,3), (-1,4), (1,0), (1,1), (1,2), (1,3), (1,4), (1,0), (1,1), (1,2), (1,3), (1,4), (2,0), (2,1), (2,2), (2,3), (2,4), (3,0), (3,1), (3,2), (3,3), (3,4))
關于Spark中的RDD簡單算子如何理解問題的解答就分享到這里了,希望以上內容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關注創(chuàng)新互聯(lián)行業(yè)資訊頻道了解更多相關知識。
當前文章:Spark中的RDD簡單算子如何理解
文章起源:http://muchs.cn/article22/pjjjcc.html
成都網站建設公司_創(chuàng)新互聯(lián),為您提供App設計、品牌網站制作、面包屑導航、標簽優(yōu)化、網站收錄、網站策劃
聲明:本網站發(fā)布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創(chuàng)新互聯(lián)