這篇文章主要講解了“Hadoop Job提交相關知識點分析”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Hadoop Job提交相關知識點分析”吧!
創(chuàng)新互聯(lián)是一家專注于網(wǎng)站制作、做網(wǎng)站與策劃設計,仁化網(wǎng)站建設哪家好?創(chuàng)新互聯(lián)做網(wǎng)站,專注于網(wǎng)站建設10余年,網(wǎng)設計領域的專業(yè)建站公司;建站業(yè)務涵蓋:仁化等地區(qū)。仁化做網(wǎng)站價格咨詢:18980820575
Configuration類是用來訪問hadoop的配置參數(shù)的。
Configuration類首先會通過靜態(tài)代碼段加載hadoop的配置文件core-default.xml和和core-site.xml,相關代碼如下:
<span >static{ //print deprecation warning if hadoop-site.xml is found in classpath ClassLoader cL = Thread.currentThread().getContextClassLoader(); if (cL == null) { cL = Configuration.class.getClassLoader(); } if(cL.getResource("hadoop-site.xml")!=null) { LOG.warn("DEPRECATED: hadoop-site.xml found in the classpath. " + "Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, " + "mapred-site.xml and hdfs-site.xml to override properties of " + "core-default.xml, mapred-default.xml and hdfs-default.xml " + "respectively"); } addDefaultResource("core-default.xml"); addDefaultResource("core-site.xml"); } </span>
defaultResources是一個ArrayList,用來保存默認的配置文件路徑。如果一個默認的配置文件路徑不在defaultResource里面,就添加進去,這個邏輯是在
addDefaultResource方法中實現(xiàn)的。
properties是一個Properties對象,保存從配置文件中解析出來的配置屬性,如果多個配置文件有相同的key,后者會覆蓋前者的值。
JobConf類用來配置Map/Reduce作業(yè)信息的,繼承自Configuration類。
JobConf類首先會通過靜態(tài)代碼段加載mapred-default.xml和mapred-site.xml配置屬性文件。
DEFAULT_MAPRED_TASK_JAVA_OPTS=“-Xmx200m”,默認情況下Map/Reduce任務的JAVA命令行選項指定的JAVA虛擬機最大內存是200M。
JobClient類是用戶與JobTracker交互的主要接口,通過它可以提交jobs,追蹤job的進度,訪問task組件的日志,查詢集群的狀態(tài)信息等。
提交job是通過runJob方法實現(xiàn)的,相關代碼如下:
<span >public static RunningJob runJob(JobConf job) throws IOException { JobClient jc = new JobClient(job); RunningJob rj = jc.submitJob(job); try { if (!jc.monitorAndPrintJob(job, rj)) { LOG.info("Job Failed: " + rj.getFailureInfo()); throw new IOException("Job failed!"); } } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } return rj; }</span>
首先創(chuàng)建一個JobClient對象,此對象在構造函數(shù)中會根據(jù)JobConf對象去連接JobTracker。
JobClient與JobTracker通信是通過jobSubmitClient操作的,jobSubmitClient是JobSubmissionProtocol類型的動態(tài)代理類,是通過如下方法產(chǎn)生的:
<span > private static JobSubmissionProtocol createRPCProxy(InetSocketAddress addr, Configuration conf) throws IOException { return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class, JobSubmissionProtocol.versionID, addr, UserGroupInformation.getCurrentUser(), conf, NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class)); }</span>
getProxy方法的關鍵是Invoker類,Invoker類實現(xiàn)了InvocationHandler接口,主要有兩個成員變量,remoteId是Client.ConnectionId類型,保存連接地址和用戶的ticket,客戶端連接服務器由<remoteAddress,protocol,ticket>唯一標識。從這里我們也可以看到一些配置屬性值,默認的rpcTimeout是0,
ipc.client.connection.maxidletime客戶端連接的最大空閑時間是10s,
ipc.client.connect.max.retries客戶端同服務器建立連接時的最大重試次數(shù)是10,
ipc.client.tcpnodelay是否開啟Nagle算法(對TCP/IP進行擁塞控制),如果開啟,會減少延遲,但是會增加小數(shù)據(jù)報,默認是false。client是Client類,用于IPC通信。client會通過ClientCache類來緩存,如果緩存中沒有,會新建一個Client,否則原client計數(shù)加1。Invoker類主要的方法是invoke方法,invoke方法的功能是調用client的方法然后返回結果。動態(tài)代理類代理的對象是Client對象。
submitJobInternal方法是真正用來提交job的,具體步驟如下:
1、初始化staging目錄,staging目錄根目錄是由mapreduce.jobtracker.staging.root.dir配置的,默認是/tmp/hadoop/mapred/staging,具體到某個用戶的staging目錄是$ROOT/userName/.staging。
2、從JobTracker那里取得新的job id,job id從1開始遞增。
3、獲得提交job的目錄submitJobDir=用戶的staging目錄/jobid,并且將這個目錄設置成mapreduce.job.dir的值。
4、copyAndConfigureFiles拷貝和初始化文件,首先從配置屬性mapred.submit.replication取得replication值,默認為10。然后判斷submitJobDir目錄是否存在,如果存在拋異常;否則創(chuàng)建submitJobDir目錄;取得job的分布式緩存文件路徑=submitJobDir/files;取得job的分布式緩存存檔路徑=submitJobDir/archives;取得job的分布式緩存libjars路徑=submitJobDir/libjars;如果命令行參數(shù)有tmpfiles,則將這些文件拷貝到分布式緩存文件路徑下,同時將這個路徑加入到分布式緩存中;如果命令行參數(shù)有tmpjars,則將這些文件拷貝到分布式緩存libjars路徑下,同時將這個路徑加入到分布式緩存中;如果命令行參數(shù)有tmparchives,則將這些文件拷貝到分布式緩存存檔路徑下,同時將這個路徑加入到分布式緩存中;根據(jù)mapred.jar屬性取得jar包的路徑,如果沒有指定job的名字,那么將使用jar包的名字作為job名字;取得job jar的存儲路徑=submitJobDir/job.jar;將用戶指定的jar包拷貝到job jar的存儲路徑;設置工作目錄,默認是配置屬性mapred.working.dir指定的值。
5、取得job配置文件的路徑submitJobFile=submitJobDir/job.xml;設置
mapreduce.job.submithostaddress為本機ip地址,設置
mapreduce.job.submithost為本機主機名。
6、為job創(chuàng)建輸入分區(qū),這是由writeSplits方法完成的。以old api為例,首先調用InputFormat的getSplits方法得到一個InputSplit分區(qū)數(shù)組,F(xiàn)ileInputFormat類的getSplits方法實現(xiàn)過程如下:
通過listStatus方法取得輸入文件路徑列表,過濾掉_和.開頭的路徑以及根據(jù)設置的mapred.input.pathFilter.class過濾;
在JobConf中設置mapreduce.input.num.files為輸入文件數(shù);
計算出所有輸入文件的總大小totalSize,目標分區(qū)大小goalSize=totalSize/numSplits(由mapred.map.tasks配置,默認為1),最小分區(qū)大小minSize=mapred.min.split.size配置和1之間的較大值,對于每一個輸入文件,如果這個文件的長度不等于0并且是可切分的,計算分區(qū)大小splitSize=Math.max(minSize,Math.min(goalSize,blockSize)),blockSize為HDFS存儲文件的塊大小,對于每一個分區(qū)大小,計算對其貢獻最大的主機數(shù)組(根據(jù)機架以及塊的字節(jié)大小確定),然后將這個分區(qū)加入到分區(qū)列表;然后根據(jù)分區(qū)大長度從大到小對分區(qū)列表進行排序;然后將分區(qū)列表寫入到分區(qū)文件,分區(qū)文件名=submitJobDir/job.split,分區(qū)文件的存儲格式:SPL字節(jié)信息,分區(qū)版本號,{InputSplit類名,InputSplit類信息}+;SplitMetaInfo數(shù)組記錄每個分區(qū)信息在文件中的偏移,主機信息和長度;將分區(qū)Meta信息SplitMetaInfo數(shù)組寫入到文件submitJobDir/job.splitmetainfo。
7、JobConf設置mapred.map.tasks為分區(qū)數(shù)。
8、根據(jù)mapred.job.queue.name獲得job提交的隊列的名字,默認是default,然后根據(jù)這個隊列名字獲得訪問控制列表。
9、將重新配置過的JobConf寫入到submitJobDir/job.xml文件。
10、將jobid,submitJobDir信息傳給JobTracker正式提交job,并通過NetworkedJob對象跟蹤job的狀態(tài)。
monitorAndPrintJob方法監(jiān)控job的運行并且實時打印job的狀態(tài)。
感謝各位的閱讀,以上就是“Hadoop Job提交相關知識點分析”的內容了,經(jīng)過本文的學習后,相信大家對Hadoop Job提交相關知識點分析這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關知識點的文章,歡迎關注!
本文題目:HadoopJob提交相關知識點分析
標題鏈接:http://muchs.cn/article18/jcjcdp.html
成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供企業(yè)網(wǎng)站制作、云服務器、標簽優(yōu)化、App設計、網(wǎng)站排名、網(wǎng)站營銷
聲明:本網(wǎng)站發(fā)布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經(jīng)允許不得轉載,或轉載時需注明來源: 創(chuàng)新互聯(lián)