如何理解SparkStreaming的數(shù)據(jù)可靠性和一致性

如何理解Spark Streaming的數(shù)據(jù)可靠性和一致性,相信很多沒有經(jīng)驗的人對此束手無策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個問題。

成都創(chuàng)新互聯(lián)專注于榕江網(wǎng)站建設(shè)服務(wù)及定制,我們擁有豐富的企業(yè)做網(wǎng)站經(jīng)驗。 熱誠為您提供榕江營銷型網(wǎng)站建設(shè),榕江網(wǎng)站制作、榕江網(wǎng)頁設(shè)計、榕江網(wǎng)站官網(wǎng)定制、小程序定制開發(fā)服務(wù),打造榕江網(wǎng)絡(luò)公司原創(chuàng)品牌,更為您提供榕江網(wǎng)站排名全網(wǎng)營銷落地服務(wù)。

眼下大數(shù)據(jù)領(lǐng)域最熱門的詞匯之一便是流計算了,其中最耀眼的項目無疑是來自Spark社區(qū)的Spark Streaming項目,其從一誕生就受到廣泛關(guān)注并迅速發(fā)展,目前已有追趕并超越Storm的架勢。

對于流計算而言,毫無疑問最核心的特點是它的低時延能力,這主要是來自對數(shù)據(jù)不落磁盤就進行計算的內(nèi)部機制,但這也帶來了數(shù)據(jù)可靠性的問題,即有節(jié)點失效或者網(wǎng)絡(luò)異常時,如何在節(jié)點間進行合適的協(xié)商來進行重傳。更進一步的,若發(fā)生計劃外的數(shù)據(jù)重傳,怎么能保證沒有產(chǎn)生重復的數(shù)據(jù),所有數(shù)據(jù)都是精確一次的(Exact Once)?如果不解決這些問題,大數(shù)據(jù)的流計算將無法滿足大多數(shù)企業(yè)級可靠性要求而流于徒有虛名。

下面將重點分析Spark Streaming是如何設(shè)計可靠性機制并實現(xiàn)數(shù)據(jù)一致性的。

Driver HA

由于流計算系統(tǒng)是長期運行、數(shù)據(jù)不斷流入的,因此其Spark守護進程(Driver)的可靠性是至關(guān)重要的,它決定了Streaming程序能否一直正確地運行下去。

如何理解Spark Streaming的數(shù)據(jù)可靠性和一致性

圖一 Driver數(shù)據(jù)持久化

Driver實現(xiàn)HA的解決方案就是將元數(shù)據(jù)持久化,以便重啟后的狀態(tài)恢復。如圖一所示,Driver持久化的元數(shù)據(jù)包括:

  • Block元數(shù)據(jù)(圖一中的綠色箭頭):Receiver從網(wǎng)絡(luò)上接收到的數(shù)據(jù),組裝成Block后產(chǎn)生的Block元數(shù)據(jù);

  • Checkpoint數(shù)據(jù)(圖一中的橙色箭頭):包括配置項、DStream操作、未完成的Batch狀態(tài)、和生成的RDD數(shù)據(jù)等;

如何理解Spark Streaming的數(shù)據(jù)可靠性和一致性

圖二 Driver故障恢復

Driver失敗重啟后:

  • 恢復計算(圖二中的橙色箭頭):使用Checkpoint數(shù)據(jù)重啟driver,重新構(gòu)造上下文并重啟接收器。

  • 恢復元數(shù)據(jù)塊(圖二中的綠色箭頭):恢復Block元數(shù)據(jù)。

  • 恢復未完成的作業(yè)(圖二中的紅色箭頭):使用恢復出來的元數(shù)據(jù),再次產(chǎn)生RDD和對應的job,然后提交到Spark集群執(zhí)行。

通過如上的數(shù)據(jù)備份和恢復機制,Driver實現(xiàn)了故障后重啟、依然能恢復Streaming任務(wù)而不丟失數(shù)據(jù),因此提供了系統(tǒng)級的數(shù)據(jù)高可靠。

可靠的上下游IO系統(tǒng)

流計算主要通過網(wǎng)絡(luò)socket通信來實現(xiàn)與外部IO系統(tǒng)的數(shù)據(jù)交互。由于網(wǎng)絡(luò)通信的不可靠特點,發(fā)送端與接收端需要通過一定的協(xié)議來保證數(shù)據(jù)包的接收確認、和失敗重發(fā)機制。

不是所有的IO系統(tǒng)都支持重發(fā),這至少需要實現(xiàn)數(shù)據(jù)流的持久化,同時還要實現(xiàn)高吞吐和低時延。在Spark Streaming官方支持的data source里面,能同時滿足這些要求的只有Kafka,因此在最近的Spark Streaming release里面,也是把Kafka當成推薦的外部數(shù)據(jù)系統(tǒng)。

除了把Kafka當成輸入數(shù)據(jù)源(inbound data source)之外,通常也將其作為輸出數(shù)據(jù)源(outbound data source)。所有的實時系統(tǒng)都通過Kafka這個MQ來做數(shù)據(jù)的訂閱和分發(fā),從而實現(xiàn)流數(shù)據(jù)生產(chǎn)者和消費者的解耦。

一個典型的企業(yè)大數(shù)據(jù)中心數(shù)據(jù)流向視圖如下所示:

如何理解Spark Streaming的數(shù)據(jù)可靠性和一致性

圖三 企業(yè)大數(shù)據(jù)中心數(shù)據(jù)流向視圖

除了從源頭保證數(shù)據(jù)可重發(fā)之外,Kafka更是流數(shù)據(jù)Exact Once語義的重要保障。Kafka提供了一套低級API,使得client可以訪問topic數(shù)據(jù)流的同時也能訪問其元數(shù)據(jù)。Spark Streaming的每個接收任務(wù)可以從指定的Kafka topic、partition和offset去獲取數(shù)據(jù)流,各個任務(wù)的數(shù)據(jù)邊界很清晰,任務(wù)失敗后可以重新去接收這部分數(shù)據(jù)而不會產(chǎn)生“重疊的”數(shù)據(jù),因而保證了流數(shù)據(jù)“有且僅處理一次”。

可靠的接收器

在Spark 1.3版本之前,Spark Streaming是通過啟動專用的Receiver任務(wù)來完成從Kafka集群的數(shù)據(jù)流拉取。

Receiver任務(wù)啟動后,會使用Kafka的高級API來創(chuàng)建topicMessageStreams對象,并逐條讀取數(shù)據(jù)流緩存,每個batchInerval時刻到來時由JobGenerator提交生成一個spark計算任務(wù)。

由于Receiver任務(wù)存在宕機風險,因此Spark提供了一個高級的可靠接收器-ReliableKafkaReceiver類型來實現(xiàn)可靠的數(shù)據(jù)收取,它利用了Spark 1.2提供的WAL(Write Ahead Log)功能,把接收到的每一批數(shù)據(jù)持久化到磁盤后,更新topic-partition的offset信息,再去接收下一批Kafka數(shù)據(jù)。萬一Receiver失敗,重啟后還能從WAL里面恢復出已接收的數(shù)據(jù),從而避免了Receiver節(jié)點宕機造成的數(shù)據(jù)丟失(以下代碼刪除了細枝末節(jié)的邏輯):

class ReliableKafkaReceiver{  private var topicPartitionOffsetMap: mutable.HashMap[TopicAndPartition, Long] = null  private var blockOffsetMap: ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]] = null  override def onStart(): Unit = {    // Initialize the topic-partition / offset hash map.    topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]    // Initialize the block generator for storing Kafka message.    blockGenerator = new BlockGenerator(new GeneratedBlockHandler, streamId, conf)    messageHandlerThreadPool = Utils.newDaemonFixedThreadPool(      topics.values.sum, "KafkaMessageHandler")    blockGenerator.start()    val topicMessageStreams = consumerConnector.createMessageStreams(      topics, keyDecoder, valueDecoder)    topicMessageStreams.values.foreach { streams =>      streams.foreach { stream =>        messageHandlerThreadPool.submit(new MessageHandler(stream))      }    }  }

啟用WAL后雖然Receiver的數(shù)據(jù)可靠性風險降低了,但卻由于磁盤持久化帶來的開銷,系統(tǒng)整體吞吐率會有明顯的下降。因此,在最新發(fā)布的Spark 1.3版本里,Spark Streaming增加了使用Direct API的方式來實現(xiàn)Kafka數(shù)據(jù)源的訪問。

引入了Direct API后,Spark Streaming不再啟動常駐的Receiver接收任務(wù),而是直接分配給每個Batch及RDD最新的topic partition offset。job啟動運行后Executor使用Kafka的simple consumer API去獲取那一段offset的數(shù)據(jù)。

這樣做的好處不僅避免了Receiver宕機帶來的數(shù)據(jù)可靠性風險,同時也由于避免使用ZooKeeper做offset跟蹤,而實現(xiàn)了數(shù)據(jù)的精確一次性(以下代碼刪除了細枝末節(jié)的邏輯):

class DirectKafkaInputDStream{  protected val kc = new KafkaCluster(kafkaParams)  protected var currentOffsets = fromOffsets  override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {    val untilOffsets = clamp(latestLeaderOffsets(maxRetries))    val rdd = KafkaRDD[K, V, U, T, R](      context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)    currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)    Some(rdd)  }

預寫日志 Write Ahead Log

Spark 1.2開始提供了預寫日志能力,用于Receiver數(shù)據(jù)及Driver元數(shù)據(jù)的持久化和故障恢復。WAL之所以能提供持久化能力,是因為它利用了可靠的HDFS做數(shù)據(jù)存儲。

Spark Streaming預寫日志機制的核心API包括:

  • 管理WAL文件的WriteAheadLogManager

  • 讀/寫WAL的WriteAheadLogWriter和WriteAheadLogReader

  • 基于WAL的RDD:WriteAheadLogBackedBlockRDD

  • 基于WAL的Partition:WriteAheadLogBackedBlockRDDPartition

以上核心API在數(shù)據(jù)接收和恢復階段的交互示意圖如圖四所示。

如何理解Spark Streaming的數(shù)據(jù)可靠性和一致性

圖四 基于WAL的數(shù)據(jù)接收和恢復示意圖

從WriteAheadLogWriter的源碼里可以清楚地看到,每次寫入一塊數(shù)據(jù)buffer到HDFS后都會調(diào)用flush方法去強制刷入磁盤,然后才去取下一塊數(shù)據(jù)。因此receiver接收的數(shù)據(jù)是可以保證持久化到磁盤了,因而做到了較好的數(shù)據(jù)可靠性。

private[streaming] class WriteAheadLogWriter{  private lazy val stream = HdfsUtils.getOutputStream(path, hadoopConf)  def write(data: ByteBuffer): WriteAheadLogFileSegment = synchronized {    data.rewind() // Rewind to ensure all data in the buffer is retrieved    val lengthToWrite = data.remaining()    val segment = new WriteAheadLogFileSegment(path, nextOffset, lengthToWrite)    stream.writeInt(lengthToWrite)    if (data.hasArray) {      stream.write(data.array())    } else {      while (data.hasRemaining) {        val array = new Array[Byte](data.remaining)        data.get(array)        stream.write(array)      }    }    flush()    nextOffset = stream.getPos()    segment  }

看完上述內(nèi)容,你們掌握如何理解Spark Streaming的數(shù)據(jù)可靠性和一致性的方法了嗎?如果還想學到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝各位的閱讀!

分享名稱:如何理解SparkStreaming的數(shù)據(jù)可靠性和一致性
當前路徑:http://muchs.cn/article42/ijdhec.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供關(guān)鍵詞優(yōu)化、網(wǎng)頁設(shè)計公司定制開發(fā)、外貿(mào)建站、網(wǎng)站建設(shè)、企業(yè)網(wǎng)站制作

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

成都seo排名網(wǎng)站優(yōu)化