deltalake的merge操作以及性能調(diào)優(yōu)是怎樣的

delta lake的merge操作以及性能調(diào)優(yōu)是怎樣的,針對這個(gè)問題,這篇文章詳細(xì)介紹了相對應(yīng)的分析和解答,希望可以幫助更多想解決這個(gè)問題的小伙伴找到更簡單易行的方法。

延慶網(wǎng)站建設(shè)公司創(chuàng)新互聯(lián),延慶網(wǎng)站設(shè)計(jì)制作,有大型網(wǎng)站制作公司豐富經(jīng)驗(yàn)。已為延慶1000+提供企業(yè)網(wǎng)站建設(shè)服務(wù)。企業(yè)網(wǎng)站搭建\外貿(mào)網(wǎng)站制作要多少錢,請找那個(gè)售后服務(wù)好的延慶做網(wǎng)站的公司定做!

鑒于merge操作的復(fù)雜性,下面主要對其進(jìn)行展開講解。

1.merge算子操作語法

merge操作的sql表達(dá)如下:

import io.delta.tables._import org.apache.spark.sql.functions._

DeltaTable.forPath(spark, "/data/events/")  .as("events")  .merge(    updatesDF.as("updates"),    "events.eventId = updates.eventId")  .whenMatched  .updateExpr(    Map("data" -> "updates.data"))  .whenNotMatched  .insertExpr(    Map(      "date" -> "updates.date",      "eventId" -> "updates.eventId",      "data" -> "updates.data"))  .execute()

merge 編碼操作還是有些約束需要詳細(xì)描述的。

1.1 可以有(1,2,3)個(gè)wenMatched或者whenNotMatched的子語句。其中,whenMatched操作最多有兩個(gè)語句,whenNotMatched最多有一個(gè)子語句。

1.2 當(dāng)源表的數(shù)據(jù)和目標(biāo)表的數(shù)據(jù)滿足匹配條件的時(shí)候,執(zhí)行的是whenMatched語句。這些語句可以有以下幾個(gè)語義:

a) whenMatched語句最多有一個(gè)update和一個(gè)delete表達(dá)。merge中的update行為僅僅更新滿足條件的目標(biāo)表一行數(shù)據(jù)的指定列。而delete操作會刪除所有匹配的行。

b) 每個(gè)whenMatched語句都可以有一個(gè)可選的條件。如果該可選的條件存在,update和delete操作僅僅在該可選條件為true的時(shí)候,才會在匹配的目標(biāo)數(shù)據(jù)上執(zhí)行相應(yīng)操作。

c) 如果有兩個(gè)whenMatched子句,則將按照它們被指定的順序(即,子句的順序很重要)進(jìn)行執(zhí)行。第一個(gè)子句必須具有一個(gè)子句條件(否則,第二個(gè)子句將永遠(yuǎn)不會執(zhí)行)。

d) 如果兩個(gè)whenMatched子語句都有條件并且兩個(gè)子語句的條件都不為true,那不會對目標(biāo)數(shù)據(jù)進(jìn)行任何修改。

c) 支持滿足條件的源dataset中相關(guān)行的所有列同時(shí)更新到目標(biāo)detla表的相關(guān)列,表達(dá)式如下:

whenMatched(...).updateAll()

等價(jià)于:

whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))

要保證源表和目標(biāo)表有相同的列,否則會拋出異常。

1.3 給定的條件,源表的一行數(shù)據(jù),跟目標(biāo)表沒有完成匹配的時(shí)候執(zhí)行whenNotMatched語句。該子語句有以下語法:

a) whenNotMatched僅僅支持insert表達(dá)。根據(jù)指定的列和相關(guān)的條件,該操作會在目標(biāo)表中插入一條新的數(shù)據(jù),當(dāng)目標(biāo)表中存在的列沒有明確的指定的時(shí)候,就插入null。

b) whenNotMatched語句可以有可選條件。如果指定了可選條件,數(shù)據(jù)僅僅會在可選條件為true的時(shí)候才會插入。否則,源列會被忽略。

c) 也可以插入匹配目標(biāo)表相關(guān)行的所有源表行的數(shù)據(jù)列,表達(dá)式:

whenNotMatched(...).insertAll()

等價(jià)于:

whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))

要保證源表和目標(biāo)表有相同的列,否則就會拋出異常。

2.schema校驗(yàn)

merge操作會自動(dòng)校驗(yàn)insert和update操作產(chǎn)生額數(shù)據(jù)schema是否與目標(biāo)表的schema匹配。規(guī)則如下:

a) 對于update和insert行為,指定的目標(biāo)列必須在目標(biāo)delta lake表中存在。

b) 對于updateAll和insertAll操作,源dataset必須包含所有目標(biāo)表的列。源dataset可以有目標(biāo)表中不存在的列,但是這些列會被忽略。當(dāng)然也可以通過配置保留僅源dataset有的列。

c) 對于所有操作,如果由生成目標(biāo)列的表達(dá)式生成的數(shù)據(jù)類型與目標(biāo)Delta表中的對應(yīng)列不同,則merge嘗試將其強(qiáng)制轉(zhuǎn)換為表中的類型。

3.自動(dòng)schema轉(zhuǎn)換

默認(rèn)情況下,updateAll和insertAll操作僅僅會更新或插入在目標(biāo)表中有的相同列名的列,對于僅僅在源dataset中存在而目標(biāo)表中不存在的列,會被忽略。但是有些場景下,我們希望保留源dataset中新增的列。首先需要將前面介紹的一個(gè)參數(shù)spark.databricks.delta.schema.autoMerge.enabled設(shè)置為true。

注意:

a. schema自動(dòng)增加僅僅是針對updateAll操作或者insertAll操作,或者兩者。

b. 僅僅頂層的列會被更改,而不是嵌套的列。

c. 更新和插入操作不能顯式引用目標(biāo)表中不存在的目標(biāo)列(即使其中有updateAll或insertAll作為子句之一)。 

4.schema推斷與否對比

據(jù)一些例子,進(jìn)行schema自動(dòng)推斷與不自動(dòng)推斷的對比

對比一

目標(biāo)列(key,value),源列(key,value,newValue),對源源表執(zhí)行下面的sql操作:

targetDeltaTable.alias("t")  .merge(    sourceDataFrame.alias("s"),    "t.key = s.key")  .whenMatched().updateAll()  .whenNotMatched().insertAll()  .execute()

沒有使用自動(dòng)schema推斷的話:目標(biāo)表的schema信息是不會變的。僅僅key,value列被更新。

使用了schema推斷的話:表的schema就會演變?yōu)?key,value,newValue)。updateAll操作,會更新value和newValue列。對于insertAll操作會插入整行(key,value,newValue)。

對比二

目標(biāo)表(key,oldValue),源表(key,newValue),對源表執(zhí)行下面的sql:

targetDeltaTable.alias("t")  .merge(    sourceDataFrame.alias("s"),    "t.key = s.key")  .whenMatched().updateAll()  .whenNotMatched().insertAll()  .execute()

不使用schema推斷:updateAll和insertAll操作都會拋異常。

使用schema推斷:表的shema會演變?yōu)?key,oldValue,newValue)。updateAll操作會更新key和value列,而oldValue列不變。insertAll操作會插入(key,null,newValue),oldValue會插入null。

對比三

目標(biāo)表(key,oldValue),源表(key,newValue),對源表執(zhí)行下面的sql

targetDeltaTable.alias("t")  .merge(    sourceDataFrame.alias("s"),    "t.key = s.key")  .whenMatched().update(Map(    "newValue" -> col("s.newValue")))  .whenNotMatched().insertAll()  .execute()

不使用schema推斷:update操作會拋出異常,因?yàn)閚ewValue在目標(biāo)表中并不存在。

使用schema推斷:update操作會拋出異常,因?yàn)閚ewValue在目標(biāo)表中并不存在。

對比四:

目標(biāo)表(key,oldValue),源表(key,newValue),對源表執(zhí)行下面的sql

targetDeltaTable.alias("t")  .merge(    sourceDataFrame.alias("s"),    "t.key = s.key")  .whenMatched().updateAll()  .whenNotMatched().insert(Map(    "key" -> col("s.key"),    "newValue" -> col("s.newValue")))  .execute()

不使用schema推斷:insert操作會拋出異常,因?yàn)閚ewValue在目標(biāo)表中并不存在。

使用schema推斷:insert操作依然會拋出異常,因?yàn)閚ewValue在目標(biāo)表中并不存在。

5.性能調(diào)優(yōu)

下面幾個(gè)方法可以有效減少merge的處理時(shí)間:

a.減少匹配查找的數(shù)據(jù)量

默認(rèn)情況下,merge操作會掃描整個(gè)delta lake表找到滿足條件的數(shù)據(jù)。可以加些謂詞,以減少數(shù)據(jù)量。比如,數(shù)據(jù)是以country和date進(jìn)行分區(qū)的,而你只想更新特定國家的昨天的數(shù)據(jù)。就可以增加一些條件,比如:

events.date = current_date() AND events.country = 'USA'

這樣就只會處理指定分區(qū)的數(shù)據(jù),大大減少了數(shù)據(jù)掃描量。也可以避免不同分區(qū)之間操作的一些沖突。

b.合并文件

如果數(shù)據(jù)存儲的時(shí)候有很多小文件,就會降低數(shù)據(jù)的讀取速度??梢院喜⑿∥募梢恍┐笪募?,來提升讀取的速度。后面會說到這個(gè)問題。

c.控制shuffle的分區(qū)數(shù)

為了計(jì)算和更新數(shù)據(jù),merge操作會對數(shù)據(jù)進(jìn)行多次shuffle。shuffle過程中task數(shù)量是由參數(shù)spark.sql.shuffle.partitions來設(shè)置,默認(rèn)是200。該參數(shù)不僅能控制shuffle的并行度,也能決定輸出的文件數(shù)。增加這個(gè)值雖然可以增加并行度,但也相應(yīng)的增加了產(chǎn)生小文件數(shù)。

d.寫出數(shù)據(jù)之間進(jìn)行重分區(qū)

對與分區(qū)表,merge操作會產(chǎn)生很多小文件,會比shuffle分區(qū)數(shù)多很多。原因是每個(gè)shuffle任務(wù)會為多分區(qū)表產(chǎn)生更多的文件,這可能會是一個(gè)性能瓶頸。所以,很多場景中使用表的分區(qū)列對數(shù)據(jù)進(jìn)行寫入前重分區(qū)是很有效的??梢酝ㄟ^設(shè)置spark.delta.merge.repartitionBeforeWrite為true來生效。

關(guān)于delta lake的merge操作以及性能調(diào)優(yōu)是怎樣的問題的解答就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道了解更多相關(guān)知識。

分享名稱:deltalake的merge操作以及性能調(diào)優(yōu)是怎樣的
本文鏈接:http://muchs.cn/article10/iheogo.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供全網(wǎng)營銷推廣、網(wǎng)站設(shè)計(jì)公司、網(wǎng)站內(nèi)鏈、商城網(wǎng)站、網(wǎng)站制作靜態(tài)網(wǎng)站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

h5響應(yīng)式網(wǎng)站建設(shè)