Spark的JOIN策略有哪些

這篇文章主要介紹“Spark的JOIN策略有哪些”,在日常操作中,相信很多人在Spark的JOIN策略有哪些問(wèn)題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”Spark的JOIN策略有哪些”的疑惑有所幫助!接下來(lái),請(qǐng)跟著小編一起來(lái)學(xué)習(xí)吧!

創(chuàng)新互聯(lián)專(zhuān)注于企業(yè)營(yíng)銷(xiāo)型網(wǎng)站建設(shè)、網(wǎng)站重做改版、臨邑網(wǎng)站定制設(shè)計(jì)、自適應(yīng)品牌網(wǎng)站建設(shè)、H5響應(yīng)式網(wǎng)站、商城網(wǎng)站開(kāi)發(fā)、集團(tuán)公司官網(wǎng)建設(shè)、外貿(mào)網(wǎng)站建設(shè)、高端網(wǎng)站制作、響應(yīng)式網(wǎng)頁(yè)設(shè)計(jì)等建站業(yè)務(wù),價(jià)格優(yōu)惠性?xún)r(jià)比高,為臨邑等各大城市提供網(wǎng)站開(kāi)發(fā)制作服務(wù)。

JOIN操作是非常常見(jiàn)的數(shù)據(jù)處理操作,Spark作為一個(gè)統(tǒng)一的大數(shù)據(jù)處理引擎,提供了非常豐富的JOIN場(chǎng)景。

影響JOIN操作的因素 

數(shù)據(jù)集的大小

參與JOIN的數(shù)據(jù)集的大小會(huì)直接影響Join操作的執(zhí)行效率。同樣,也會(huì)影響JOIN機(jī)制的選擇和JOIN的執(zhí)行效率。 

JOIN的條件

JOIN的條件會(huì)涉及字段之間的邏輯比較。根據(jù)JOIN的條件,JOIN可分為兩大類(lèi):等值連接非等值連接。等值連接會(huì)涉及一個(gè)或多個(gè)需要同時(shí)滿(mǎn)足的相等條件。在兩個(gè)輸入數(shù)據(jù)集的屬性之間應(yīng)用每個(gè)等值條件。當(dāng)使用其他運(yùn)算符(運(yùn)算連接符不為=)時(shí),稱(chēng)之為非等值連接。 

JOIN的類(lèi)型

在輸入數(shù)據(jù)集的記錄之間應(yīng)用連接條件之后,JOIN類(lèi)型會(huì)影響JOIN操作的結(jié)果。主要有以下幾種JOIN類(lèi)型:

  • 內(nèi)連接(     Inner Join):僅從輸入數(shù)據(jù)集中輸出匹配連接條件的記錄。
  • 外連接(     Outer Join):又分為左外連接、右外鏈接和全外連接。
  • 半連接(     Semi Join):右表只用于過(guò)濾左表的數(shù)據(jù)而不出現(xiàn)在結(jié)果集中。
  • 交叉連接(     Cross Join):交叉接返回左表中的所有行,左表中的每一行與右表中的所有行組合。交叉聯(lián)接也稱(chēng)作笛卡爾積。 

Spark中JOIN執(zhí)行的5種策略

Spark提供了5種JOIN策略來(lái)執(zhí)行具體的JOIN操作。該5種JOIN策略如下所示:

  • Shuffle Hash Join
  • Broadcast Hash Join
  • Sort Merge Join
  • Cartesian Join
  • Broadcast Nested Loop Join
 

Shuffle Hash Join 

簡(jiǎn)介

當(dāng)要JOIN的表數(shù)據(jù)量比較大時(shí),可以選擇Shuffle Hash Join。這樣可以將大表進(jìn)行按照J(rèn)OIN的key進(jìn)行重分區(qū),保證每個(gè)相同的JOIN key都發(fā)送到同一個(gè)分區(qū)中。如下圖示:

Spark的JOIN策略有哪些

如上圖所示:Shuffle Hash Join的基本步驟主要有以下兩點(diǎn):

  • 首先,對(duì)于兩張參與JOIN的表,分別按照join key進(jìn)行重分區(qū),該過(guò)程會(huì)涉及Shuffle,其目的是將相同join key的數(shù)據(jù)發(fā)送到同一個(gè)分區(qū),方便分區(qū)內(nèi)進(jìn)行join。
  • 其次,對(duì)于每個(gè)Shuffle之后的分區(qū),會(huì)將小表的分區(qū)數(shù)據(jù)構(gòu)建成一個(gè)Hash table,然后根據(jù)join key與大表的分區(qū)數(shù)據(jù)記錄進(jìn)行匹配。
 
條件與特點(diǎn)
  • 僅支持等值連接,join key不需要排序
  • 支持除了全外連接(full outer joins)之外的所有join類(lèi)型
  • 需要對(duì)小表構(gòu)建Hash map,屬于內(nèi)存密集型的操作,如果構(gòu)建Hash表的一側(cè)數(shù)據(jù)比較大,可能會(huì)造成OOM
  • 將參數(shù)     spark.sql.join.prefersortmergeJoin置為false (默認(rèn)為 true)
 

Broadcast Hash Join 

簡(jiǎn)介

也稱(chēng)之為Map端JOIN。當(dāng)有一張表較小時(shí),我們通常選擇Broadcast Hash Join,這樣可以避免Shuffle帶來(lái)的開(kāi)銷(xiāo),從而提高性能。比如事實(shí)表與維表進(jìn)行JOIN時(shí),由于維表的數(shù)據(jù)通常會(huì)很小,所以可以使用Broadcast Hash Join將維表進(jìn)行Broadcast。這樣可以避免數(shù)據(jù)的Shuffle(在Spark中Shuffle操作是很耗時(shí)的),從而提高JOIN的效率。在進(jìn)行 Broadcast Join 之前,Spark 需要把處于 Executor 端的數(shù)據(jù)先發(fā)送到 Driver 端,然后 Driver 端再把數(shù)據(jù)廣播到 Executor 端。如果我們需要廣播的數(shù)據(jù)比較多,會(huì)造成 Driver 端出現(xiàn) OOM。具體如下圖示:

Spark的JOIN策略有哪些

Broadcast Hash Join主要包括兩個(gè)階段:

  • Broadcast階段 :小表被緩存在executor中
  • Hash Join階段:在每個(gè) executor中執(zhí)行Hash Join
 
條件與特點(diǎn)
  • 僅支持等值連接,join key不需要排序
  • 支持除了全外連接(full outer joins)之外的所有join類(lèi)型
  • Broadcast Hash Join相比其他的JOIN機(jī)制而言,效率更高。但是,Broadcast Hash Join屬于網(wǎng)絡(luò)密集型的操作(數(shù)據(jù)冗余傳輸),除此之外,需要在Driver端緩存數(shù)據(jù),所以當(dāng)小表的數(shù)據(jù)量較大時(shí),會(huì)出現(xiàn)OOM的情況
  • 被廣播的小表的數(shù)據(jù)量要小于     spark.sql.autoBroadcastJoinThreshold值,默認(rèn)是10MB(10485760)
  • 被廣播表的大小閾值不能超過(guò)8GB,spark2.4源碼如下:     BroadcastExchangeExec.scala
longMetric("dataSize") += dataSize
          if (dataSize >= (8L << 30)) {
            throw new SparkException(
              s"Cannot broadcast the table that is larger than 8GB: ${dataSize >> 30} GB")
          }
 
  • 基表不能被broadcast,比如左連接時(shí),只能將右表進(jìn)行廣播。形如:fact_table.join(     broadcast(dimension_table),可以不使用     broadcast提示,當(dāng)滿(mǎn)足條件時(shí)會(huì)自動(dòng)轉(zhuǎn)為該JOIN方式。
 

Sort Merge Join 

簡(jiǎn)介

該JOIN機(jī)制是Spark默認(rèn)的,可以通過(guò)參數(shù)spark.sql.join.preferSortMergeJoin進(jìn)行配置,默認(rèn)是true,即優(yōu)先使用Sort Merge Join。一般在兩張大表進(jìn)行JOIN時(shí),使用該方式。Sort Merge Join可以減少集群中的數(shù)據(jù)傳輸,該方式不會(huì)先加載所有數(shù)據(jù)的到內(nèi)存,然后進(jìn)行hashjoin,但是在JOIN之前需要對(duì)join key進(jìn)行排序。具體圖示:

Spark的JOIN策略有哪些

Sort Merge Join主要包括三個(gè)階段:

  • Shuffle Phase: 兩張大表根據(jù)Join key進(jìn)行Shuffle重分區(qū)
  • Sort Phase: 每個(gè)分區(qū)內(nèi)的數(shù)據(jù)進(jìn)行排序
  • Merge Phase: 對(duì)來(lái)自不同表的排序好的分區(qū)數(shù)據(jù)進(jìn)行JOIN,通過(guò)遍歷元素,連接具有相同Join key值的行來(lái)合并數(shù)據(jù)集
 
條件與特點(diǎn)
  • 僅支持等值連接
  • 支持所有join類(lèi)型
  • Join Keys是排序的
  • 參數(shù)     spark.sql.join.prefersortmergeJoin (默認(rèn)true)設(shè)定為true
 

Cartesian Join 

簡(jiǎn)介

如果 Spark 中兩張參與 Join 的表沒(méi)指定join key(ON 條件)那么會(huì)產(chǎn)生 Cartesian product join,這個(gè) Join 得到的結(jié)果其實(shí)就是兩張行數(shù)的乘積。

 
條件
  • 僅支持內(nèi)連接
  • 支持等值和不等值連接
  • 開(kāi)啟參數(shù)spark.sql.crossJoin.enabled=true
 

Broadcast Nested Loop Join 

簡(jiǎn)介

該方式是在沒(méi)有合適的JOIN機(jī)制可供選擇時(shí),最終會(huì)選擇該種join策略。優(yōu)先級(jí)為:Broadcast Hash Join >  Sort Merge Join > Shuffle Hash Join > cartesian Join > Broadcast Nested Loop Join.

在Cartesian 與Broadcast Nested Loop Join之間,如果是內(nèi)連接,或者非等值連接,則優(yōu)先選擇Broadcast Nested Loop策略,當(dāng)時(shí)非等值連接并且一張表可以被廣播時(shí),會(huì)選擇Cartesian Join。 

條件與特點(diǎn)
  • 支持等值和非等值連接
  • 支持所有的JOIN類(lèi)型,主要優(yōu)化點(diǎn)如下:
    • 當(dāng)右外連接時(shí)要廣播左表
    • 當(dāng)左外連接時(shí)要廣播右表
    • 當(dāng)內(nèi)連接時(shí),要廣播左右兩張表
 

Spark是如何選擇JOIN策略的 

等值連接的情況 

有join提示(hints)的情況,按照下面的順序
  • 1.Broadcast Hint:     如果join類(lèi)型支持,則選擇broadcast hash join

  • 2.Sort merge hint:     如果join key是排序的,則選擇 sort-merge join

  • 3.shuffle hash hint:     如果join類(lèi)型支持, 選擇 shuffle hash join

  • 4.shuffle replicate NL hint:     如果是內(nèi)連接,選擇笛卡爾積方式
     
沒(méi)有join提示(hints)的情況,則逐個(gè)對(duì)照下面的規(guī)則
  • 1.如果join類(lèi)型支持,并且其中一張表能夠被廣播(spark.sql.autoBroadcastJoinThreshold值,默認(rèn)是10MB),則選擇 broadcast hash join


  • 2.如果參數(shù)spark.sql.join.preferSortMergeJoin設(shè)定為false,且一張表足夠小(可以構(gòu)建一個(gè)hash map) ,則選擇shuffle hash join


  • 3.如果join keys 是排序的,則選擇sort-merge join


  • 4.如果是內(nèi)連接,選擇 cartesian join


  • 5.如果可能會(huì)發(fā)生OOM或者沒(méi)有可以選擇的執(zhí)行策略,則最終選擇broadcast nested loop join

     

非等值連接情況 

有join提示(hints),按照下面的順序
  • 1.broadcast hint:

    選擇broadcast nested loop join.


  • 2.shuffle replicate NL hint: 如果是內(nèi)連接,則選擇cartesian product join

     
沒(méi)有join提示(hints),則逐個(gè)對(duì)照下面的規(guī)則
  • 1.如果一張表足夠小(可以被廣播),則選擇 broadcast nested loop join


  • 2.如果是內(nèi)連接,則選擇cartesian product join


  • 3.如果可能會(huì)發(fā)生OOM或者沒(méi)有可以選擇的執(zhí)行策略,則最終選擇broadcast nested loop join

     

join策略選擇的源碼片段

  object JoinSelection extends Strategy
    with PredicateHelper
    with JoinSelectionHelper {
    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {

      case j @ ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, nonEquiCond, left, right, hint) =>
        def createBroadcastHashJoin(onlyLookingAtHint: Boolean) = {
          getBroadcastBuildSide(left, right, joinType, hint, onlyLookingAtHint, conf).map {
            buildSide =>
              Seq(joins.BroadcastHashJoinExec(
                leftKeys,
                rightKeys,
                joinType,
                buildSide,
                nonEquiCond,
                planLater(left),
                planLater(right)))
          }
        }

        def createShuffleHashJoin(onlyLookingAtHint: Boolean) = {
          getShuffleHashJoinBuildSide(left, right, joinType, hint, onlyLookingAtHint, conf).map {
            buildSide =>
              Seq(joins.ShuffledHashJoinExec(
                leftKeys,
                rightKeys,
                joinType,
                buildSide,
                nonEquiCond,
                planLater(left),
                planLater(right)))
          }
        }

        def createSortMergeJoin() = {
          if (RowOrdering.isOrderable(leftKeys)) {
            Some(Seq(joins.SortMergeJoinExec(
              leftKeys, rightKeys, joinType, nonEquiCond, planLater(left), planLater(right))))
          } else {
            None
          }
        }

        def createCartesianProduct() = {
          if (joinType.isInstanceOf[InnerLike]) {
            Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), j.condition)))
          } else {
            None
          }
        }

        def createJoinWithoutHint() = {
          createBroadcastHashJoin(false)
            .orElse {
              if (!conf.preferSortMergeJoin) {
                createShuffleHashJoin(false)
              } else {
                None
              }
            }
            .orElse(createSortMergeJoin())
            .orElse(createCartesianProduct())
            .getOrElse {
              val buildSide = getSmallerSide(left, right)
              Seq(joins.BroadcastNestedLoopJoinExec(
                planLater(left), planLater(right), buildSide, joinType, nonEquiCond))
            }
        }

        createBroadcastHashJoin(true)
          .orElse { if (hintToSortMergeJoin(hint)) createSortMergeJoin() else None }
          .orElse(createShuffleHashJoin(true))
          .orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None }
          .getOrElse(createJoinWithoutHint())

    
          if (canBuildLeft(joinType)) BuildLeft else BuildRight
        }

        def createBroadcastNLJoin(buildLeft: Boolean, buildRight: Boolean) = {
          val maybeBuildSide = if (buildLeft && buildRight) {
            Some(desiredBuildSide)
          } else if (buildLeft) {
            Some(BuildLeft)
          } else if (buildRight) {
            Some(BuildRight)
          } else {
            None
          }

          maybeBuildSide.map { buildSide =>
            Seq(joins.BroadcastNestedLoopJoinExec(
              planLater(left), planLater(right), buildSide, joinType, condition))
          }
        }

        def createCartesianProduct() = {
          if (joinType.isInstanceOf[InnerLike]) {
            Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), condition)))
          } else {
            None
          }
        }

        def createJoinWithoutHint() = {
          createBroadcastNLJoin(canBroadcastBySize(left, conf), canBroadcastBySize(right, conf))
            .orElse(createCartesianProduct())
            .getOrElse {
              Seq(joins.BroadcastNestedLoopJoinExec(
                planLater(left), planLater(right), desiredBuildSide, joinType, condition))
            }
        }

        createBroadcastNLJoin(hintToBroadcastLeft(hint), hintToBroadcastRight(hint))
          .orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None }
          .getOrElse(createJoinWithoutHint())
      case _ => Nil
    }
  }

到此,關(guān)于“Spark的JOIN策略有哪些”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)?lái)更多實(shí)用的文章!

網(wǎng)站名稱(chēng):Spark的JOIN策略有哪些
瀏覽地址:http://muchs.cn/article44/jpisee.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供品牌網(wǎng)站制作、響應(yīng)式網(wǎng)站、ChatGPT網(wǎng)站設(shè)計(jì)、網(wǎng)站制作全網(wǎng)營(yíng)銷(xiāo)推廣

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶(hù)投稿、用戶(hù)轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

成都定制網(wǎng)站建設(shè)