DataSourceV2流處理方法是什么

本篇內(nèi)容介紹了“DataSourceV2流處理方法是什么”的有關(guān)知識,在實(shí)際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

為永城等地區(qū)用戶提供了全套網(wǎng)頁設(shè)計(jì)制作服務(wù),及永城網(wǎng)站建設(shè)行業(yè)解決方案。主營業(yè)務(wù)為網(wǎng)站制作、成都網(wǎng)站建設(shè)、永城網(wǎng)站設(shè)計(jì),以傳統(tǒng)方式定制建設(shè)網(wǎng)站,并提供域名空間備案等一條龍服務(wù),秉承以專業(yè)、用心的態(tài)度為用戶提供真誠的服務(wù)。我們深信只要達(dá)到每一位用戶的要求,就會得到認(rèn)可,從而選擇與我們長期合作。這樣,我們也可以走得更遠(yuǎn)!

SparkSession結(jié)構(gòu)化流處理最后其實(shí)是通過DataSet的writeStream觸發(fā)執(zhí)行的。這點(diǎn)與傳統(tǒng)的spark sql方式是不一樣的。writeStream會找到StreamingQueryManager的startQuery方法,然后一步步到MicroBatchExecution和ContinuousExecution。

核心點(diǎn):MicroBatchExecution和ContinuousExecution里面會對StreamingRelationV2進(jìn)行轉(zhuǎn)換,轉(zhuǎn)換成StreamingDataSourceV2Relation。而MicroBatchExecution和ContinuousExecution只有在StreamingQueryManager的createQuery方法中才會被使用到。那么這個(gè)StreamingQueryManager的createQuery方法會在哪里被使用到呢?跟蹤代碼會發(fā)現(xiàn)是DataStreamWriter中調(diào)用StreamingQueryManager的startQuery方法進(jìn)而調(diào)用到createQuery方法的。

而DataStreamWriter是Dataset的writeStream創(chuàng)建的。

【以上說的是寫入流的過程】。

關(guān)鍵類:BaseSessionStateBuilder,里面有analyzer的定義。

protected def analyzer: Analyzer = new Analyzer(catalog, v2SessionCatalog, conf) {

    override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =

      new FindDataSourceTable(session) +:

        new ResolveSQLOnFile(session) +:

        new FallBackFileSourceV2(session) +:

        DataSourceResolution(conf, this.catalogManager) +:

        customResolutionRules



    override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =

      new DetectAmbiguousSelfJoin(conf) +:

        PreprocessTableCreation(session) +:

        PreprocessTableInsertion(conf) +:

        DataSourceAnalysis(conf) +:

        customPostHocResolutionRules



    override val extendedCheckRules: Seq[LogicalPlan => Unit] =

      PreWriteCheck +:

        PreReadCheck +:

        HiveOnlyCheck +:

        TableCapabilityCheck +:

        customCheckRules

  }

這里沒有特別需要關(guān)注的,先忽略。

DataSourceV2是指spark中V2版本的結(jié)構(gòu)化流處理引擎框架。這里說的邏輯計(jì)劃就是StreamingDataSourceV2Relation,對應(yīng)的物理計(jì)劃分成兩類:MicroBatchScanExec和ContinuousScanExec,兩者的應(yīng)用場景從取名上就可以分辨出來,一個(gè)是微批處理模式;另一個(gè)則是連續(xù)流模式。

我們先從物理計(jì)劃開始解析。

這兩個(gè)物理計(jì)劃基于同一個(gè)父類:DataSourceV2ScanExecBase,先看看父類的代碼:

關(guān)鍵代碼:

override def doExecute(): RDD[InternalRow] = {

    val numOutputRows = longMetric("numOutputRows")

    inputRDD.map { r =>

      numOutputRows += 1

      r

    }

  }

子類需要重寫inputRDD。

StreamExecution

兩種重要的checkpoint屬性:

  val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets"))

  val commitLog = new CommitLog(sparkSession, checkpointFile("commits"))

offsetLog是當(dāng)前讀取到哪個(gè)offset了,commitLog是當(dāng)前處理到哪個(gè)Offset了。這兩個(gè)Log非常重要,合在一起保證了Exactly-once語義。

MicroBatchScanExec

好了,先看看MicroBatchScanExec是怎么重寫inputRDD的。

override lazy val partitions: Seq[InputPartition] = stream.planInputPartitions(start, end)


  override lazy val readerFactory: PartitionReaderFactory = stream.createReaderFactory()


  override lazy val inputRDD: RDD[InternalRow] = {

    new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar)

  }

有三個(gè)地方,第一個(gè)是重寫Seq[InputPartition],調(diào)用stream的planInputPartitions方法,注意下這里的stream類型是MicroBatchStream;第二個(gè)是重寫readerFactory,獲得讀取器工廠類;第三個(gè)重寫是inputRDD,創(chuàng)建DataSourceRDD作為inputRDD,而前兩步重寫的Seq[InputPartition]和readerFactory作為DataSourceRDD的構(gòu)造參數(shù)。

這里首先大概看下DataSourceRDD的功能是什么。

DataSourceRDD這個(gè)類的代碼很短,很容易看清楚。最重要的就是compute方法,先給出全部代碼: 

override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {

    val inputPartition = castPartition(split).inputPartition

    val reader: PartitionReader[_] = if (columnarReads) {

      partitionReaderFactory.createColumnarReader(inputPartition)

    } else {

      partitionReaderFactory.createReader(inputPartition)

    }


    context.addTaskCompletionListener[Unit](_ => reader.close())

    val iter = new Iterator[Any] {

      private[this] var valuePrepared = false

      override def hasNext: Boolean = {

        if (!valuePrepared) {

          valuePrepared = reader.next()

        }

        valuePrepared

      }


      override def next(): Any = {

        if (!hasNext) {

          throw new java.util.NoSuchElementException("End of stream")

        }

        valuePrepared = false

        reader.get()

      }

    }

    // TODO: SPARK-25083 remove the type erasure hack in data source scan

    new InterruptibleIterator(context, iter.asInstanceOf[Iterator[InternalRow]])

  }

先根據(jù)讀取器工廠類創(chuàng)建一個(gè)PartitionReader,然后調(diào)用PartitionReader的get方法獲取數(shù)據(jù)。就是這么簡單了!

ContinuousScanExec

最后再看下ContinuousScanExec的定義。

override lazy val partitions: Seq[InputPartition] = stream.planInputPartitions(start)

  override lazy val readerFactory: ContinuousPartitionReaderFactory = {

    stream.createContinuousReaderFactory()

  }


  override lazy val inputRDD: RDD[InternalRow] = {

    EpochCoordinatorRef.get(

      sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),

      sparkContext.env)

      .askSync[Unit](SetReaderPartitions(partitions.size))

    new ContinuousDataSourceRDD(

      sparkContext,

      sqlContext.conf.continuousStreamingExecutorQueueSize,

      sqlContext.conf.continuousStreamingExecutorPollIntervalMs,

      partitions,

      schema,

      readerFactory.asInstanceOf[ContinuousPartitionReaderFactory])

  }

和微批處理模式MicroBatchScanExec類似,也有三個(gè)地方重寫,第一個(gè)是重寫Seq[InputPartition],調(diào)用stream的planInputPartitions方法,注意下這里的stream類型是ContinuousStream;第二個(gè)是重寫readerFactory,獲得讀取器工廠類ContinuousPartitionReaderFactory;第三個(gè)重寫是inputRDD,創(chuàng)建ContinuousDataSourceRDD作為inputRDD,而前兩步重寫的Seq[InputPartition]和readerFactory作為ContinuousDataSourceRDD的構(gòu)造參數(shù)。

這里首先大概看下ContinuousDataSourceRDD的功能是什么。

ContinuousDataSourceRDD的代碼和DataSourceRDD的基本差不多,直接看源碼吧,這里就不細(xì)說了,也沒啥好細(xì)說的,顯得啰里啰唆。

對于Kafka來說,ContinuousDataSourceRDD和DataSourceRDD其實(shí)最終是一樣的

“DataSourceV2流處理方法是什么”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

網(wǎng)頁題目:DataSourceV2流處理方法是什么
網(wǎng)站鏈接:http://muchs.cn/article22/gdcejc.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供手機(jī)網(wǎng)站建設(shè)網(wǎng)站策劃、網(wǎng)站營銷、靜態(tài)網(wǎng)站企業(yè)網(wǎng)站制作定制網(wǎng)站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

小程序開發(fā)