(版本定制)第13課:SparkStreaming源碼解讀之Driver容錯(cuò)安全性

本期內(nèi)容:
1. ReceiverBlockTracker容錯(cuò)安全性 
2. DStream和JobGenerator容錯(cuò)安全性

網(wǎng)站建設(shè)哪家好,找創(chuàng)新互聯(lián)建站!專注于網(wǎng)頁(yè)設(shè)計(jì)、網(wǎng)站建設(shè)、微信開(kāi)發(fā)、微信小程序定制開(kāi)發(fā)、集團(tuán)企業(yè)網(wǎng)站建設(shè)等服務(wù)項(xiàng)目。為回饋新老客戶創(chuàng)新互聯(lián)還提供了恩陽(yáng)免費(fèi)建站歡迎大家使用!

一:容錯(cuò)安全性 
1. ReceivedBlockTracker負(fù)責(zé)管理Spark Streaming運(yùn)行程序的元數(shù)據(jù)。數(shù)據(jù)層面 
2. DStream和JobGenerator是作業(yè)調(diào)度的核心層面,也就是具體調(diào)度到什么程度了,從運(yùn)行的考慮的。DStream是邏輯層面。 
3. 作業(yè)生存層面,JobGenerator是Job調(diào)度層面,具體調(diào)度到什么程度了。從運(yùn)行的角度的。

談Driver容錯(cuò)你要考慮Driver中有那些需要維持狀態(tài)的運(yùn)行。 
1. ReceivedBlockTracker跟蹤了數(shù)據(jù),因此需要容錯(cuò)。通過(guò)WAL方式容錯(cuò)。 
2. DStreamGraph表達(dá)了依賴關(guān)系,恢復(fù)狀態(tài)的時(shí)候需要根據(jù)DStream恢復(fù)計(jì)算邏輯級(jí)別的依賴關(guān)系。通過(guò)checkpoint方式容錯(cuò)。 
3. JobGenerator表面你是怎么基于ReceiverBlockTracker中的數(shù)據(jù),以及DStream構(gòu)成的依賴關(guān)系不斷的產(chǎn)生Job的過(guò)程。你消費(fèi)了那些數(shù)據(jù),進(jìn)行到什么程度了。

總結(jié)如下:

(版本定制)第13課:Spark Streaming源碼解讀之Driver容錯(cuò)安全性

ReceivedBlockTracker: 
1. ReceivedBlockTracker會(huì)管理Spark Streaming運(yùn)行過(guò)程中所有的數(shù)據(jù)。并且把數(shù)據(jù)分配給需要的batches,所有的動(dòng)作都會(huì)被WAL寫入到Log中,Driver失敗的話,就可以根據(jù)歷史恢復(fù)tracker狀態(tài),在ReceivedBlockTracker創(chuàng)建的時(shí)候,使用checkpoint保存歷史目錄。

下面就從Receiver收到數(shù)據(jù)之后,怎么處理的開(kāi)始。 
2. ReceiverBlockTracker.addBlock源碼如下: 
Receiver接收到數(shù)據(jù),把元數(shù)據(jù)信息匯報(bào)上來(lái),然后通過(guò)ReceiverSupervisorImpl就將數(shù)據(jù)匯報(bào)上來(lái),就直接通過(guò)WAL進(jìn)行容錯(cuò). 
當(dāng)Receiver的管理者,ReceiverSupervisorImpl把元數(shù)據(jù)信息匯報(bào)給Driver的時(shí)候,正在處理是交給ReceiverBlockTracker. ReceiverBlockTracker將數(shù)據(jù)寫進(jìn)WAL文件中,然后才會(huì)寫進(jìn)內(nèi)存中,被當(dāng)前的Spark Streaming程序的調(diào)度器使用的,也就是JobGenerator使用的。JobGenerator不可能直接使用WAL。WAL的數(shù)據(jù)在磁盤中,這里JobGenerator使用的內(nèi)存中緩存的數(shù)據(jù)結(jié)構(gòu)

/** Add received block. This event will get written to the write ahead log (if enabled). */
def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
try {
val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo)) //接收數(shù)據(jù)后,先進(jìn)行WAL
if (writeResult) {
      synchronized {
getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo //當(dāng)WAL成功后,將Block Info元數(shù)據(jù)信息加入到Block Queue中
      }
      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    } else {
      logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
    }
    writeResult
  } catch {
case NonFatal(e) =>
      logError(s"Error adding block $receivedBlockInfo", e)
false
}
}

Driver端接收到的數(shù)據(jù)保存在streamIdToUnallocatedBlockQueues中,具體結(jié)構(gòu)如下:

private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
allocateBlocksToBatch把接收到的數(shù)據(jù)分配給batch,根據(jù)streamId取出Block,由此就知道Spark Streaming處理數(shù)據(jù)的時(shí)候可以有不同數(shù)據(jù)來(lái)源
那到底什么是batchTime? 
batchTime是上一個(gè)Job分配完數(shù)據(jù)之后,開(kāi)始再接收到的數(shù)據(jù)的時(shí)間。
/**
 * Allocate all unallocated blocks to the given batch.
 * This event will get written to the write ahead log (if enabled).
 */
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
val streamIdToBlocks = streamIds.map { streamId =>
        (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true)) //根據(jù)StreamId獲取Block信息
    }.toMap
val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
lastAllocatedBatchTime = batchTime //這里有對(duì)batchTime進(jìn)行賦值,就是上一個(gè)job分配完數(shù)據(jù)后,開(kāi)始在接收到數(shù)據(jù)的時(shí)間
    } else {
      logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
    }
  } else {
logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
  }
}

隨著時(shí)間的推移,會(huì)不斷產(chǎn)生RDD,這時(shí)就需要清理掉一些歷史數(shù)據(jù),可以通過(guò)cleanupOldBatches方法來(lái)清理歷史數(shù)據(jù)

/**
 * Clean up block information of old batches. If waitForCompletion is true, this method
 * returns only after the files are cleaned up.
 */
def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
require(cleanupThreshTime.milliseconds < clock.getTimeMillis())
val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
  logInfo("Deleting batches " + timesToCleanup)
if (writeToLog(BatchCleanupEvent(timesToCleanup))) {
timeToAllocatedBlocks --= timesToCleanup
writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))
  } else {
    logWarning("Failed to acknowledge batch clean up in the Write Ahead Log.")
  }
}

以上幾個(gè)方法都進(jìn)行了WAL動(dòng)作

(record: ReceivedBlockTrackerLogEvent): = {
(isWriteAheadLogEnabled) {
    logTrace(record)
{
.get.write(ByteBuffer.(Utils.(record))clock.getTimeMillis())
} {
(e) =>
        logWarning(recorde)
}
  } {
}
}

總結(jié): 
WAL對(duì)數(shù)據(jù)的管理包括數(shù)據(jù)的生成,數(shù)據(jù)的銷毀和消費(fèi)。上述在操作之后都要先寫入到WAL的文件中. 

(版本定制)第13課:Spark Streaming源碼解讀之Driver容錯(cuò)安全性

JobGenerator: 
Checkpoint會(huì)有時(shí)間間隔Batch Duractions,Batch執(zhí)行前和執(zhí)行后都會(huì)進(jìn)行checkpoint。 
doCheckpoint被調(diào)用的前后流程: 
(版本定制)第13課:Spark Streaming源碼解讀之Driver容錯(cuò)安全性(版本定制)第13課:Spark Streaming源碼解讀之Driver容錯(cuò)安全性

1、簡(jiǎn)單看下generateJobs

/** Generate jobs and perform checkpoint for the given `time`.  */
private def generateJobs(time: Time) {
// Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
SparkEnv.set(ssc.env)
Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
graph.generateJobs(time) // generate jobs using allocated block
} match {
case Success(jobs) =>
val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false)) //job完成后就需要進(jìn)行checkpoint動(dòng)作
}

2、processEvent接收到消息事件

/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
case GenerateJobs(time) => generateJobs(time)
case ClearMetadata(time) => clearMetadata(time)
case DoCheckpoint(time, clearCheckpointDataLater) =>
doCheckpoint(time, clearCheckpointDataLater) // 調(diào)用doCheckpoint方法
case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}

3、doCheckpoint源碼如下:

/** Perform checkpoint for the give `time`. */
private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
    logInfo("Checkpointing graph for time " + time)
ssc.graph.updateCheckpointData(time) //最終是進(jìn)行RDD的Checkpoint
checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
  }
}

4、DStream中的updateCheckpointData源碼如下:最終導(dǎo)致RDD的Checkpoint

/**
 * Refresh the list of checkpointed RDDs that will be saved along with checkpoint of
 * this stream. This is an internal method that should not be called directly. This is
 * a default implementation that saves only the file names of the checkpointed RDDs to
 * checkpointData. Subclasses of DStream (especially those of InputDStream) may override
 * this method to save custom checkpoint data.
 */
private[streaming] def updateCheckpointData(currentTime: Time) {
  logDebug("Updating checkpoint data for time " + currentTime)
checkpointData.update(currentTime)
  dependencies.foreach(_.updateCheckpointData(currentTime))
  logDebug("Updated checkpoint data for time " + currentTime + ": " + checkpointData)
}

JobGenerator容錯(cuò)安全性如下圖: 
(版本定制)第13課:Spark Streaming源碼解讀之Driver容錯(cuò)安全性(版本定制)第13課:Spark Streaming源碼解讀之Driver容錯(cuò)安全性

參考博客:http://blog.csdn.net/snail_gesture/article/details/51492873#comments

分享題目:(版本定制)第13課:SparkStreaming源碼解讀之Driver容錯(cuò)安全性
標(biāo)題網(wǎng)址:http://muchs.cn/article36/iejosg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供電子商務(wù)、動(dòng)態(tài)網(wǎng)站網(wǎng)站設(shè)計(jì)、自適應(yīng)網(wǎng)站、關(guān)鍵詞優(yōu)化、云服務(wù)器

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

微信小程序開(kāi)發(fā)