Hadoop的整文件讀取方法

這篇文章主要講解了“Hadoop的整文件讀取方法”,文中的講解內(nèi)容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Hadoop的整文件讀取方法”吧!

成都一家集口碑和實力的網(wǎng)站建設服務商,擁有專業(yè)的企業(yè)建站團隊和靠譜的建站技術,十載企業(yè)及個人網(wǎng)站建設經(jīng)驗 ,為成都1000多家客戶提供網(wǎng)頁設計制作,網(wǎng)站開發(fā),企業(yè)網(wǎng)站制作建設等服務,包括成都營銷型網(wǎng)站建設,成都品牌網(wǎng)站建設,同時也為不同行業(yè)的客戶提供網(wǎng)站建設、成都網(wǎng)站設計的服務,包括成都電商型網(wǎng)站制作建設,裝修行業(yè)網(wǎng)站制作建設,傳統(tǒng)機械行業(yè)網(wǎng)站建設,傳統(tǒng)農(nóng)業(yè)行業(yè)網(wǎng)站制作建設。在成都做網(wǎng)站,選網(wǎng)站制作建設服務商就選創(chuàng)新互聯(lián)建站。

    寫Hadoop程序時,有時候需要讀取整個文件,而不是分片讀取,但默認的為分片讀取,所以,只有編寫自己的整文件讀取類。

需要編寫的有:

    WholeInputFormat類,繼承自FileInputFormat類

    WholeRecordReader類,繼承自RecordReader類

    其中,用于讀取的類是WholeRecordReader類。以下代碼以Text為key值類型,BytesWritable為value的類型,因為大多數(shù)格式在hadoop中都沒有相應的類型支持,比如jpg,sdf,png等等,在hadoop中都沒有相應的類,但是都可以轉換為byte[]字節(jié)流,然后在轉化為BytesWritable類型,最后在Map或者Reduce再轉換成java中的相應類型。

    代碼如下,解釋見 :

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WholeInputFormat extends FileInputFormat<Text, BytesWritable>
{
    @Override
    public RecordReader<Text, BytesWritable> createRecordReader
(InputSplit split, TaskAttemptContext context) 
     throws IOException,InterruptedException 
     {
        return new WholeRecordReader();
     }

    @Override
    //判斷是否分片,false表示不分片,true表示分片。 
    //其實這個不寫也可以,因為在WholeRecordReader中一次性全部讀完
     protected boolean isSplitable(JobContext context,Path file)
     {
         return false;
     }
}

    下面是WholeRecordReader類:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeRecordReader extends RecordReader<Text,BytesWritable>
{
     //Hadoop中處理文件的類
     private FileSplit fileSplit;
     private FSDataInputStream in = null;
 
     private BytesWritable value = null;
     private Text key = null;
     
     //用于判斷文件是否讀取完成
     //也就是因為這個,所以WholeInputFormat中的isSplitable方法可以不用寫
     private boolean processed = false;
 
     @Override
     public void close() throws IOException 
     {
        //do nothing
     }

     @Override
     public Text getCurrentKey() throws IOException, InterruptedException 
     {
          return this.key;
     }
 
     @Override
     public BytesWritable getCurrentValue() throws IOException,InterruptedException 
     {
          return this.value;
     }
 
     @Override
     public float getProgress() throws IOException, InterruptedException 
     {
          return processed ? fileSplit.getLength() : 0;
     }
  
     @Override
     public void initialize(InputSplit split, TaskAttemptContext context)throws IOException, InterruptedException 
     {
          //打開一個文件輸入流
          fileSplit = (FileSplit)split;
          Configuration job = context.getConfiguration();
          Path file = fileSplit.getPath();
          FileSystem temp = file.getFileSystem(job);
          in = temp.open(file);
     }

     @Override
     public boolean nextKeyValue() throws IOException, InterruptedException
     {
          if(key == null)
          {
              key = new Text();
          }
  
          if(value == null)
          {
              value = new BytesWritable();
          }
  
          if(!processed)
          {
              //申請一個字節(jié)數(shù)組保存將從文件中讀取的內(nèi)容
              byte[] content = new byte[(int)fileSplit.getLength()];
              Path file = fileSplit.getPath();
              //以文件的名字作為傳遞給Map函數(shù)的key值,可以自行設置
              key.set(file.getName());
        
              try{
               //讀取文件中的內(nèi)容
               IOUtils.readFully(in,content,0,content.length);
               //將value的值設置為byte[]中的值
               value.set(new BytesWritable(content));
              }catch(IOException e)
              {
                   e.printStackTrace();
              }finally{
               //關閉輸入流
               IOUtils.closeStream(in);
              }
        
              //將processed設置成true,表示讀取文件完成,以后不再讀取
              processed = true;
              return true;
          }
         
         return false;
     }
}

    當把這些寫好后,在main()函數(shù)或者run()函數(shù)里面將job的輸入格式設置成WholeInputFormat,如下:

job.setInputFormatClass(WholeInputFormat.class);

    現(xiàn)在,可以整個文件讀取了。其中,key,value的類型可以換成大家需要的類型。不過,當在Hadoop中找不到對應類型的時候建議用BytesWritable類型,然后用byte[]作為中間類型轉化為java可以處理的類型。

感謝各位的閱讀,以上就是“Hadoop的整文件讀取方法”的內(nèi)容了,經(jīng)過本文的學習后,相信大家對Hadoop的整文件讀取方法這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關知識點的文章,歡迎關注!

分享標題:Hadoop的整文件讀取方法
本文URL:http://muchs.cn/article28/ipipcp.html

成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站內(nèi)鏈面包屑導航、網(wǎng)頁設計公司網(wǎng)站收錄、品牌網(wǎng)站建設、定制開發(fā)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉載內(nèi)容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉載,或轉載時需注明來源: 創(chuàng)新互聯(lián)

成都網(wǎng)站建設