Hadoop實(shí)訓(xùn)任務(wù)3:HDFS和MapReduce綜合操作-創(chuàng)新互聯(lián)

目錄

創(chuàng)新互聯(lián)公司科技有限公司專業(yè)互聯(lián)網(wǎng)基礎(chǔ)服務(wù)商,為您提供綿陽服務(wù)器托管高防主機(jī),成都IDC機(jī)房托管,成都主機(jī)托管等互聯(lián)網(wǎng)服務(wù)。

1、啟動(dòng)Hadoop服務(wù)

2、創(chuàng)建文本文件

3、上傳文本文件

4、顯示文件內(nèi)容

5、完成排序任務(wù)

6、計(jì)算大利潤和平均利潤

7、統(tǒng)計(jì)學(xué)生總成績和平均成績

8、總結(jié)

1、啟動(dòng)Hadoop服務(wù)

master虛擬機(jī)上執(zhí)行命令:

start-all.sh

啟動(dòng)hadoop服務(wù)進(jìn)程

?

?

?

2、創(chuàng)建文本文件

在master虛擬機(jī)上創(chuàng)建本地文件students.txt

李曉文 女 20
張曉航 男 19
鄭小剛 男 21
吳文華 女 18
肖云宇 男 22
陳燕文 女 19
李連杰 男 23
艾曉麗 女 21
童安格 男 18

?

3、上傳文本文件

students.txt上傳到HDFS的/BigDtat目錄

執(zhí)行命令將該文件復(fù)制到HDFS的HelloHadoop文件夾中

hdfs dfs -put /home/student.txt /BigData

?

webUI界面中查看上傳成功?

?

4、顯示文件內(nèi)容

創(chuàng)建maven工程

?

創(chuàng)建maven工程并添加依賴

  org.apache.hadoop 
        hadoop-client3.3.4      junit 
        junit  4.13.2         

?

在?resources目錄里創(chuàng)建?log4j.properties文件

log4j.rootLogger=INFO, stdout, logfile
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/wordcount.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

?

創(chuàng)建displayFile類用于顯示文件內(nèi)容

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;


public class displayFile {
    @Test
    public void read1() throws Exception {
        // 創(chuàng)建配置對(duì)象
        Configuration conf = new Configuration();
        // 設(shè)置數(shù)據(jù)節(jié)點(diǎn)主機(jī)名屬性
        conf.set("dfs.client.use.datanode.hostname", "true");
        // 定義統(tǒng)一資源標(biāo)識(shí)符(uri: uniform resource identifier)
        String uri = "hdfs://master:9000";
        // 創(chuàng)建文件系統(tǒng)對(duì)象(基于HDFS的文件系統(tǒng))
        FileSystem fs = FileSystem.get(new URI(uri), conf, "root");
        // 創(chuàng)建路徑對(duì)象(指向文件)
        Path path = new Path(uri + "/BigData/student.txt");
        System.out.println(path);
        // 創(chuàng)建文件系統(tǒng)數(shù)據(jù)字節(jié)輸入流(進(jìn)水管:數(shù)據(jù)從文件到程序)
        FSDataInputStream in = fs.open(path);
        // 創(chuàng)建緩沖字符輸入流,提高讀取效率(字節(jié)流-->字符流-->緩沖流)
        BufferedReader br = new BufferedReader(new InputStreamReader(in));
        // 定義行字符串變量
        String nextLine = "";
        // 通過循環(huán)遍歷緩沖字符輸入流
        while ((nextLine = br.readLine()) != null) {
            // 在控制臺(tái)輸出讀取的行
            System.out.println(nextLine);
        }
        // 關(guān)閉緩沖字符輸入流
        br.close();
        // 關(guān)閉文件系統(tǒng)數(shù)據(jù)字節(jié)輸入流
        in.close();
        // 關(guān)閉文件系統(tǒng)
        fs.close();
    }
}

?

5、完成排序任務(wù)

創(chuàng)建Maven項(xiàng)目SortByAge,利用MapReduce計(jì)算框架,處理/BigData/student.txt文件,輸出結(jié)果按照年齡降序排列

李曉文 女 20
張曉航 男 19
鄭小剛 男 21
吳文華 女 18
肖云宇 男 22
陳燕文 女 19
李連杰 男 23
艾曉麗 女 21
童安格 男 18

創(chuàng)建Student類

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class Student implements WritableComparable{

    private String name;
    private String gender;
    private int age;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getGender() {
        return gender;
    }

    public void setGender(String gender) {
        this.gender = gender;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }


    @Override
    public String toString() {
        return "Student{" +
                "name='" + name + '\'' +
                ", gender='" + gender + '\'' +
                ", age=" + age + '\''+
                '}';
    }

    public int compareTo(Student o) {
        return o.getAge() - this.getAge(); // 降序
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeUTF(gender);
        out.writeInt(age);
    }

    public void readFields(DataInput in) throws IOException {
        name = in.readUTF();
        gender = in.readUTF();
        age = in.readInt();
    }
}

創(chuàng)建WordCountMapper類

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper extends Mapper{
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 獲取行內(nèi)容
        String line = value.toString();
        // 按空格拆分得到字段數(shù)組
        String[] fields = line.split(" ");
        // 獲取學(xué)生信息
        String name = fields[0];
        String gender = fields[1];
        int age = Integer.parseInt(fields[2]);
        // 創(chuàng)建學(xué)生對(duì)象
        Student student = new Student();
        // 設(shè)置學(xué)生對(duì)象屬性
        student.setName(name);
        student.setGender(gender);
        student.setAge(age);
        context.write(student, NullWritable.get());
    }
}

創(chuàng)建WordCountReducer類

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer{
    @Override
    protected void reduce(Student key, Iterablevalues, Context context)
            throws IOException, InterruptedException {
        for (NullWritable value : values) {
            // 獲取學(xué)生對(duì)象
            Student student = key;
            // 拼接學(xué)生信息
            String studentInfo = student.getName() + "\t"
                    + student.getGender() + "\t"
                    + student.getAge();
            context.write(new Text(studentInfo), NullWritable.get());
        }
    }
}

創(chuàng)建WordCountDriver類

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.net.URI;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        // 創(chuàng)建配置對(duì)象
        Configuration conf = new Configuration();
        // 設(shè)置數(shù)據(jù)節(jié)點(diǎn)主機(jī)名屬性
        conf.set("dfs.client.use.datanode.hostname", "true");

        // 獲取作業(yè)實(shí)例
        Job job = Job.getInstance(conf);
        // 設(shè)置作業(yè)啟動(dòng)類
        job.setJarByClass(WordCountDriver.class);

        // 設(shè)置Mapper類
        job.setMapperClass(WordCountMapper.class);
        // 設(shè)置map任務(wù)輸出鍵類型
        job.setMapOutputKeyClass(Student.class);
        // 設(shè)置map任務(wù)輸出值類型
        job.setMapOutputValueClass(NullWritable.class);

        // 設(shè)置Reducer類
        job.setReducerClass(WordCountReducer.class);
        // 設(shè)置reduce任務(wù)輸出鍵類型
        job.setOutputKeyClass(Student.class);
        // 設(shè)置reduce任務(wù)輸出值類型
        job.setOutputValueClass(NullWritable.class);

        // 定義uri字符串
        String uri = "hdfs://master:9000";
        // 創(chuàng)建輸入目錄
        Path inputPath = new Path(uri + "/BigData");
        // 創(chuàng)建輸出目錄
        Path outputPath = new Path(uri + "/output");

        // 獲取文件系統(tǒng)
        FileSystem fs =  FileSystem.get(new URI(uri), conf);
        // 刪除輸出目錄(第二個(gè)參數(shù)設(shè)置是否遞歸)
        fs.delete(outputPath, true);

        // 給作業(yè)添加輸入目錄(允許多個(gè))
        FileInputFormat.addInputPath(job, inputPath);
        // 給作業(yè)設(shè)置輸出目錄(只能一個(gè))
        FileOutputFormat.setOutputPath(job, outputPath);

        // 等待作業(yè)完成
        job.waitForCompletion(true);

        // 輸出統(tǒng)計(jì)結(jié)果
        System.out.println("======統(tǒng)計(jì)結(jié)果======");
        FileStatus[] fileStatuses = fs.listStatus(outputPath);
        for (int i = 1; i< fileStatuses.length; i++) {
            // 輸出結(jié)果文件路徑
            System.out.println(fileStatuses[i].getPath());
            // 獲取文件系統(tǒng)數(shù)據(jù)字節(jié)輸入流
            FSDataInputStream in = fs.open(fileStatuses[i].getPath());
            // 將結(jié)果文件顯示在控制臺(tái)
            IOUtils.copyBytes(in, System.out, 4096, false);
        }
    }
}

運(yùn)行查看結(jié)果

6、計(jì)算大利潤和平均利潤

利用利用MapReduce計(jì)算框架 處理profit.txt文件,輸出每月大利潤和平均利潤

創(chuàng)建利潤信息profit.txt文件并上傳HDFS

1 10000
1 15000
1 20000
2 2340
2 5640
2 6140
3 15000
3 2380
3 8900

創(chuàng)建ScoreMapper類

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ScoreMapper extends Mapper{
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 獲取行內(nèi)容
        String line = value.toString();
        // 按空格拆分得到字段數(shù)組
        String[] fields = line.split(" ");
        // 獲取月份
        String name = fields[0].trim();
        // 遍歷各利潤信息
        for (int i = 1; i< fields.length; i++) {
            // 獲取利潤信息
            int score = Integer.parseInt(fields[i].trim());
            // 寫入<月份,值>鍵值對(duì)
            context.write(new Text(name), new IntWritable(score));
        }
    }
}
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ScoreMapper extends Mapper{
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 獲取行內(nèi)容
        String line = value.toString();
        // 按空格拆分得到字段數(shù)組
        String[] fields = line.split(" ");
        // 獲取月份
        String name = fields[0].trim();
        // 遍歷各利潤信息
        for (int i = 1; i< fields.length; i++) {
            // 獲取利潤信息
            int score = Integer.parseInt(fields[i].trim());
            // 寫入<月份,值>鍵值對(duì)
            context.write(new Text(name), new IntWritable(score));
        }
    }
}

創(chuàng)建ScoreReducer類

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.text.DecimalFormat;

public class ScoreReducer extends Reducer{
    @Override
    protected void reduce(Text key, Iterablevalues, Context context)
            throws IOException, InterruptedException {
        // 聲明變量
        int count = 0; // 科目數(shù)
        int sum = 0; // 總分
        int avg = 0; // 平均分
        int max = 20000;
        // 遍歷迭代器計(jì)算總分
        for (IntWritable value : values) {
            count++; // 科目數(shù)累加
            sum += value.get(); // 總分累加
        }
        // 計(jì)算平均值
        avg = sum * 1 / count;
        // 創(chuàng)建小數(shù)格式對(duì)象
        DecimalFormat df = new DecimalFormat("#.#");
        // 拼接每個(gè)大利潤與平均利潤信息
        String scoreInfo = key + " maxProfit=" + max + ", avgProfit=" + df.format(avg);
        // 寫入鍵值對(duì)
        context.write(new Text(scoreInfo), NullWritable.get());
    }
}

創(chuàng)建ScoreDriver類

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.net.URI;

public class ScoreDriver {
    public static void main(String[] args) throws Exception {
        // 創(chuàng)建配置對(duì)象
        Configuration conf = new Configuration();
        // 設(shè)置數(shù)據(jù)節(jié)點(diǎn)主機(jī)名屬性
        conf.set("dfs.client.use.datanode.hostname", "true");

        // 獲取作業(yè)實(shí)例
        Job job = Job.getInstance(conf);
        // 設(shè)置作業(yè)啟動(dòng)類
        job.setJarByClass(ScoreDriver.class);

        // 設(shè)置Mapper類
        job.setMapperClass(ScoreMapper.class);
        // 設(shè)置map任務(wù)輸出鍵類型
        job.setMapOutputKeyClass(Text.class);
        // 設(shè)置map任務(wù)輸出值類型
        job.setMapOutputValueClass(IntWritable.class);

        // 設(shè)置Reducer類
        job.setReducerClass(ScoreReducer.class);
        // 設(shè)置reduce任務(wù)輸出鍵類型
        job.setOutputKeyClass(Text.class);
        // 設(shè)置reduce任務(wù)輸出值類型
        job.setOutputValueClass(NullWritable.class);

        // 定義uri字符串
        String uri = "hdfs://master:9000";
        // 創(chuàng)建輸入目錄
        Path inputPath = new Path(uri + "/BigData");
        // 創(chuàng)建輸出目錄
        Path outputPath = new Path(uri + "/maxavgprofit/output");

        // 獲取文件系統(tǒng)
        FileSystem fs =  FileSystem.get(new URI(uri), conf);
        // 刪除輸出目錄(第二個(gè)參數(shù)設(shè)置是否遞歸)
        fs.delete(outputPath, true);

        // 給作業(yè)添加輸入目錄(允許多個(gè))
        FileInputFormat.addInputPath(job, inputPath);
        // 給作業(yè)設(shè)置輸出目錄(只能一個(gè))
        FileOutputFormat.setOutputPath(job, outputPath);

        // 等待作業(yè)完成
        job.waitForCompletion(true);

        // 輸出統(tǒng)計(jì)結(jié)果
        System.out.println("======統(tǒng)計(jì)結(jié)果======");
        FileStatus[] fileStatuses = fs.listStatus(outputPath);

        for (int i = 1; i< fileStatuses.length; i++) {
            // 輸出結(jié)果文件路徑
            System.out.println(fileStatuses[i].getPath());
            // 獲取文件系統(tǒng)數(shù)據(jù)字節(jié)輸入流
            FSDataInputStream in = fs.open(fileStatuses[i].getPath());
            // 將結(jié)果文件顯示在控制臺(tái)
            IOUtils.copyBytes(in, System.out, 4096, false);
        }
    }
}

運(yùn)行查看結(jié)果

7、統(tǒng)計(jì)學(xué)生總成績和平均成績

創(chuàng)建利潤信息score.txt文件并上傳HDFS

姓名 語文 數(shù)學(xué) 英語 物理 化學(xué)
李小雙 89 78 94 96 87
王麗霞 94 80 86 78 
吳雨涵 90 67 95 92 60
張曉紅 87 76 90 79 59
陳燕文 97 95 92 88 86

創(chuàng)建WordCountMapper類

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper extends Mapper{
    @Override
    protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {
        //獲取行內(nèi)容
        String line = value.toString();
        //按空格拆分得到字段數(shù)組
        String[] fields = line.split(" ");
        //獲取字段信息
        String name = fields[0];

        for (int i = 1; i< fields.length; i++){
            int score = Integer.parseInt(fields[i]);
            context.write(new Text(name),new IntWritable(score));
        }
    }
}

創(chuàng)建WordCountReducer類

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.text.DecimalFormat;

public class WordCountReducer extends Reducer{
    @Override
    protected void reduce(Text key, Iterablevalues, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        int sum = 0;
        double avg = 0;

        for (IntWritable value : values){
            count++;
            sum += value.get();
        }

        avg = sum * 1.0 /count;

        DecimalFormat df = new DecimalFormat("#.#");

        String scoreInfo = "("+key+","+sum+","+df.format(avg)+")";

        context.write(new Text(scoreInfo),NullWritable.get());
    }
}

創(chuàng)建WordCountDriver類

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.net.URI;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        // 創(chuàng)建配置對(duì)象
        Configuration conf = new Configuration();
        // 設(shè)置數(shù)據(jù)節(jié)點(diǎn)主機(jī)名屬性
        conf.set("dfs.client.use.datanode.hostname", "true");

        // 獲取作業(yè)實(shí)例
        Job job = Job.getInstance(conf);
        // 設(shè)置作業(yè)啟動(dòng)類
        job.setJarByClass(mpr.WordCountDriver.class);

        // 設(shè)置Mapper類
        job.setMapperClass(WordCountMapper.class);
        // 設(shè)置map任務(wù)輸出鍵類型
        job.setMapOutputKeyClass(Text.class);
        // 設(shè)置map任務(wù)輸出值類型
        job.setMapOutputValueClass(IntWritable.class);

        //設(shè)置Reducer類
        job.setReducerClass(WordCountReducer.class);
        // 設(shè)置Reducer任務(wù)輸出鍵類型
        job.setOutputKeyClass(Text.class);
        // 設(shè)置Reducer任務(wù)輸出值類型
        job.setOutputValueClass(NullWritable.class);

        //設(shè)置分區(qū)數(shù)量
        job.setNumReduceTasks(1);

        // 定義uri字符串
        String uri = "hdfs://master:9000";
        // 創(chuàng)建輸入目錄
        Path inputPath = new Path(uri + "/BigData");
        // 創(chuàng)建輸出目錄
        Path outputPath = new Path(uri + "/outputs");

        // 獲取文件系統(tǒng)
        FileSystem fs =  FileSystem.get(new URI(uri), conf);
        // 刪除輸出目錄(第二個(gè)參數(shù)設(shè)置是否遞歸)
        fs.delete(outputPath, true);

        // 給作業(yè)添加輸入目錄(允許多個(gè))
        FileInputFormat.addInputPath(job, inputPath);
        // 給作業(yè)設(shè)置輸出目錄(只能一個(gè))
        FileOutputFormat.setOutputPath(job, outputPath);

        // 等待作業(yè)完成
        job.waitForCompletion(true);

        // 輸出統(tǒng)計(jì)結(jié)果
        System.out.println("======統(tǒng)計(jì)結(jié)果======");
        FileStatus[] fileStatuses = fs.listStatus(outputPath);
        for (int i = 1; i< fileStatuses.length; i++) {
            // 輸出結(jié)果文件路徑
            System.out.println(fileStatuses[i].getPath());
            // 獲取文件系統(tǒng)數(shù)據(jù)字節(jié)輸入流
            FSDataInputStream in = fs.open(fileStatuses[i].getPath());
            // 將結(jié)果文件顯示在控制臺(tái)
            IOUtils.copyBytes(in, System.out, 4096, false);
        }
    }
}

運(yùn)行查看結(jié)果

8、總結(jié)

通過實(shí)訓(xùn),使得更加熟練掌握HDFS操作和MapReduce編程

你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機(jī)房具備T級(jí)流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級(jí)服務(wù)器適合批量采購,新人活動(dòng)首月15元起,快前往官網(wǎng)查看詳情吧

網(wǎng)頁題目:Hadoop實(shí)訓(xùn)任務(wù)3:HDFS和MapReduce綜合操作-創(chuàng)新互聯(lián)
文章地址:http://muchs.cn/article6/dpgdog.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供品牌網(wǎng)站制作網(wǎng)站設(shè)計(jì)公司、企業(yè)網(wǎng)站制作、品牌網(wǎng)站設(shè)計(jì)、品牌網(wǎng)站建設(shè)虛擬主機(jī)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

營銷型網(wǎng)站建設(shè)