怎樣分析DebeziumMySQL模塊設(shè)計(jì)

今天就跟大家聊聊有關(guān)怎樣分析Debezium MySQL模塊設(shè)計(jì),可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。

成都創(chuàng)新互聯(lián)公司是專業(yè)的潞州網(wǎng)站建設(shè)公司,潞州接單;提供成都做網(wǎng)站、網(wǎng)站建設(shè),網(wǎng)頁設(shè)計(jì),網(wǎng)站設(shè)計(jì),建網(wǎng)站,PHP網(wǎng)站建設(shè)等專業(yè)做網(wǎng)站服務(wù);采用PHP框架,可快速的進(jìn)行潞州網(wǎng)站開發(fā)網(wǎng)頁制作和功能擴(kuò)展;專業(yè)做搜索引擎喜愛的網(wǎng)站,專業(yè)的做網(wǎng)站團(tuán)隊(duì),希望更多企業(yè)前來合作!

注:本文不會(huì)著重分析MySQL binlog格式結(jié)構(gòu)和解析過程,而在于debezium的架構(gòu)設(shè)計(jì)。

Debezium is an open source distributed platform for change data capture.

這句話引用自debezium官網(wǎng),可以看到,debezium的野心還是很大的,把自己定義為一個(gè)通用的CDC平臺,事實(shí)上,也確實(shí)如此,尤其是從0.8版本以來,開發(fā)者將大量精力投入到PostgreSQL模塊的開發(fā),一方面引入SQL Server, Oracle, Db2, Cassandra等數(shù)據(jù)庫的支持,另一方面適配了Pulsar,Amazon Kineis,Google Pub/Sub等消息引擎,并且逐步重構(gòu),解耦和具體數(shù)據(jù)庫的綁定以及具體消息系統(tǒng)的依賴,向統(tǒng)一架構(gòu)和云原生靠攏。

事實(shí)上,早期的Debezium是和Kafka Connect框架緊耦合的,Debezium是Kafka Connect的一個(gè)Source Plugin,并且主要適配MySQL,稍帶了MongoDB。

目前Debezium最新版是1.2,1.3已經(jīng)進(jìn)入beta階段。其實(shí)MySQL模塊在0.8版本已經(jīng)基本定下來了,后面的變動(dòng)只有0.9合入了DBZ-175,這是一個(gè)在線加表特性,僅作為內(nèi)部實(shí)驗(yàn)特性,并沒有在文檔上提及。我個(gè)人認(rèn)為這個(gè)設(shè)計(jì)十分精彩,會(huì)在本文有篇幅專門討論。在后續(xù)的開發(fā)中,同為早期的MongoDB被徹底重構(gòu),但MySQL模塊還是保持原來的樣子。

Rebase MySQL connector to common framework used by the other connectors.

這個(gè)已經(jīng)在Roadmap上掛了有段兒時(shí)間了,但可以預(yù)見到,短期內(nèi)還不會(huì)有什么動(dòng)作。很大一部分原因是,MySQL模塊的代碼中有大量的針對MySQL和Kafka Connect缺陷的額外處理,還有像DBZ-175這種統(tǒng)一架構(gòu)還不支持的特性,另外由于MySQL的廣泛使用,多年來社區(qū)發(fā)現(xiàn)和修復(fù)了大量的場景下的bug,把一個(gè)久經(jīng)驗(yàn)證的模塊架構(gòu)推倒是一件風(fēng)險(xiǎn)很大的事情。

后續(xù)的分析僅僅針對MySQL模塊的架構(gòu)和代碼,基本上不會(huì)涉及新的統(tǒng)一架構(gòu)。

Kafka Connect

上文提到,Debezium最初設(shè)計(jì)成一個(gè)Kafka Connect 的Source Plugin,目前開發(fā)者雖致力于將其與Kafka Connect解耦,但當(dāng)前的代碼實(shí)現(xiàn)還未變動(dòng)。下圖引自Debeizum官方文檔,可以看到一個(gè)Debezium在一個(gè)完整CDC系統(tǒng)中的位置。

怎樣分析Debezium MySQL模塊設(shè)計(jì)

Kafka Connect 為Source Plugin提供了一系列的編程接口,最主要的就是要實(shí)現(xiàn)SourceTask的poll方法,其返回List<SourceRecord>將會(huì)被以最少一次語義的方式投遞至Kafka。如果你想了解更多Kafka Connect的細(xì)節(jié),請參閱我的另一篇文章:https://www.jianshu.com/p/538b2f0a7462

public abstract class SourceTask implements Task {
    ...
    public abstract List<SourceRecord> poll() throws InterruptedException;
    ...
}

Debezium MySQL 架構(gòu)

Reader體系構(gòu)成了MySQL模塊中代碼的主線,我們的分析從Reader開始。

怎樣分析Debezium MySQL模塊設(shè)計(jì)

這里是Reader的整個(gè)繼承樹,我們先暫時(shí)忽略ParallelSnapshotReader,ReconcilingBinlogReader,他們是DBZ-175引入的東西。

從名字上應(yīng)該可以看出,真正主要的是SnapshotReader和BinlogReader,分別實(shí)現(xiàn)了對MySQL數(shù)據(jù)的全量讀取和增量讀取,他們繼承于AbstractReader,里面封裝了共用邏輯,下圖是AbstractReader的內(nèi)部設(shè)計(jì)。

怎樣分析Debezium MySQL模塊設(shè)計(jì)

可以看到,AbstractReader在實(shí)現(xiàn)時(shí),并沒有直接將enqueue喂進(jìn)來的record投遞進(jìn)Kafka,而是通過一個(gè)內(nèi)存阻塞隊(duì)列BlockingQueue進(jìn)行了解耦,這種設(shè)計(jì)有諸多好處:

  1. 職責(zé)解耦

如上的圖中,在喂入BlockingQueue之前,要根據(jù)條件判斷是否接受該record;在向Kafka投遞record之前,判斷task的running狀態(tài)。這樣把同類的功能限定在特定的位置。

  1. 線程隔離

BlockingQueue是一個(gè)線程安全的阻塞隊(duì)列,通過BlockingQueue實(shí)現(xiàn)的生產(chǎn)者消費(fèi)者模型,是可以跑在不同的線程里的,這樣避免局部的阻塞帶來的整體的干擾。如上圖中的右側(cè),消費(fèi)者會(huì)定期判斷running標(biāo)志位,若running被stop信號置為了false,可以立刻停止整個(gè)task,而不會(huì)因MySQL IO阻塞延遲相應(yīng)。

  1. Single與Batch的互相轉(zhuǎn)化

Enqueue record是單條的投遞record,drain_to是批量的消費(fèi)records。這個(gè)用法也可以反過來,實(shí)現(xiàn)batch到single的轉(zhuǎn)化。

還剩下兩個(gè)ChainedReader和TimedBlockingReader。

  • ChainedReader顧名思義,會(huì)把幾個(gè)Reader包裝起來,串行執(zhí)行。

  • TimedBlockingReader就是簡單的sleep一段時(shí)間,它的存在是為了應(yīng)對Kafka Connect rebalance的設(shè)計(jì)缺陷,在上文中我的另一篇文章中有提到。

Snapshot Stream 無縫銜接

如果你搭建過MySQL的主從同步,因該知道,建立從庫時(shí),需要先導(dǎo)出全量數(shù)據(jù)(MySQL 8.0.x好像已經(jīng)有了更便捷的方法),然后記錄binlog的位置,把全量數(shù)據(jù)導(dǎo)入從庫后,從binlog位置繼續(xù)增量同步,已保持?jǐn)?shù)據(jù)的一致性。

可能你還知道阿里開源的另一個(gè)MySQL CDC工具canal,他只負(fù)責(zé)stream過程,并沒有處理snapshot過程,這也是debezium相較于canal的一個(gè)優(yōu)勢。

對于Debezium來說,基本沿用了官方搭建從庫的這一思路,讓我們看下官方文檔描述的詳細(xì)步驟。(如果沒有額外說明,后面的討論僅針對Innodb引擎)

  1. Grabs a global read lock that blocks writes by other database clients. The snapshot itself does not prevent other clients from applying DDL which might interfere with the connector’s attempt to read the binlog position and table schemas. The global read lock is kept while the binlog position is read before released in a later step.

  2. Starts a transaction with repeatable read semantics to ensure that all subsequent reads within the transaction are done against the consistent snapshot.

  3. Reads the current binlog position.

  4. Reads the schema of the databases and tables allowed by the connector’s configuration.

  5. Releases the global read lock. This now allows other database clients to write to the database.

  6. Writes the DDL changes to the schema change topic, including all necessary DROP… and CREATE… DDL statements. This happens if applicable.

  7. Scans the database tables and generates CREATE events on the relevant table-specific Kafka topics for each row.

  8. Commits the transaction.

  9. Records the completed snapshot in the connector offsets.

Debezium把這個(gè)過程分解成了9步,看上去好像比我們想的要復(fù)雜些。

在Debezium目前版本的實(shí)現(xiàn)中,這9步是單線程串行的,其中主要的耗時(shí)就在第7步,這一步其實(shí)就是使用最樸素的方式,通過jdbc使用select * from table [where ...]來實(shí)現(xiàn)的讀取全量數(shù)據(jù),如果很多千萬級甚至更大的表,這一步的耗時(shí)是很長的。其實(shí)這一步是可以并行化的,在第1步中,已經(jīng)獲取了全局鎖,在全局鎖釋放前,是可以開多個(gè)連接,并行的實(shí)現(xiàn)全量數(shù)據(jù)的拉去,極大的提升效率。

另外snapshot整個(gè)過程如果失敗,是無法恢復(fù)的,畢竟事務(wù)已經(jīng)丟了,無法再讀取當(dāng)時(shí)的快照,來保證數(shù)據(jù)的一致性。

Snapshot過程時(shí)間長和中斷不可恢復(fù),再加上Kafka Connect 粗暴的rebalance策略,正是早期使用debezium的一大痛點(diǎn)。TimedBlockingReader的引入正是為了在一定程度上緩解這個(gè)問題。

ChainedReader
├── TimedBlockReader
├── SnapshotReader
└── BinlogReader

當(dāng)準(zhǔn)備一次性提交多個(gè)同步任務(wù)時(shí),因?yàn)槊看稳蝿?wù)提交都會(huì)觸發(fā)一次rebalance,在SnapshotReader和BinlogReader前插入一個(gè)TimedBlockReader,確保同步任務(wù)提交后不會(huì)立刻執(zhí)行,等多個(gè)任務(wù)都提交完成時(shí),集群穩(wěn)定下來,才會(huì)開始并發(fā)執(zhí)行。

特別的,snapshot和stream過程都是可選的,你也可以像canal一樣只從當(dāng)前時(shí)刻開始監(jiān)聽binlog,捕獲stream數(shù)據(jù),具體配置請參考官方文檔。

Schema時(shí)間線構(gòu)造

下面我們關(guān)注一下stream過程,也就是binlog解析過程。(做數(shù)據(jù)同步binlog必須設(shè)為row模式)

相信能讀到這里的大多數(shù)同學(xué)都執(zhí)行過以下命令,就是用MySQL官方的binlog工具解析binlog文件內(nèi)容。仔細(xì)看,你會(huì)發(fā)現(xiàn),這里面有庫名和表名,有每個(gè)字段的值,卻沒有字段名,換句話說,binlog里不包含schema信息!

mysqlbinlog --no-defaults --base64-output=decode-rows -vvv ~/Downloads/mysql-bin.001192 | less
#190810 12:00:20 server id 206195699  end_log_pos 8624 CRC32 0x46912d80         GTID    last_committed=12       sequence_number=13      rbr_only=yes
/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/;
SET @@SESSION.GTID_NEXT= '5358e6dc-d161-11e8-8a6c-7cd30ac4dc44:25115781'/*!*/;
# at 8624
#190810 12:00:20 server id 206195699  end_log_pos 8687 CRC32 0xe14a2f5a         Query   thread_id=576127        exec_time=0     error_code=0
SET TIMESTAMP=1565409620/*!*/;
BEGIN
/*!*/;
# at 8687
#190810 12:00:20 server id 206195699  end_log_pos 8775 CRC32 0xaf16fb7d         Table_map: `risk_control`.`log_operation` mapped to number 39286
# at 8775
#190810 12:00:20 server id 206195699  end_log_pos 9055 CRC32 0x9bdc15ae         Write_rows: table id 39286 flags: STMT_END_F
### INSERT INTO `risk_control`.`log_operation`
### SET
###   @1=8166048 /* INT meta=0 nullable=0 is_null=0 */
###   @2='7b0d526124ba40f6ac71cfe1d0d90665' /* VARSTRING(160) meta=160 nullable=0 is_null=0 */
###   @3=17 /* INT meta=0 nullable=1 is_null=0 */
###   @4='2' /* VARSTRING(64) meta=64 nullable=1 is_null=0 */
###   @5='casign_end=;方法public void com.xxx.risk.service.impl.ActivitiEventServiceImpl.updateAuditStatus(java.lang.String,java.lang.String);參數(shù){"auditStatus": 3}' /* VARSTRING(1020) meta=1020 nullable=0 is_null=0 */
###   @6='\x00\x01\x00\x16\x00\x0b\x00\x0b\x00\x05\x03\x00auditStatus' /* JSON meta=4 nullable=1 is_null=0 */
###   @7='2019-08-10 12:00:21' /* DATETIME(0) meta=0 nullable=0 is_null=0 */
###   @8='' /* VARSTRING(128) meta=128 nullable=0 is_null=0 */
###   @9='' /* VARSTRING(256) meta=256 nullable=0 is_null=0 */
# at 9055
#190810 12:00:20 server id 206195699  end_log_pos 9086 CRC32 0xbe19a1b1         Xid = 180175929
COMMIT/*!*/;

其實(shí)這種設(shè)計(jì)可以理解,作為一個(gè)高效的二進(jìn)制格式,binlog里不存儲(chǔ)冗余度極高的列名可以很可觀的減少體積,并且,有了表名,表結(jié)構(gòu)信息可以從MySQL information_schema表中拿到的,何必再存一份呢?

但是,debezium偷梁換柱,模擬從庫拉取binlog做解析,他并不是真正的從庫,是沒有information_schema表可以查的,只能從MySQL主庫查詢。但這個(gè)方式真的萬無一失嗎?

考慮下面的場景:

  • 15:00 BinlogReader正常消費(fèi)

  • 15:05 Kafka Connect集群維護(hù),暫停BinlogReader

  • 15:10 表A修改,在第3列后增加了1列(新增列不一定在尾部)

  • 15:15 Kafka Connect集群維護(hù)結(jié)束,恢復(fù)BinlogReader

這個(gè)場景中,BinlogReader在15:15恢復(fù)后,會(huì)繼續(xù)從15:05讀取并解析binlog,如果這時(shí)從MySQL讀取information_schema來獲取表A的schema信息,那么在15:05-15:10期間binlog和schema是不匹配的,也就無法解析出正確的數(shù)據(jù)。換句話說,如果debezium讀取binlog有延遲,這段時(shí)間主庫schema做了修改,那么讀取主庫information_schema的方案就會(huì)有問題了。

要解決這個(gè)問題,就要模擬information_schema機(jī)制,維護(hù)一份當(dāng)前的schema快照,可這樣就夠了嗎?

回到前面提到的AbstractReader內(nèi)部設(shè)計(jì)上,BinlogReader作為生產(chǎn)者,其將解析后的數(shù)據(jù)投遞到BlockingQueue中,如果在解析binlog過程中遇到了DDL語句(比如alter table add column ...),就會(huì)更新當(dāng)前的schema快照。

怎樣分析Debezium MySQL模塊設(shè)計(jì)

這時(shí),如果stop task,如上圖,BlockingQueue中還未被消費(fèi)的records將被丟棄,如果包含schema修改之前解析出的record,那么下次binlog將從此處開始解析,而debezium存儲(chǔ)的schema快照卻已經(jīng)對應(yīng)了修改后的,也會(huì)照成binlog和schema的不匹配。在這種邊緣場景下,僅保存當(dāng)前schema快照的方案就行不通了。當(dāng)然,后面我們還會(huì)提到這種模式同樣不能滿足很多其他場景。

事已至此,我們只好使用終極方案,把每一次的schema變更都保存下來,構(gòu)造一條完整的schema時(shí)間線,確保在解析任一時(shí)刻的binlog事件時(shí),都能找到對應(yīng)版本的schema快照。

Debezium中使用DatabaseHistory來實(shí)現(xiàn)該功能,功能已經(jīng)滿足,不過實(shí)現(xiàn)的確是簡陋。MySQLDatabaseHistory會(huì)從同步任務(wù)啟動(dòng)時(shí),導(dǎo)出所有的create table語句(參見snapshot過程第6步),在此基礎(chǔ)上,追加記錄每一條DDL語句,debezium為這些DDL存儲(chǔ)提供了內(nèi)存、文件、kafka topic實(shí)現(xiàn),其中kafka topic必須設(shè)置過期策略為永不過期。

當(dāng)要恢復(fù)到任意時(shí)刻的schema快照時(shí),從頭開始,逐條解析所有的DDL,疊加修改,直到指定時(shí)刻前最后一條DDL??梢钥吹?,這種實(shí)現(xiàn)方式的效率是比較低的,當(dāng)任務(wù)持續(xù)數(shù)個(gè)月時(shí),會(huì)累積大量的DDL(尤其是在阿里云RDS上,不知道阿里云改了什么,binlog里會(huì)產(chǎn)生海量的DDL),一次恢復(fù)可能需要數(shù)十分鐘乃至數(shù)個(gè)小時(shí),并且若其中有一處DDL解析錯(cuò)誤,會(huì)導(dǎo)致其后所有的快照都發(fā)生錯(cuò)誤。開發(fā)者很早就意識到了這個(gè)問題,并且提出了一些改進(jìn)想法,可能會(huì)在不久后有所進(jìn)展。

看到這里,相信你已經(jīng)可以理解,為什么有些商業(yè)的數(shù)據(jù)同步引擎對同步過程中的schema變更有所限制,要完備的支持各種情況,著實(shí)不是一件容易的事情。

飛機(jī)上換引擎——在線加表

我們前面已經(jīng)多次提到了DBZ-175,現(xiàn)在我們我們開始討論這個(gè)精彩的設(shè)計(jì)。https://issues.redhat.com/browse/DBZ-175

注:該功能僅作為內(nèi)部實(shí)驗(yàn)特性,官方文檔未提及,有問題請參考JIRA討論或者閱讀調(diào)試源碼。

我們先來補(bǔ)充一些背景,debezium在同步數(shù)據(jù)過程中,允許通過table.whitelist和table.blacklist指定要同步的表。假設(shè)一開始,我們將table.whitelist配置為a,b兩張表,這兩張表完成了Snapshot階段,已經(jīng)穩(wěn)定的切換到Stream階段。這時(shí),來了新的同步需求,要再同步c表,并且最好不干擾a,b兩張表的同步進(jìn)度。那很自然的想法就是我再起一個(gè)新的同步任務(wù)來處理c表,長此以往,你會(huì)發(fā)現(xiàn)一個(gè)MySQL主庫上掛了很多個(gè)“從庫”,對MySQL主庫會(huì)照成一定壓力。所以,一個(gè)理想的方案便是:修改同步任務(wù)的table.whitelist后,debezium可以自動(dòng)完成新增表的全量和增量同步,并且這個(gè)過程不會(huì)干擾原有的同步任務(wù);當(dāng)新老兩批同步任務(wù)進(jìn)度相近時(shí),合二為一,只使用同一個(gè)BinlogReader完成后續(xù)的stream同步。

簡單吧!怎么實(shí)現(xiàn)呢?就是我們前文暫時(shí)略過的ParallelSnapshotReader和ReconcilingBinlogReader。

首先描述一下在線加表后,整個(gè)Reader的結(jié)構(gòu)。

ChainedReader
├── ParallelSnapshotReader
│   ├── OldTablesBinlogReader
│   └── ChainedReader
│       ├── NewTablesSnapshotReader
│       └── NewTablesBinlogReader
├── ReconcilingBinlogReader
└── UnifiedBinlogReader
  1. ParallelSnapshotReader就如其名字一樣,在保證不干擾OldTablesBinlogReader運(yùn)行的情況下,并行的開始對新增表進(jìn)行全量和增量同步;

  2. 當(dāng)新增表進(jìn)入stream階段后,OldTablesBinlogReader和NewTablesBinlogReader每一次拉取都會(huì)和對方作比較,當(dāng)兩者的進(jìn)度相差在一定時(shí)間內(nèi)(默認(rèn)時(shí)5分鐘)時(shí),將兩者停止;

  3. 此時(shí)ParallelSnapshotReader退出,由ReconcilingBinlogReader將兩個(gè)BinlogReader進(jìn)度同步,即將滯后者追平只領(lǐng)先者;

  4. 原有的兩個(gè)BinlogReader退出,新建的UnifiedBinlogReader從其位點(diǎn)繼續(xù)做新老所有表的stream解析,整個(gè)合并過程結(jié)束。

這段設(shè)計(jì)是我在翻閱了JIRA上數(shù)位開發(fā)者們歷經(jīng)幾年的討論記錄后,結(jié)合代碼調(diào)試整理得出的?;叵氘?dāng)年剛畢業(yè)的我,接到這個(gè)在線加表需求時(shí),本以為是不可能實(shí)現(xiàn)的事情,直到發(fā)現(xiàn)了這個(gè)設(shè)計(jì),不由得是感嘆和驚喜。

這只是一個(gè)初版的實(shí)現(xiàn),在整個(gè)過程中,因?yàn)樵獢?shù)據(jù)的設(shè)計(jì)問題,并不支持schema的變更,可能正是由于這個(gè)原因,嚴(yán)謹(jǐn)?shù)拈_發(fā)者們選擇不公開這項(xiàng)功能,僅作為內(nèi)部實(shí)驗(yàn)特性。

從架構(gòu)的層面梳理了debezium MySQL模塊的基本結(jié)構(gòu),希望能讓大家對代碼的整體的結(jié)構(gòu)和設(shè)計(jì)理念有所理解,內(nèi)部還有諸多細(xì)節(jié)等待探索。后面若有機(jī)會(huì),我會(huì)分享一些在生產(chǎn)環(huán)境中遇到的問題和處理方案。

看完上述內(nèi)容,你們對怎樣分析Debezium MySQL模塊設(shè)計(jì)有進(jìn)一步的了解嗎?如果還想了解更多知識或者相關(guān)內(nèi)容,請關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝大家的支持。

文章名稱:怎樣分析DebeziumMySQL模塊設(shè)計(jì)
鏈接地址:http://muchs.cn/article44/jioiee.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供自適應(yīng)網(wǎng)站網(wǎng)站改版、響應(yīng)式網(wǎng)站網(wǎng)站建設(shè)、網(wǎng)站制作面包屑導(dǎo)航

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

h5響應(yīng)式網(wǎng)站建設(shè)