如何淺析Hive和SparkSQL讀文件時(shí)的輸入任務(wù)劃分

如何淺析Hive和Spark SQL讀文件時(shí)的輸入任務(wù)劃分,相信很多沒有經(jīng)驗(yàn)的人對(duì)此束手無策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個(gè)問題。

成都創(chuàng)新互聯(lián)成立于2013年,是專業(yè)互聯(lián)網(wǎng)技術(shù)服務(wù)公司,擁有項(xiàng)目做網(wǎng)站、成都做網(wǎng)站網(wǎng)站策劃,項(xiàng)目實(shí)施與項(xiàng)目整合能力。我們以讓每一個(gè)夢(mèng)想脫穎而出為使命,1280元祁門做網(wǎng)站,已為上家服務(wù),為祁門各地企業(yè)和個(gè)人服務(wù),聯(lián)系電話:13518219792

我們就來講解Hive和Spark SQL是如何切分輸入路徑的。

Hive

Hive是起步較早的SQL on Hadoop項(xiàng)目,最早也是誕生于Hadoop中,所以輸入劃分這部分的代碼與Hadoop相關(guān)度非常高?,F(xiàn)在Hive普遍使用的輸入格式是CombineHiveInputFormat,它繼承于HiveInputFormat,而HiveInputFormat實(shí)現(xiàn)了Hadoop的InputFormat接口,其中的getSplits方法用來獲取具體的劃分結(jié)果,劃分出的一份輸入數(shù)據(jù)被稱為一個(gè)“Split”。在執(zhí)行時(shí),每個(gè)Split對(duì)應(yīng)到一個(gè)map任務(wù)。在劃分Split時(shí),首先挑出不能合并到一起的目錄——比如開啟了事務(wù)功能的路徑。這些不能合并的目錄必須單獨(dú)處理,剩下的路徑交給私有方法getCombineSplits,這樣Hive的一個(gè)map task最多可以處理多個(gè)目錄下的文件。在實(shí)際操作中,我們一般只要通過set mapred.max.split.size=xx;即可控制文件合并的大小。當(dāng)一個(gè)文件過大時(shí),父類的getSplits也會(huì)幫我們完成相應(yīng)的切分工作。

Spark SQL

Spark的表有兩種:DataSource表和Hive表。另外Spark后續(xù)版本中DataSource V2也將逐漸流行,目前還在不斷發(fā)展中,暫時(shí)就不在這里討論。我們知道Spark SQL其實(shí)底層是Spark RDD,而RDD執(zhí)行時(shí),每個(gè)map task會(huì)處理RDD的一個(gè)Partition中的數(shù)據(jù)(注意這里的Partition是RDD的概念,要和表的Partition進(jìn)行區(qū)分)。因此,Spark SQL作業(yè)的任務(wù)切分關(guān)鍵在于底層RDD的partition如何切分。

Data Source表

Spark SQL的DataSource表在最終執(zhí)行的RDD類為FileScanRDD,由FileSourceScanExec創(chuàng)建出來。在創(chuàng)建這種RDD的時(shí)候,具體的Partition直接作為參數(shù)傳給了構(gòu)造函數(shù),因此劃分輸入的方法也在DataSourceScanExec.scala文件中。具體分兩步:首先把文件劃分為PartitionFile,再將較小的PartitionFile進(jìn)行合并。

第一步部分代碼如下:

  if (fsRelation.fileFormat.isSplitable(
    fsRelation.sparkSession, fsRelation.options, file.getPath)) {
    (0L until file.getLen by maxSplitBytes).map { offset =>val remaining = file.getLen - offsetval size = if (remaining > maxSplitBytes) maxSplitBytes else remainingval hosts = getBlockHosts(blockLocations, offset, size)PartitionedFile(
      partition.values, file.getPath.toUri.toString,
      offset, size, partitionDeleteDeltas, hosts)
    }
  } else {val hosts = getBlockHosts(blockLocations, 0, file.getLen)Seq(PartitionedFile(partition.values, file.getPath.toUri.toString,0, file.getLen, partitionDeleteDeltas, hosts))
  }

我們可以看出,Spark SQL首先根據(jù)文件類型判斷單個(gè)文件是否能夠切割,如果可以則按maxSplitBytes進(jìn)行切割。如果一個(gè)文件剩余部分無法填滿maxSplitBytes,也單獨(dú)作為一個(gè)Partition。

第二部分代碼如下所示:

  splitFiles.foreach { file =>if (currentSize + file.length > maxSplitBytes) {
      closePartition()
    }// Add the given file to the current partition.currentSize += file.length + openCostInBytes
    currentFiles += file
  }

這樣我們就可以依次遍歷第一步切好的塊,再按照maxSplitBytes進(jìn)行合并。注意合并文件時(shí)還需加上打開文件的預(yù)估代價(jià)openCostInBytes。那么maxSplitBytesopenCostInBytes這兩個(gè)關(guān)鍵參數(shù)怎么來的呢?

  val defaultMaxSplitBytes =
    fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes  val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes  val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism  val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum  val bytesPerCore = totalBytes / defaultParallelism  val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))

不難看出,主要是spark.sql.files.maxPartitionBytes、spark.sql.files.openCostInBytes、調(diào)度器默認(rèn)并發(fā)度以及所有輸入文件實(shí)際大小所控制。

Hive表

Spark SQL中的Hive表底層的RDD類為HadoopRDD,由HadoopTableReader類實(shí)現(xiàn)。不過這次,具體的Partition劃分還是依賴HadoopRDDgetPartitions方法,具體實(shí)現(xiàn)如下:

  override def getPartitions: Array[Partition] = {
    ...try {      val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)      val inputSplits = if (ignoreEmptySplits) {
        allInputSplits.filter(_.getLength > 0)
      } else {
        allInputSplits
      }      val array = new Array[Partition](inputSplits.size)      for (i <- 0 until inputSplits.size) {
        array(i) = new HadoopPartition(id, i, inputSplits(i))
      }
      array
    } catch {
      ...
    }
  }

不難看出,在處理Hive表的時(shí)候,Spark SQL把任務(wù)劃分又交給了Hadoop的InputFormat那一套。不過需要注意的是,并不是所有Hive表都?xì)w為這一類,Spark SQL會(huì)默認(rèn)對(duì)ORC和Parquet的表進(jìn)行轉(zhuǎn)化,用自己的Data Source實(shí)現(xiàn)OrcFileFormatParquetFileFormat來把這兩種表作為Data Source表來處理。

看完上述內(nèi)容,你們掌握如何淺析Hive和Spark SQL讀文件時(shí)的輸入任務(wù)劃分的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝各位的閱讀!

當(dāng)前名稱:如何淺析Hive和SparkSQL讀文件時(shí)的輸入任務(wù)劃分
網(wǎng)站路徑:http://muchs.cn/article16/ispigg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站策劃、網(wǎng)站制作、網(wǎng)站設(shè)計(jì)、微信公眾號(hào)、全網(wǎng)營(yíng)銷推廣、用戶體驗(yàn)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

成都seo排名網(wǎng)站優(yōu)化