This article walks through how to implement a secondary sort in MapReduce. It is a practical technique that comes up often in real jobs, so let's get right into it.
What secondary sort is for: during the shuffle phase of a MapReduce job, records are sorted by key several times. After shuffle grouping, however, the order of the values sequence for any given key is undefined (see the table below). If you need the values to come out sorted as well, that requirement is called a secondary sort.
Original data    Without secondary sort    With secondary sort
a 12             a 12                      a 12
b 34             b 34                      b 13
c 90             b 23                      b 23
b 23             b 13                      b 34
b 13             c 90                      c 90
Working from this example, we want the keys sorted as a, b, c and, within each key, the values sorted numerically. This is a textbook MapReduce secondary-sort case. Prepare the raw input data:
a 20
b 20
a 5
c 10
c 8
b 15
a 10
b 18
c 29
b 52
The output we want:
a 5
a 10
a 20
b 15
b 18
b 20
b 52
c 8
c 10
c 29
First, the idea behind Option 1:
input -> map -> <a,20>   -> shuffle -> <a,list(10, 5, 20)>       -> reduce -> <a,5>
                <b,20>                 <b,list(52, 18, 15, 20)>               <a,10>
                <a,5>                  <c,list(29, 8, 10)>                    <a,20>
                <c,10>                                                        <b,15>
                ...                                                           <b,18>
                                                                              <b,20>
                                                                              ...
Sample code that simply sorts the grouped values on the reduce side:
package com.kfk.hadoop.mr.sort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * @author : 蔡政潔
 * @email : caizhengjie888@icloud.com
 * @date : 2020/10/9
 * @time : 7:07 PM
 */
public class SortMR extends Configured implements Tool {

    /**
     * map
     */
    public static class TemplateMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        // objects reused for the map output
        private static final Text mapOutKey = new Text();
        private static final IntWritable mapOutValue = new IntWritable();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // split each line on a space
            String[] values = value.toString().split(" ");

            // pre-filter: skip malformed lines that do not have exactly two fields
            if (values.length != 2) {
                return;
            }
            mapOutKey.set(values[0]);
            mapOutValue.set(Integer.valueOf(values[1]));
            context.write(mapOutKey, mapOutValue);
        }
    }

    /**
     * reduce
     */
    public static class TemplateReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        // object reused for the reduce output
        private static final IntWritable outputValue = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            List<Integer> valueList = new ArrayList<Integer>();

            // copy the primitive out of each (reused) IntWritable
            for (IntWritable value : values) {
                valueList.add(value.get());
            }

            // log the reduce input key and its value list
            System.out.println("Reduce in == KeyIn: " + key + " ValueIn: " + valueList);

            // sort the values in memory
            Collections.sort(valueList);

            // emit the unchanged key once per sorted value
            for (Integer value : valueList) {
                outputValue.set(value);
                context.write(key, outputValue);
            }
        }
    }

    /**
     * run
     */
    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // 1) get conf
        Configuration configuration = this.getConf();

        // 2) create job
        Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        // 3.1) input: the job's input path
        Path path = new Path(args[0]);
        FileInputFormat.addInputPath(job, path);

        // 3.2) map: the job's mapper and its output types
        job.setMapperClass(TemplateMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 1. partitioner
        // job.setPartitionerClass();
        // 2. sort comparator
        // job.setSortComparatorClass();
        // 3. combiner (optional)
        // job.setCombinerClass(WordCountCombiner.class);
        // 4. compression (configurable)
        // configuration.set("mapreduce.map.output.compress", "true");
        // use the SnappyCodec compression codec
        // configuration.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
        // 5. grouping comparator
        // job.setGroupingComparatorClass();
        // 6. number of reduce tasks
        // job.setNumReduceTasks(2);

        // 3.3) reduce: the job's reducer and its output types
        job.setReducerClass(TemplateReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 3.4) output: the job's output path
        Path outpath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outpath);

        // 4) submit the job and wait for completion
        boolean isSuccess = job.waitForCompletion(true);

        // return 0 on success, 1 otherwise
        return (isSuccess) ? 0 : 1;
    }

    public static void main(String[] args) {

        // input and output paths
        args = new String[]{
                "hdfs://bigdata-pro-m01:9000/user/caizhengjie/datas/secondSort",
                "hdfs://bigdata-pro-m01:9000/user/caizhengjie/mr/output"
        };

        Configuration configuration = new Configuration();
        try {
            // delete the output directory if it already exists
            Path fileOutPath = new Path(args[1]);
            FileSystem fileSystem = FileSystem.get(configuration);
            if (fileSystem.exists(fileOutPath)) {
                fileSystem.delete(fileOutPath, true);
            }

            // run the job and exit with its status
            int status = ToolRunner.run(configuration, new SortMR(), args);
            System.exit(status);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Run result:
a 5
a 10
a 20
b 15
b 18
b 20
b 52
c 8
c 10
c 29
The problem is easy to spot: all of the sorting work is pushed onto the reduce side, so when a values sequence is very long, this places a heavy load on CPU and memory.
A spot to watch out for (an easy trap): when iterating over values on the reduce side, do not store the value or key objects directly. The reduce method runs many times, but there are only two objects backing key and value, and Hadoop reuses those same two objects on every iteration. Extract the underlying data with the type's .get() (or equivalent) before storing it.
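To make the trap concrete, here is a minimal sketch (the class name is made up for illustration; the types match the job above) contrasting the wrong and right way to cache values:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical reducer showing the object-reuse trap.
public class ReuseTrapReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        List<IntWritable> trapped = new ArrayList<IntWritable>();
        List<Integer> copied = new ArrayList<Integer>();

        for (IntWritable value : values) {
            // WRONG: Hadoop reuses one IntWritable instance across iterations,
            // so this list ends up holding N references to the same object,
            // all of which report the last value seen
            trapped.add(value);

            // RIGHT: copy the primitive out with get() before storing it
            // (new IntWritable(value.get()) also works if a Writable is needed)
            copied.add(value.get());
        }
    }
}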
The idea behind Option 2:
Original data        Custom newkey        Sorted in shuffle    Reduce input                        Reduce output
a 12                 a#12,12              a#12,12
b 34                 b#34,34              b#13,13
c 90       -> map -> c#90,90              b#23,23              b#,List(13,23,34)   -> reduce ->    b,13 b,23 b,34
b 23                 b#23,23              b#34,34
b 13                 b#13,13              c#90,90
We can combine the original key and value into a new key, call it newkey. newkey then carries two fields, say k and v, where k and v are the original key and value. The original value stays unchanged in the value slot, so the value now appears both inside newkey and as the value. We then define newkey's comparison rule: sort by k first, and when k is equal, sort by v. For grouping we fall back to the original key only, so the existing grouping logic is not affected. Finally, on output we emit only the original key and value, which gives us the secondary sort in a roundabout but reliable way.
What needs to be customized:
1. A custom data type for the composite key. How: implement WritableComparable.
2. A custom partitioner, so that after newkey is introduced, partitioning still goes by the original key and the original partitions are not disturbed. How: extend Partitioner.
3. A custom grouping comparator, so that grouping still goes by the original key and the original groups are not disturbed. How: implement RawComparator.
The custom data type:
package com.kfk.hadoop.mr.secondsort;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @author : 蔡政潔
 * @email : caizhengjie888@icloud.com
 * @date : 2020/10/15
 * @time : 6:16 PM
 */
public class PairWritable implements WritableComparable<PairWritable> {

    // composite key, e.g. a#12,12
    private String first;
    private int second;

    public PairWritable() {
    }

    public PairWritable(String first, int second) {
        this.set(first, second);
    }

    /**
     * convenience setter for both fields
     */
    public void set(String first, int second) {
        this.first = first;
        this.second = second;
    }

    public String getFirst() {
        return first;
    }

    public void setFirst(String first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }

    /**
     * comparison rule used during the shuffle sort
     */
    public int compareTo(PairWritable o) {
        int comp = this.getFirst().compareTo(o.getFirst());
        if (0 == comp) {
            // if the first fields are equal, compare the second fields
            return Integer.valueOf(this.getSecond()).compareTo(o.getSecond());
        }
        return comp;
    }

    /**
     * serialization
     */
    public void write(DataOutput out) throws IOException {
        out.writeUTF(first);
        out.writeInt(second);
    }

    /**
     * deserialization
     */
    public void readFields(DataInput in) throws IOException {
        this.first = in.readUTF();
        this.second = in.readInt();
    }

    @Override
    public String toString() {
        return "PairWritable{" +
                "first='" + first + '\'' +
                ", second=" + second +
                '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        PairWritable that = (PairWritable) o;

        if (second != that.second) return false;
        return first != null ? first.equals(that.first) : that.first == null;
    }

    @Override
    public int hashCode() {
        int result = first != null ? first.hashCode() : 0;
        result = 31 * result + second;
        return result;
    }
}
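As a quick sanity check of the comparison rule, here is a small standalone sketch (the demo class name is made up for illustration) that sorts a few composite keys the same way the shuffle would:

package com.kfk.hadoop.mr.secondsort;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical driver that exercises PairWritable.compareTo outside Hadoop.
public class PairWritableDemo {
    public static void main(String[] args) {
        List<PairWritable> keys = new ArrayList<PairWritable>();
        keys.add(new PairWritable("b", 34));
        keys.add(new PairWritable("a", 12));
        keys.add(new PairWritable("b", 13));
        keys.add(new PairWritable("b", 23));
        keys.add(new PairWritable("c", 90));

        // Collections.sort uses compareTo: first field ascending, then second
        Collections.sort(keys);

        for (PairWritable key : keys) {
            System.out.println(key.getFirst() + "#" + key.getSecond());
        }
        // expected order: a#12, b#13, b#23, b#34, c#90
    }
}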
The custom partitioner:
package com.kfk.hadoop.mr.secondsort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * @author : 蔡政潔
 * @email : caizhengjie888@icloud.com
 * @date : 2020/10/15
 * @time : 7:09 PM
 */
public class FristPartitioner extends Partitioner<PairWritable, IntWritable> {

    public int getPartition(PairWritable key, IntWritable intWritable, int numPartitions) {
        /*
         * the default implementation is (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
         * here only the first field of the composite key drives the partitioning,
         * so all records with the same original key land in the same reducer
         */
        return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
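One detail worth noting in the return expression: String.hashCode() can be negative, and the bitwise AND with Integer.MAX_VALUE clears the sign bit so the modulo always yields a valid partition index in [0, numPartitions). A quick standalone check (the class name is made up for illustration):

// Hypothetical check that the mask keeps partition indexes non-negative.
public class PartitionMaskDemo {
    public static void main(String[] args) {
        int numPartitions = 3;
        // a string widely cited as hashing to Integer.MIN_VALUE
        String key = "polygenelubricants";

        System.out.println(key.hashCode());                  // negative
        System.out.println(key.hashCode() % numPartitions);  // may be negative: unusable as-is
        System.out.println((key.hashCode() & Integer.MAX_VALUE) % numPartitions); // always valid
    }
}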
The custom grouping comparator:
package com.kfk.hadoop.mr.secondsort;

import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;

/**
 * @author : 蔡政潔
 * @email : caizhengjie888@icloud.com
 * @date : 2020/10/15
 * @time : 6:59 PM
 */
public class FristGrouping implements RawComparator<PairWritable> {

    /*
     * byte-level comparison:
     * b1 and b2 are the two serialized keys to compare;
     * s1/l1 and s2/l2 are the start offset and length of each key.
     * Compare everything up to (but not including) the serialized `second`
     * field; since `second` is an int, it occupies the last 4 bytes.
     * The comparison starts at the given offsets because the serialized
     * keys sit inside a shared buffer.
     */
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return WritableComparator.compareBytes(b1, s1, l1 - 4, b2, s2, l2 - 4);
    }

    /*
     * object-level comparison: group on the first field only
     */
    public int compare(PairWritable o1, PairWritable o2) {
        return o1.getFirst().compareTo(o2.getFirst());
    }
}
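Why trimming the last 4 bytes isolates the first field: PairWritable.write() emits writeUTF(first), a 2-byte length prefix followed by the string's modified-UTF-8 bytes, and then writeInt(second), which is exactly 4 bytes. Dropping the trailing 4 bytes therefore leaves precisely the serialized first field (prefix included, which still groups identical first fields together, since equal strings serialize to equal bytes). A small sketch that dumps the layout (the demo class name is made up for illustration):

package com.kfk.hadoop.mr.secondsort;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical dump of PairWritable's serialized byte layout.
public class LayoutDemo {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);

        new PairWritable("b", 34).write(out);

        byte[] bytes = buffer.toByteArray();
        // writeUTF("b"): 2-byte length prefix + 1 byte for "b" = 3 bytes
        // writeInt(34):  4 bytes
        System.out.println(bytes.length);      // 7
        System.out.println(bytes.length - 4);  // 3, the serialized first field
    }
}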
The secondary-sort job itself:
package com.kfk.hadoop.mr.secondsort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * @author : 蔡政潔
 * @email : caizhengjie888@icloud.com
 * @date : 2020/10/9
 * @time : 7:07 PM
 */
public class SecondSortMR extends Configured implements Tool {

    /**
     * map
     */
    public static class TemplateMapper extends Mapper<LongWritable, Text, PairWritable, IntWritable> {

        // objects reused for the map output
        private static final PairWritable mapOutKey = new PairWritable();
        private static final IntWritable mapOutValue = new IntWritable();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // split each line on a space
            String[] values = value.toString().split(" ");

            // pre-filter: skip malformed lines that do not have exactly two fields
            if (values.length != 2) {
                return;
            }
            mapOutKey.set(values[0], Integer.parseInt(values[1]));
            mapOutValue.set(Integer.parseInt(values[1]));
            context.write(mapOutKey, mapOutValue);

            System.out.println("Map out == KeyOut: " + mapOutKey + " ValueOut: " + mapOutValue);
        }
    }

    /**
     * reduce
     */
    public static class TemplateReducer extends Reducer<PairWritable, IntWritable, Text, IntWritable> {

        // object reused for the reduce output key
        private static final Text outputKey = new Text();

        @Override
        public void reduce(PairWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            /*
             * values arrives at the reducer already sorted, so each value is
             * emitted as-is; the output key is the first part of the composite
             * key (newkey), i.e. the original key
             */
            for (IntWritable value : values) {
                outputKey.set(key.getFirst());
                context.write(outputKey, value);
            }
        }
    }

    /**
     * run
     */
    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // 1) get conf
        Configuration configuration = this.getConf();

        // 2) create job
        Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        // 3.1) input: the job's input path
        Path path = new Path(args[0]);
        FileInputFormat.addInputPath(job, path);

        // 3.2) map: the job's mapper and its output types
        job.setMapperClass(TemplateMapper.class);
        job.setMapOutputKeyClass(PairWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 1. partitioner: partition on the first field of the composite key
        job.setPartitionerClass(FristPartitioner.class);
        // 2. sort comparator
        // job.setSortComparatorClass();
        // 3. combiner (optional)
        // job.setCombinerClass(WordCountCombiner.class);
        // 4. compression (configurable)
        // configuration.set("mapreduce.map.output.compress", "true");
        // use the SnappyCodec compression codec
        // configuration.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
        // 5. grouping comparator: group on the first field of the composite key
        job.setGroupingComparatorClass(FristGrouping.class);
        // 6. number of reduce tasks
        // job.setNumReduceTasks(2);

        // 3.3) reduce: the job's reducer and its output types
        job.setReducerClass(TemplateReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 3.4) output: the job's output path
        Path outpath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outpath);

        // 4) submit the job and wait for completion
        boolean isSuccess = job.waitForCompletion(true);

        // return 0 on success, 1 otherwise
        return (isSuccess) ? 0 : 1;
    }

    public static void main(String[] args) {

        // input and output paths
        args = new String[]{
                "hdfs://bigdata-pro-m01:9000/user/caizhengjie/datas/secondSort",
                "hdfs://bigdata-pro-m01:9000/user/caizhengjie/mr/output"
        };

        Configuration configuration = new Configuration();
        try {
            // delete the output directory if it already exists
            Path fileOutPath = new Path(args[1]);
            FileSystem fileSystem = FileSystem.get(configuration);
            if (fileSystem.exists(fileOutPath)) {
                fileSystem.delete(fileOutPath, true);
            }

            // run the job and exit with its status
            int status = ToolRunner.run(configuration, new SecondSortMR(), args);
            System.exit(status);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Run result:
a 5
a 10
a 20
b 15
b 18
b 20
b 52
c 8
c 10
c 29
That covers how to implement a secondary sort in MapReduce. Some of these points are likely to come up in day-to-day work; hopefully this article has taught you something new.