deltalake的merge場景是怎么樣的,很多新手對此不是很清楚,為了幫助大家解決這個(gè)難題,下面小編將為大家詳細(xì)講解,有這方面需求的人可以來學(xué)習(xí)下,希望你能有所收獲。
創(chuàng)新互聯(lián)公司是一家業(yè)務(wù)范圍包括IDC托管業(yè)務(wù),虛擬空間、主機(jī)租用、主機(jī)托管,四川、重慶、廣東電信服務(wù)器租用,資陽主機(jī)托管,成都網(wǎng)通服務(wù)器托管,成都服務(wù)器租用,業(yè)務(wù)范圍遍及中國大陸、港澳臺以及歐美等多個(gè)國家及地區(qū)的互聯(lián)網(wǎng)數(shù)據(jù)服務(wù)公司。
下面主要是講merge操作的四個(gè)案例。
1.數(shù)據(jù)去重
實(shí)際上,線上業(yè)務(wù)很多時(shí)候數(shù)據(jù)源在上報(bào)數(shù)據(jù)的時(shí)候,由于各種原因可能會重復(fù)上報(bào)數(shù)據(jù),這就會導(dǎo)致數(shù)據(jù)重復(fù),使用merge函數(shù)可以避免插入重復(fù)的數(shù)據(jù)。具體操作方法如下:
sql
MERGE INTO logsUSING newDedupedLogsON logs.uniqueId = newDedupedLogs.uniqueIdWHEN NOT MATCHED THEN INSERT *
scala
deltaTable .as("logs") .merge( newDedupedLogs.as("newDedupedLogs"), "logs.uniqueId = newDedupedLogs.uniqueId") .whenNotMatched() .insertAll() .execute()
注意:需要寫入delta lake表的dataset自身要完成去重的 操作。我們可以通過merge語義區(qū)實(shí)現(xiàn)新數(shù)據(jù)和delta lake表中已有的數(shù)據(jù)之間去重,但是如果新的dataset內(nèi)部有重復(fù)數(shù)據(jù),重復(fù)數(shù)據(jù)依然會被插入。因此在寫入新數(shù)據(jù)之前一定要完成去重操作。
如果數(shù)據(jù)確定可能會在某些時(shí)間周期內(nèi)重復(fù),那么可以對目標(biāo)表進(jìn)行按照時(shí)間分區(qū),這樣就可以在merge操作的時(shí)候指定時(shí)間范圍。
sql
MERGE INTO logsUSING newDedupedLogsON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYSWHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS THEN INSERT *
scala
deltaTable.as("logs").merge( newDedupedLogs.as("newDedupedLogs"), "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") .insertAll() .execute()
這種利用分區(qū)進(jìn)行謂詞下推,可以大幅減少數(shù)據(jù)加載的量,進(jìn)而提升速度。此外,對于Structured Streaming可以使用insert-only merge操作來實(shí)現(xiàn)連續(xù)不斷的去重操作。主要有以下場景:
a.對于一些streaming操作,可以在foreachBatch操作來實(shí)現(xiàn)連續(xù)不斷的將數(shù)據(jù)寫入delta lake表,同時(shí)具有去重的功能。
b.對于另一些流查詢,你可以連續(xù)不斷的從delta lake表中讀取去重的數(shù)據(jù)??梢赃@么做的原因是insert-only merge操作僅僅會追加新的數(shù)據(jù)到delta lake表中。
2.漸變緯度數(shù)據(jù)
另一個(gè)常見的操作是SCD Type 2,它維護(hù)對維表中每個(gè)key所做的所有變更的歷史記錄。此類操作需要更新現(xiàn)有行以將key的先前值標(biāo)記為舊值,并插入新行作為最新值。給定具有更新的源表和具有維度數(shù)據(jù)的目標(biāo)表,可以使用merge表達(dá)SCD type 2。
維護(hù)客戶地址歷史記錄以及每個(gè)地址的有效日期范圍,是本小節(jié)常見的示例操作。當(dāng)需要更新客戶的地址時(shí),必須將先前的地址標(biāo)記為不是當(dāng)前地址,更新其有效日期范圍,然后將新地址添加為當(dāng)前地址。scala的表達(dá)方法如下:
val customersTable: DeltaTable = ... // table with schema (customerId, address, current, effectiveDate, endDate)
val updatesDF: DataFrame = ... // DataFrame with schema (customerId, address, effectiveDate)
// Rows to INSERT new addresses of existing customers
val newAddressesToInsert = updatesDF
.as("updates")
.join(customersTable.toDF.as("customers"), "customerid")
.where("customers.current = true AND updates.address <> customers.address")
// Stage the update by unioning two sets of rows
// 1. Rows that will be inserted in the whenNotMatched clause
// 2. Rows that will either update the current addresses of existing customers or insert the new addresses of new customers
val stagedUpdates = newAddressesToInsert
.selectExpr("NULL as mergeKey", "updates.*") // Rows for 1.
.union(
updatesDF.selectExpr("updates.customerId as mergeKey", "*") // Rows for 2.
)
// Apply SCD Type 2 operation using merge
customersTable
.as("customers")
.merge(
stagedUpdates.as("staged_updates"),
"customers.customerId = mergeKey")
.whenMatched("customers.current = true AND customers.address <> staged_updates.address")
.updateExpr(Map( // Set current to false and endDate to source's effective date.
"current" -> "false",
"endDate" -> "staged_updates.effectiveDate"))
.whenNotMatched()
.insertExpr(Map(
"customerid" -> "staged_updates.customerId",
"address" -> "staged_updates.address",
"current" -> "true",
"effectiveDate" -> "staged_updates.effectiveDate", // Set current to true along with the new address and its effective date.
"endDate" -> "null"))
.execute()
3.cdc操作
和scd類似,另一個(gè)常見的案例是變化數(shù)據(jù)捕獲,也即是常說的CDC,簡單來說就是同步外部數(shù)據(jù)庫的變更數(shù)據(jù)到deta lake。換句話說,對于外部數(shù)據(jù)庫的 update,delete,insert操作,要同時(shí)作用于delta 表。這種情況,也可以使用merge操作來實(shí)現(xiàn)。
val deltaTable: DeltaTable = ... // DeltaTable with schema (key, value)// DataFrame with changes having following columns// - key: key of the change// - time: time of change for ordering between changes (can replaced by other ordering id)// - newValue: updated or inserted value if key was not deleted// - deleted: true if the key was deleted, false if the key was inserted or updatedval changesDF: DataFrame = ...// Find the latest change for each key based on the timestamp// Note: For nested structs, max on struct is computed as// max on first struct field, if equal fall back to second fields, and so on.val latestChangeForEachKey = changesDF .selectExpr("key", "struct(time, newValue, deleted) as otherCols" ) .groupBy("key") .agg(max("otherCols").as("latest")) .selectExpr("key", "latest.*")deltaTable.as("t") .merge( latestChangeForEachKey.as("s"), "s.key = t.key") .whenMatched("s.deleted = true") .delete() .whenMatched() .updateExpr(Map("key" -> "s.key", "value" -> "s.newValue")) .whenNotMatched("s.deleted = false") .insertExpr(Map("key" -> "s.key", "value" -> "s.newValue")) .execute()
4. 整合foreachBatch
實(shí)際上在使用delta lake的時(shí)候可以結(jié)合foreachBatch和merge,來實(shí)現(xiàn)復(fù)雜的流查詢到delta lake表的upsert功能。總共有以下幾個(gè)場景:
a.在update模式下寫流聚合結(jié)果到delta lake。這種情況,實(shí)際上比Complete模式更加高效。
import io.delta.tables.*
val deltaTable = DeltaTable.forPath(spark, "/data/aggregates")
// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
deltaTable.as("t")
.merge(
microBatchOutputDF.as("s"),
"s.key = t.key")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
}
// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
.format("delta")
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()
b.將數(shù)據(jù)庫變更操作同步到delta lake。該場景就是寫變化數(shù)據(jù)到delta lake,也即是本問第三小節(jié)。
c.流數(shù)據(jù)以去重的方式寫入delta lake。這個(gè)就是本文第一小節(jié)。
注意:
確保foreachBatch中的merge語句是冪等的,因?yàn)橹匦聠恿鞑樵兛梢詫υ摬僮鲗ν慌鷶?shù)據(jù)重復(fù)執(zhí)行。
當(dāng)在foreachBatch中使用merge時(shí),流查詢的輸入數(shù)據(jù)速率可能會上報(bào)為在源處生成數(shù)據(jù)的實(shí)際速率的若干倍數(shù)。這是因?yàn)閙erge多次讀取輸入數(shù)據(jù),導(dǎo)致輸入指標(biāo)倍增。如果這是瓶頸,則可以在合并之前緩存批處理DataFrame,然后在合并之后取消緩存。
看完上述內(nèi)容是否對您有幫助呢?如果還想對相關(guān)知識有進(jìn)一步的了解或閱讀更多相關(guān)文章,請關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝您對創(chuàng)新互聯(lián)的支持。
文章名稱:deltalake的merge場景是怎么樣的
新聞來源:http://muchs.cn/article0/pipgoo.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供品牌網(wǎng)站設(shè)計(jì)、標(biāo)簽優(yōu)化、建站公司、域名注冊、品牌網(wǎng)站建設(shè)、移動網(wǎng)站建設(shè)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)