Giraph源碼分析(五)——加載數(shù)據(jù)+同步總結(jié)

作者|白松

古縣網(wǎng)站制作公司哪家好,找成都創(chuàng)新互聯(lián)!從網(wǎng)頁設(shè)計、網(wǎng)站建設(shè)、微信開發(fā)、APP開發(fā)、成都響應(yīng)式網(wǎng)站建設(shè)公司等網(wǎng)站項目制作,到程序開發(fā),運(yùn)營維護(hù)。成都創(chuàng)新互聯(lián)從2013年成立到現(xiàn)在10年的時間,我們擁有了豐富的建站經(jīng)驗和運(yùn)維經(jīng)驗,來保證我們的工作的順利進(jìn)行。專注于網(wǎng)站建設(shè)就選成都創(chuàng)新互聯(lián)。

關(guān)于Giraph 共有九個章節(jié),本文第五個章節(jié)。

環(huán)境:在單機(jī)上(機(jī)器名:giraphx)啟動了2個workers。

輸入:SSSP文件夾,里面有1.txt和2.txt兩個文件。

1、在Worker向Master匯報健康狀況后,就開始等待Master創(chuàng)建InputSplit。

方法:每個Worker通過檢某個Znode節(jié)點是否存在,同時在此Znode上設(shè)置Watcher。若不存在,就通過BSPEvent的waitForever()方法釋放當(dāng)前線程的鎖,陷入等待狀態(tài)。一直等到master創(chuàng)建該znode。此步驟位于BSPServiceWorker類中的startSuperStep方法中,等待代碼如下:

Giraph 源碼分析(五)—— 加載數(shù)據(jù)+同步總結(jié)cdn.xitu.io/2019/8/8/16c6f1c19ae23057?w=558&h=454&f=png&s=237620">
2、Master調(diào)用createInputSplits()方法創(chuàng)建InputSplit。

Giraph 源碼分析(五)—— 加載數(shù)據(jù)+同步總結(jié)

在generateInputSplits()方法中,根據(jù)用戶設(shè)定的VertexInputFormat獲得InputSplits。代碼如下:

Giraph 源碼分析(五)—— 加載數(shù)據(jù)+同步總結(jié)

其中minSplitCountHint為創(chuàng)建split的最小數(shù)目,其值如下:

minSplitCountHint = Workers數(shù)目 * NUM_INPUT_THREADS

NUM_INPUT_THREADS表示 每個Input split loading的線程數(shù)目,默認(rèn)值為1 。 經(jīng)查證,在TextVertexValueInputFormat抽象類中的getSplits()方法中的minSplitCountHint參數(shù)被忽略。用戶輸入的VertexInputFormat繼承TextVertexValueInputFormat抽象類。

如果得到的splits.size小于minSplitCountHint,那么有些worker就沒被用上。

得到split信息后,要把這些信息寫到Zookeeper上,以便其他workers訪問。上面得到的split信息如下:

[hdfs://giraphx:9000/user/root/SSSP/1.txt:0+66, hdfs://giraphx:9000/user/root/SSSP/2.txt:0+46]

遍歷splits List,為每個split創(chuàng)建一個Znode,值為split的信息。如為split-0創(chuàng)建Znode,值為:hdfs://giraphx:9000/user/root/SSSP/1.txt:0+66

/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/0

為split-1創(chuàng)建znode(如下),值為:hdfs://giraphx:9000/user/root/SSSP/2.txt:0+46

/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/1

最后創(chuàng)建znode: /_hadoopBsp/job_201404102333_0013/_vertexInputSplitsAllReady 表示所有splits都創(chuàng)建好了。

3、Master根據(jù)splits創(chuàng)建Partitions。首先確定partition的數(shù)目。

Giraph 源碼分析(五)—— 加載數(shù)據(jù)+同步總結(jié)

BSPServiceMaster中的MasterGraphPartitioner<I.V,E,M>對象默認(rèn)為HashMasterPartitioner。它的createInitialPartitionOwners()方法如下:

Giraph 源碼分析(五)—— 加載數(shù)據(jù)+同步總結(jié)

上面代碼中是在工具類PartitionUtils計算Partition的數(shù)目,計算公式如下:

partitionCount=PARTITION_COUNT_MULTIPLIER availableWorkerInfos.size() availableWorkerInfos.size() ,其中PARTITION_COUNT_MULTIPLIER表示Multiplier for the current workers squared,默認(rèn)值為1 。

可見,partitionCount值為4(122)。創(chuàng)建的partitionOwnerList信息如下:

[(id=0,cur=Worker(hostname=giraphx, MRtaskID=1, port=30001),prev=null,ckpt_file=null),

(id=1,cur=Worker(hostname=giraphx, MRtaskID=2, port=30002),prev=null,ckpt_file=null),

(id=2,cur=Worker(hostname=giraphx, MRtaskID=1, port=30001),prev=null,ckpt_file=null),

(id=3,cur=Worker(hostname=giraphx, MRtaskID=2, port=30002),prev=null,ckpt_file=null)]

4、Master創(chuàng)建Znode:/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_partitionExchangeDir,用于后面的exchange partition。

5、Master最后在assignPartitionOwners()方法中

把masterinfo,chosenWorkerInfoList,partitionOwners等信息寫入Znode中(作為Znode的data),該Znode的路徑為: /_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_addressesAndPartitions 。

Master調(diào)用barrierOnWorkerList()方法開始等待各個Worker完成數(shù)據(jù)加載。調(diào)用關(guān)系如下:

Giraph 源碼分析(五)—— 加載數(shù)據(jù)+同步總結(jié)

barrierOnWorkerList中創(chuàng)建znode,path=/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir 。然后檢查該znode的子節(jié)點數(shù)目是否等于workers的數(shù)目,若不等于,則線程陷入等待狀態(tài)。后面某個worker完成數(shù)據(jù)加載后,會創(chuàng)建子node(如 /_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir/giraphx_1)來激活該線程繼續(xù)判斷。

6、當(dāng)Master創(chuàng)建第5步的znode后,會激活worker。

每個worker從znode上讀出data,data包含masterInfo,WorkerInfoList和partitionOwnerList,然后各個worker開始加載數(shù)據(jù)。

把partitionOwnerList復(fù)制給BSPServiceWorker類中的workerGraphPartitioner(默認(rèn)為HashWorkerPartitioner類型)對象的partitionOwnerList變量,后續(xù)每個頂點把根據(jù)vertexID通過workerGraphPartitioner對象獲取其對應(yīng)的partitionOwner。

Giraph 源碼分析(五)—— 加載數(shù)據(jù)+同步總結(jié)

每個Worker從znode: /_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir獲取子節(jié)點,得到inputSplitPathList,內(nèi)容如下:

[/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/1,

/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/0]

然后每個Worker創(chuàng)建N個InputsCallable線程讀取數(shù)據(jù)。N=Min(NUM_INPUT_THREADS,maxInputSplitThread),其中NUM_INPUT_THREADS默認(rèn)值為1,maxInputSplitThread=(InputSplitSize-1/maxWorkers +1

那么,默認(rèn)每個worker就是創(chuàng)建一個線程來加載數(shù)據(jù)。

在InputSplitsHandler類中的reserveInputSplit()方法中,每個worker都是遍歷inputSplitPathList,通過創(chuàng)建znode來保留(標(biāo)識要處理)的split。代碼及注釋如下:

Giraph 源碼分析(五)—— 加載數(shù)據(jù)+同步總結(jié)

當(dāng)用reserveInputSplit()方法獲取某個znode后,loadSplitsCallable類的loadInputSplit方法就開始通過該znode獲取其HDFS的路徑信息,然后讀入數(shù)據(jù)、重分布數(shù)據(jù)。

Giraph 源碼分析(五)—— 加載數(shù)據(jù)+同步總結(jié)

Giraph 源碼分析(五)—— 加載數(shù)據(jù)+同步總結(jié)

VertexInputSplitsCallable類的readInputSplit()方法如下:

Giraph 源碼分析(五)—— 加載數(shù)據(jù)+同步總結(jié)

7、每個worker加載完數(shù)據(jù)后,調(diào)用waitForOtherWorkers()方法等待其他workers都處理完split。

Giraph 源碼分析(五)—— 加載數(shù)據(jù)+同步總結(jié)

策略如下,每個worker在/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir目錄下創(chuàng)建子節(jié)點,后面追加自己的worker信息,如worker1、worker2創(chuàng)建的子節(jié)點分別如下:

/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir/giraphx_1

/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir/giraphx_2

創(chuàng)建完后,然后等待master創(chuàng)建/_hadoopBsp/job_201404102333_0013/_vertexInputSplitsAllDone。

8、從第5步驟可知,若master發(fā)現(xiàn)/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir下的子節(jié)點數(shù)目等于workers的總數(shù)目,就會在coordinateInputSplits()方法中創(chuàng)建

_hadoopBsp/job_201404102333_0013/_vertexInputSplitsAllDone,告訴每個worker,所有的worker都處理完了split。

9、最后就是就行全局同步。

master創(chuàng)建znode,path=/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir ,然后再調(diào)用barrierOnWorkerList方法檢查該znode的子節(jié)點數(shù)目是否等于workers的數(shù)目,若不等于,則線程陷入等待狀態(tài)。等待worker創(chuàng)建子節(jié)點來激活該線程繼續(xù)判斷。

每個worker獲取自身的Partition Stats,進(jìn)入finishSuperStep方法中,等待所有的Request都被處理完;把自身的Aggregator信息發(fā)送給master;創(chuàng)建子節(jié)點,如/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir/giraphx_1,data為該worker的partitionStatsList和workerSentMessages統(tǒng)計量;

最后調(diào)用waitForOtherWorkers()方法等待master創(chuàng)建/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_superstepFinished 節(jié)點。

master發(fā)現(xiàn)/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir的子節(jié)點數(shù)目等于workers數(shù)目后,根據(jù)/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir子節(jié)點上的data收集每個worker發(fā)送的aggregator信息,匯總為globalStats。

Master若發(fā)現(xiàn)全局信息中(1)所有頂點都voteHalt且沒有消息傳遞,或(2)達(dá)到最大迭代次數(shù) 時,設(shè)置 globalStats.setHaltComputation(true)。告訴works結(jié)束迭代。

master創(chuàng)建/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_superstepFinished 節(jié)點,data為globalStats。告訴所有workers當(dāng)前超級步結(jié)束。

每個Worker檢測到master創(chuàng)建/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_superstepFinished 節(jié)點后,讀出該znode的數(shù)據(jù),即全局的統(tǒng)計信息。然后決定是否繼續(xù)下一次迭代。

10、同步之后開始下一個超級步。

11、master和workers同步過程總結(jié)。

(1)master創(chuàng)建znode A,然后檢測A的子節(jié)點數(shù)目是否等于workers數(shù)目,不等于就陷入等待。某個worker創(chuàng)建一個子節(jié)點后,就會喚醒master進(jìn)行檢測一次。

(2)每個worker進(jìn)行自己的工作,完成后,創(chuàng)建A的子節(jié)點A1。然后等待master創(chuàng)建znode B。

(3)若master檢測到A的子節(jié)點數(shù)目等于workers的數(shù)目時,創(chuàng)建Znode B

(4)master創(chuàng)建B 節(jié)點后,會激活各個worker。同步結(jié)束,各個worker就可以開始下一個超步。

本質(zhì)是通過znode B來進(jìn)行全局同步的。

標(biāo)題名稱:Giraph源碼分析(五)——加載數(shù)據(jù)+同步總結(jié)
本文路徑:http://muchs.cn/article46/pgdohg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供動態(tài)網(wǎng)站移動網(wǎng)站建設(shè)、App設(shè)計、ChatGPT、自適應(yīng)網(wǎng)站、外貿(mào)網(wǎng)站建設(shè)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

綿陽服務(wù)器托管