Spark中Join的用法

這篇文章主要介紹“Spark中Join的用法”,在日常操作中,相信很多人在Spark中Join的用法問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Spark中Join的用法”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!

專注于為中小企業(yè)提供網(wǎng)站制作、成都網(wǎng)站設(shè)計服務(wù),電腦端+手機端+微信端的三站合一,更高效的管理,為中小企業(yè)滴道免費做網(wǎng)站提供優(yōu)質(zhì)的服務(wù)。我們立足成都,凝聚了一批互聯(lián)網(wǎng)行業(yè)人才,有力地推動了上千余家企業(yè)的穩(wěn)健成長,幫助中小企業(yè)通過網(wǎng)站建設(shè)實現(xiàn)規(guī)模擴充和轉(zhuǎn)變。

在數(shù)據(jù)分析和處理的過程中,我們經(jīng)常會用Join操作來關(guān)聯(lián)兩個數(shù)據(jù)集,Spark作為一個通用的分析引擎,能夠支持多種Join的應(yīng)用場景。

Join操作的輸入是兩個數(shù)據(jù)集,A和B,將數(shù)據(jù)集A中的每一條記錄和數(shù)據(jù)集B中的每一條記錄進(jìn)行比對,每發(fā)現(xiàn)一條符合條件的記錄時,返回一條新的記錄,新記錄中的字段可以只從A中來,也可以只從B中來,也可以分別從A和B中取一部分,因此,Join后的記錄可以表示兩個數(shù)據(jù)集中記錄的結(jié)合。

影響Spark Join操作的三個因素

具體到Spark中Join操作的執(zhí)行,有三個影響較大的因素:輸入數(shù)據(jù)集的大小、Join條件、Join類型。

輸入數(shù)據(jù)集的大小

輸入數(shù)據(jù)集的大小直接影響Join操作的效率和可靠性,不只絕對大小,數(shù)據(jù)集之間的相對大小也對效率和可靠性有影響。

Join條件

Join條件通常是兩個數(shù)據(jù)集中字段的邏輯比較,一般可以分為等值Join不等值Join。

等值Join可以包含一個相等條件或多個需要同時滿足的相等條件,比如:

  • 一個相等條件:A.x == B.x

  • 多個相等條件:A.x == B.x and A.y == B.y

注:x 和 y 是數(shù)據(jù)集A和B中的字段。

不等值Join使用不相等條件或者不能同時滿足的相等條件,比如:

  • 不相等條件:A.x < B.x

  • 不能同時滿足的相等條件:A.x == B.x or A.y == B.y

Join類型

Join類型影響Join操作的輸出,大致包括以下幾類:

  • Inner Join:Inner Join只輸出匹配的記錄(滿足Join條件),記錄來自兩個數(shù)據(jù)集

  • Outer Join:Outer Join除了輸出匹配的記錄,也輸出未匹配的記錄,根據(jù)如何輸出未匹配的記錄,outer Join可以進(jìn)一步分為left out join、right out join和full outer join,記錄來自兩個數(shù)據(jù)集

  • Semi Join:Semi Join輸出的記錄只來自一個數(shù)據(jù)集,要么是匹配的記錄,要么是未匹配的記錄。如果輸出的是未匹配的記錄,也叫做Anti Join

  • Cross Join:Cross Join輸出兩個數(shù)據(jù)集中所有記錄可能的組合,例如,A集合中有m條記錄,B集合中有n條記錄,則結(jié)果為m*n條記錄,Cross Join又稱為笛卡爾積。

根據(jù)上面的三個因素,Spark會選擇合適的執(zhí)行機制來完成Join操作。

Spark Join的執(zhí)行機制

Spark提供了五種執(zhí)行Join操作的機制,分別是:

  • Shuffle Hash Join

  • Broadcast Hash Join

  • Sort Merge Join

  • Cartesian Join

  • Broadcast Nested Join

Hash Join

Broadcast Hash Join和Shuffle Hash Join都基于Hash Join,Hash Join是單機上的Join操作。想象一道LeetCode算法題,數(shù)據(jù)量分別為m和n的兩個數(shù)組,怎么找到兩個數(shù)組的公共元素?第一種方法:對兩個數(shù)組進(jìn)行嵌套循環(huán)的遍歷,發(fā)現(xiàn)相等元素則輸出。第二種方法:用空間換時間,將其中一個數(shù)組轉(zhuǎn)化成集合(Python的set或者Java的HashSet,實現(xiàn)都基于哈希表),然后遍歷第二個數(shù)組中的每一個元素,判斷是否包含在第一個集合中。Hash Join和第二種方法類似,將較小的數(shù)據(jù)集分區(qū)構(gòu)造成哈希表,用Join的key作為哈希表的key,key所對應(yīng)的記錄作為哈希表的value,然后遍歷較大的數(shù)據(jù)集分區(qū),在哈希表中尋找對應(yīng)的key,找到兩個分區(qū)key相同的記錄將其輸出。因為使用了哈希表,所以叫做Hash Join。

根據(jù)進(jìn)行Join的兩個數(shù)據(jù)集的大小關(guān)系,Spark支持兩種Hash Join。

Broadcast Hash Join

當(dāng)其中一個數(shù)據(jù)集足夠小時,采用Broadcast Hash Join,較小的數(shù)據(jù)集會被廣播到所有Spark的executor上,并轉(zhuǎn)化為一個Hash Table,之后較大數(shù)據(jù)集的各個分區(qū)會在各個executor上與Hash Table進(jìn)行本地的Join,各分區(qū)Join的結(jié)果合并為最終結(jié)果。

Broadcast Hash Join 沒有Shuffle階段、效率最高。但為了保證可靠性,executor必須有足夠的內(nèi)存能放得下被廣播的數(shù)據(jù)集,所以當(dāng)進(jìn)兩個數(shù)據(jù)集的大小都超過一個可配置的閾值之后,Spark不會采用這種Join??刂七@個閾值的參數(shù)為spark.sql.autoBroadcastJoinThreshold,最新版本(3.0.1)中默認(rèn)值為10M。

Shuffle Hash Join

當(dāng)兩個數(shù)據(jù)集都小于可以使用Broadcast Hash Join的閾值時,采用Shuffle Join,先對兩個數(shù)據(jù)集進(jìn)行Shuffle,Shuffle是意思是根據(jù)key的哈希值,對兩個數(shù)據(jù)集進(jìn)行重新分區(qū),使得兩個數(shù)據(jù)集中key的哈希值相同的記錄會被分配到同一個executor上,此時在每個executor上的分區(qū)都足夠小,各個executor分別執(zhí)行Hash Join即可。

Shuffle操作會帶來大量的網(wǎng)絡(luò)IO開銷,因此效率會受到影響。同時,在executor的內(nèi)存使用方面,如果executor的數(shù)量足夠多,每個分區(qū)處理的數(shù)據(jù)量可以控制到比較小。

Sort Merge Join

Sort Merge Join和Shuffle Hash Join類似,會有一個Shuffle階段,將key相同的記錄重分配同一個executor上,不同的是,在每個executor上,不再構(gòu)造哈希表,而是對兩個分區(qū)進(jìn)行排序,然后用兩個下標(biāo)同時遍歷兩個分區(qū),如果兩個下標(biāo)指向的記錄key相同,則輸出這兩條記錄,否則移動key較小的下標(biāo)。

Sort Merge Join也有Shuffle階段,因此效率同樣不如Broadcast Hash Join。在內(nèi)存使用方面,因為不需要構(gòu)造哈希表,需要的內(nèi)存比Hash Join要少。

Cartesian Join

Cartesian Join機制專門用來實現(xiàn)cross join,結(jié)果的分區(qū)數(shù)等于輸入數(shù)據(jù)集的分區(qū)數(shù)之積,結(jié)果中每一個分區(qū)的數(shù)據(jù)對應(yīng)一個輸入數(shù)據(jù)集的一個分區(qū)和另外一個輸入數(shù)據(jù)集的一個分區(qū)。

Cartesian Join會產(chǎn)生非常多的分區(qū),但如果要進(jìn)行cross join,別無選擇。

Broadcast Nested Loop Join

Broadcast Nested Join將一個輸入數(shù)據(jù)集廣播到每個executor上,然后在各個executor上,另一個數(shù)據(jù)集的分區(qū)會和第一個數(shù)據(jù)集使用嵌套循環(huán)的方式進(jìn)行Join輸出結(jié)果。

Broadcast Nested Join需要廣播數(shù)據(jù)集和嵌套循環(huán),計算效率極低,對內(nèi)存的需求也極大,因為不論數(shù)據(jù)集大小,都會有一個數(shù)據(jù)集被廣播到所有executor上。

Spark如何選擇Join機制

Spark根據(jù)以下的因素選擇實際執(zhí)行Join的機制:

  • 參數(shù)配置

  • hint參數(shù)

  • 輸入數(shù)據(jù)集大小

  • Join類型

  • Join條件

其中,hint參數(shù)是一種在join時手動指定join機制的方法,例如:

df1.hint("broadcast").join(df2, ...)

下面介紹在什么情況下使用何種Join機制。

何時使用Broadcast Hash Join

必需條件:

  • 只用于等值Join

  • 不能用于Full Outer Join

以下條件需要滿足一個:

  • 左邊的數(shù)據(jù)集使用了broadcast hint,Join類型是Right Outer,Right Semi或Inner

  • 沒使用hint,但左邊的數(shù)據(jù)集小于spark.sql.autoBroadcastJoinThreshold參數(shù),Join類型是Right Outer,Right Semi或Inner

  • 右邊的數(shù)據(jù)集使用了broadcast hint,Join類型是Left Outer,Left Semi或Inner

  • 沒使用hint,但右邊的數(shù)據(jù)集小于spark.sql.autoBroadcastJoinThreshold參數(shù),Join類型是Left Outer,Left Semi或Inner

  • 兩個數(shù)據(jù)集都使用了broadcast hint,Join類型是Left Outer,Left Semi,Right Outer,Right Semi或Inner

  • 沒使用hint,但兩個數(shù)據(jù)集都小于spark.sql.autoBroadcastJoinThreshold參數(shù),Join類型是Left Outer,Left Semi,Right Outer,Right Semi或Inner

何時使用Shuffle Hash Join

必需條件:

  • 只用于等值Join

  • 不能用于Full Outer Join

  • spark.sql.join.prefersortmergeJoin 參數(shù)默認(rèn)值為true,設(shè)置為false

以下條件需要滿足一個:

  • 左邊的數(shù)據(jù)集使用了shuffle_hash hint,Join類型是Right Outer,Right Semi或Inner

  • 沒使用hint,但左邊的數(shù)據(jù)集比右邊的數(shù)據(jù)集顯著小,Join類型是Right Outer,Right Semi或Inner

  • 右邊的數(shù)據(jù)集使用了shuffle_hash hint,Join類型是Left Outer,Left Semi或Inner

  • 沒使用hint,但右邊的數(shù)據(jù)集比左邊的數(shù)據(jù)集顯著小,Join類型是Left Outer,Left Semi或Inner

  • 兩邊的數(shù)據(jù)集都使用了shuffle_hash hint,Join類型是Left Outer,Left Semi,Right Outer,Right Semi或Inner

  • 沒使用hint,兩個數(shù)據(jù)集都比較小,Join類型是Left Outer,Left Semi,Right Outer,Right Semi或Inner

何時使用Sort Merge Join

必需條件:

  • 只用于等值Join

  • Join條件中的key是可排序的

  • spark.sql.join.prefersortmergeJoin 參數(shù)默認(rèn)值為true,設(shè)置為true

以下條件需要滿足一個:

  • 有一個數(shù)據(jù)集使用了merge hint,Join類型任意

  • 沒有使用merge hint,Join類型任意

何時使用Cartesian Join

必需條件:

  • Cross Join

以下條件需要滿足一個:

  • 使用了shuffle_replicate_nl hint,是等值或不等值Join均可

  • 沒有使用hint,等值或不等值Join均可

何時Broadcast Nested Loop Join

Broadcast Nested Loop Join是默認(rèn)的Join機制,當(dāng)沒有選用其他Join機制被選擇時,用它來進(jìn)行任意條件任意類型的Join。

當(dāng)有多種Join機制可用時,選擇的優(yōu)先級為Broadcast Hash Join > Sort Merge Join > Shuffle Hash Join > Cartesian Join。

在進(jìn)行Inner Join和不等值Join時,如果有一個數(shù)據(jù)集可以被廣播,Broadcast Nested Loop Join的優(yōu)先級比Cartesian Join優(yōu)先級高。

到此,關(guān)于“Spark中Join的用法”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>

新聞標(biāo)題:Spark中Join的用法
文章位置:http://muchs.cn/article0/ghcjio.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供企業(yè)建站、電子商務(wù)、軟件開發(fā)、服務(wù)器托管、品牌網(wǎng)站建設(shè)網(wǎng)站設(shè)計公司

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

成都網(wǎng)頁設(shè)計公司