六、flink--容錯(cuò)機(jī)制

一、flink容錯(cuò)機(jī)制

1.1 flink 的容錯(cuò)概述

在使用了flink的狀態(tài)管理之后,因?yàn)榇藭r(shí)所有的state的讀寫都只是在task本地的內(nèi)存中進(jìn)行,也就是state數(shù)據(jù)此時(shí)只存儲在內(nèi)存中。假設(shè)當(dāng)任務(wù)出現(xiàn)故障之后,這些在內(nèi)存中的state數(shù)據(jù)也會丟失,就無法恢復(fù)了。所以需要一種機(jī)制來保障這些state數(shù)據(jù)的不丟失,這也就是容錯(cuò)機(jī)制。flink通過checkpoint來實(shí)現(xiàn)。flink開啟了checkpoint之后,會定時(shí)將狀態(tài)數(shù)據(jù)的快照持久存儲到指定的statebackend。

我們提供的服務(wù)有:成都網(wǎng)站建設(shè)、網(wǎng)站建設(shè)、微信公眾號開發(fā)、網(wǎng)站優(yōu)化、網(wǎng)站認(rèn)證、攀枝花ssl等。為上千企事業(yè)單位解決了網(wǎng)站和推廣的問題。提供周到的售前咨詢和貼心的售后服務(wù),是有科學(xué)管理、有技術(shù)的攀枝花網(wǎng)站制作公司

1.2 checkpoint基本原理

? flink定期對整個(gè)job任務(wù)進(jìn)行快照,將快照產(chǎn)生的備份數(shù)據(jù)保存到指定的statebacked中。當(dāng)出現(xiàn)故障時(shí),將job 的狀態(tài)恢復(fù)到最近的一個(gè)快照點(diǎn)。Flink 的容錯(cuò)機(jī)制的核心部分是生成分布式數(shù)據(jù)流和operator狀態(tài)一致的快照。這些快照充當(dāng)checkpoint(檢查點(diǎn)), 系統(tǒng)可以早發(fā)生故障時(shí)將其回滾。分布式快照是由 Chandy-Lamport 算法實(shí)現(xiàn)的。
? 每個(gè)checkpoint由checkpoint ID和timestamp來唯一標(biāo)識,其中checkpoint ID可以是standalone(基于內(nèi)存,保存在jobmanager內(nèi)存中)的,也可能是基于ZK的。

1.3 使用checkpoint的先決條件

1、持續(xù)的數(shù)據(jù)源,比如消息隊(duì)列或者文件系統(tǒng)上的文件等
2、狀態(tài)數(shù)據(jù)的持久化存儲,比如采用分布式文件系統(tǒng)存儲狀態(tài)數(shù)據(jù)

1.4 程序中啟用checkpoint的配置

默認(rèn)情況下,flink是禁用了checkpoint的。下面看看程序中開啟checkpoint以及相關(guān)checkpoint工作參數(shù)的配置。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

/* -------------啟用checkpoint   */
//只指定兩個(gè)checkpoint的時(shí)間間隔,單位是毫秒
env.enableCheckpointing(1000);  
//指定checkpoint時(shí)間間隔,并指定checkpoint的模式,是exactly-once(剛好一次)還是AT_LEAST_ONCE(至少一次)。大多數(shù)情況下是exactly-once(默認(rèn)就是這個(gè)模式),少數(shù)情況下,如果要求超低延遲的處理情況,才會設(shè)置AT_LEAST_ONCE
env.enableCheckpointing(1000,CheckpointingMode.EXACTLY_ONCE) 

/* -------------設(shè)置checkpoint 模式,和上面的類似  */
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

/* -------------設(shè)置checkpoint上一個(gè)的結(jié)束點(diǎn)到下一個(gè)開始點(diǎn)之間的最短時(shí)間。
因?yàn)閏heckpoint觸發(fā)時(shí),需要一定時(shí)間去完成整個(gè)checkpoint的過程,
如果checkpoint的完成時(shí)間過程,導(dǎo)致前后兩個(gè)checkpoint間的時(shí)間間隔過短,這是不合適的,沒有必要。
1、這里的時(shí)間間隔,指的是上一個(gè)checkpoint完成的時(shí)間點(diǎn),到下一個(gè)checkpoint開始的時(shí)間點(diǎn)的間隔,如果過短,會導(dǎo)致頻繁checkpoint,影響性能。假設(shè)這個(gè)間隔為T
2、而上面設(shè)置的checkpoint時(shí)間間隔,指的是前一個(gè)checkpoint的開始時(shí)間到下一個(gè)checkpoint的開始時(shí)間。所以是始終大于1中的時(shí)間間隔的。假設(shè)這個(gè)間隔為 N

如果T小于這里設(shè)置的值,那么無論N設(shè)置多少,下一個(gè)checkpoint的開始時(shí)間必須是500ms之后。如果T大于這里設(shè)置的值,那么正常按照N設(shè)置的間隔來觸發(fā)下一個(gè)checkpoint,這里設(shè)置的間隔無關(guān)了。
*/
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

/* -------------設(shè)置checkpoint完成的超時(shí)時(shí)間   */
env.getCheckpointConfig().setCheckpointTimeout(60000);

/* -------------設(shè)置checkpoint的最大并行度   */
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

/*  開啟checkpoints的外部持久化,但是在job失敗的時(shí)候不會自動(dòng)清理,需要自己手工清理state
DELETE_ON_CANCELLATION:在job canceled的時(shí)候會自動(dòng)刪除外部的狀態(tài)數(shù)據(jù),但是如果是FAILED的狀態(tài)則會保留;
RETAIN_ON_CANCELLATION:在job canceled的時(shí)候會保留狀態(tài)數(shù)據(jù)*/
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

/*  當(dāng)有更近的保存點(diǎn)時(shí),優(yōu)先采用savepoint來恢復(fù)成檢查點(diǎn)*/
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

1.5 checkpoint在flink配置文件中的配置項(xiàng)

在conf/flink-conf.yaml中可以配置的參數(shù)

參數(shù)默認(rèn)值作用
state.backend (無) 用于存儲狀態(tài)的檢查點(diǎn)數(shù)據(jù)的后端。有三種backend,如:jobmanager(MemoryStateBackend,默認(rèn)是這個(gè)),filesystem(FsStateBackend),rocksdb(RocksDBStateBackend)。
state.backend.async true 是否異步快照
state.checkpoints.dir checkpoint的目錄,例如:hdfs://192.168.1.1/checkpoint
state.backend.incremental false 是否選擇增量檢查點(diǎn)。即每次快照都只存儲上一個(gè)檢查點(diǎn)的差異數(shù)據(jù),而不是完整的數(shù)據(jù)。可能某些后端不支持這種方式
state.checkpoints.num-retained 1 要保留的已完成檢查點(diǎn)的最大數(shù)量。
state.savepoints.dir 保存點(diǎn)默認(rèn)目錄

1.6 checkpoint和savepoint的區(qū)別

1.6.1 區(qū)別

checkpoint:
1、檢查點(diǎn)的主要目的是在job意外失敗時(shí)提供恢復(fù)機(jī)制。
2、Checkpoint的生命周期由Flink管理,即Flink創(chuàng)建,擁有和發(fā)布Checkpoint - 無需用戶交互。
3、作為一種恢復(fù)和定期觸發(fā)的方法,Checkpoint主要的設(shè)計(jì)目標(biāo)是:創(chuàng)建checkpoint,是輕量級的和盡可能快地恢復(fù)

savepoint:
1、Savepoints由用戶創(chuàng)建,擁有和刪除。
2、他們一般是有計(jì)劃的進(jìn)行手動(dòng)備份和恢復(fù)。而checkpoint的恢復(fù)只會發(fā)生在故障時(shí)
3、例如,在Flink版本需要更新的時(shí)候,或者更改你的流處理邏輯,更改并行性等等。
在這種情況下,我們往往需要關(guān)閉一下流,這就需要我們將流中的狀態(tài)進(jìn)行存儲,后面重新部署job的時(shí)候進(jìn)行會用來恢復(fù)。
4、從概念上講,Savepoints的生成和恢復(fù)成本可能更高,并且更多地關(guān)注可移植性和對前面提到的作業(yè)更改的支持

1.6.2 使用savepoint

命令用法:

flink savepoint jobid target_dir

例子:

保存狀態(tài)數(shù)據(jù)到指定目錄:
flink savepoint xxxxxxxx(哈希碼) hdfs://ronnie01:8020/data/flink/savepoint

重啟和恢復(fù)數(shù)據(jù)流(也可用于從checkpoint恢復(fù)數(shù)據(jù)流):
flink run -s hdfs://ronnie01:8020/data/flink/savepoint/savepoint-xxxxx-xxxxxxxxx -c com.ronnie.flink.stream.test.CheckPointTest flink-test.jar

-s 指定savepoint/checkpoint目錄的存儲目錄
-c 指定運(yùn)行的主類的全類名

二、stateBackend分類

checkpoint數(shù)據(jù)可存儲方式有不同,flink支持三種:
MemoryStateBackend(內(nèi)存狀態(tài))
FsStateBackend(文件狀態(tài))
RocksDBStateBackend(RocksDB狀態(tài))

2.1 MemoryStateBackend

1、概念
MemoryStateBackend將State作為Java對象保存(在堆內(nèi)存),存儲著key/value狀態(tài)、window運(yùn)算符、觸發(fā)器等的哈希表。在Checkpoint時(shí),State Backend將對State進(jìn)行快照,并將其作為checkpoint發(fā)送到JobManager機(jī)器上,JobManager將這個(gè)State數(shù)據(jù)存儲在Java堆內(nèi)存。MemoryStateBackend默認(rèn)使用異步快照,來避免阻塞管道。如果需要修改,可以在MemoryStateBackend的構(gòu)造函數(shù)將布爾值改為false(僅用于調(diào)試)。

2、注意點(diǎn)
異步快照方式時(shí),operator操作符在做快照的同時(shí)也會處理新流入的數(shù)據(jù),默認(rèn)異步方式
同步快照方式:operator操作符在做快照的時(shí)候,不會處理新流入的數(shù)據(jù),同步快照會增加數(shù)據(jù)處理的延遲度。

3、局限性
單次狀態(tài)大小最大默認(rèn)被限制為5MB,這個(gè)值可以通過構(gòu)造函數(shù)來更改。
無論單次狀態(tài)大小最大被限制為多少,都不可用大過akka的frame大小。
聚合的狀態(tài)都會寫入jobmanager的內(nèi)存

4、適用場景
本地開發(fā)和調(diào)試
狀態(tài)比較少的作業(yè)

2.2 FsStateBackend(生產(chǎn)環(huán)境最常用)

1、概念
FsStateBackend將正在運(yùn)行的數(shù)據(jù)保存在TaskManager的內(nèi)存中。在checkpoint時(shí),它將State的快照寫入文件系統(tǒng)對應(yīng)的目錄下的文件中。最小元數(shù)據(jù)存儲在JobManager的內(nèi)存中(如果是高可用模式下,元數(shù)據(jù)存儲在checkpoint中)。FsStateBackend默認(rèn)使用異步快照,來避免阻塞處理的管道。如果需要禁用,在FsStateBackend構(gòu)造方法中將布爾值設(shè)為false

2、適用場景
狀態(tài)比較大, 窗口比較長, 大的 KV 狀態(tài)
需要做 HA 的場景

2.3 RocksDBStateBackend

1、概念
此種方式kv state需要由rockdb數(shù)據(jù)庫來管理,這是和內(nèi)存或file backend最大的不同,即狀態(tài)數(shù)據(jù)是直接寫入到rockdb的,不像前面兩種,只有在checkpoint的時(shí)候才會將數(shù)據(jù)保存到指定的backend。RocksDBStateBackend使用RocksDB數(shù)據(jù)庫保存數(shù)據(jù),這個(gè)數(shù)據(jù)庫保存在TaskManager的數(shù)據(jù)目錄中。注意的是:RocksDB,它是一個(gè)高性能的Key-Value數(shù)據(jù)庫。數(shù)據(jù)會放到先內(nèi)存當(dāng)中,在一定條件下觸發(fā)寫到磁盤文件上。
在 checkpoint時(shí), 整個(gè) RocksDB數(shù)據(jù)庫的數(shù)據(jù)會快照一份, 然后存到配置的文件系統(tǒng)中(一般是 hdfs)。同時(shí), Apache Flink將一些最小的元數(shù)據(jù)存儲在 JobManager 的內(nèi)存或者 Zookeeper 中(對于高可用性情況)。RocksD始終配置為執(zhí)行異步快照

2、適用場景
RocksDBStateBackend適用于非常大的狀態(tài),長窗口、大鍵值狀態(tài)的高可用作業(yè)。
RocksDBStateBackend是目前唯一可用于支持有狀態(tài)流處理應(yīng)用程序的增量檢查點(diǎn)

2.4 使用指定的statebackend

方式1:
直接在 conf/flink-conf.yaml 中指定 state.backend 就是默認(rèn)程序的backend。

jobmanager(MemoryStateBackend,默認(rèn)是這個(gè))
filesystem(FsStateBackend)
rocksdb(RocksDBStateBackend)

方式2:
在程序中指定自己想使用的backend

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 設(shè)置狀態(tài)后端
env.setStateBackend(xxx);

三種類型的實(shí)現(xiàn)類:
// 默認(rèn)使用內(nèi)存的方式存儲狀態(tài)值, 單位快照的狀態(tài)上限為10MB, 使用同步方式進(jìn)行快照。單個(gè)狀態(tài)大小可以設(shè)置,單位是byte
env.setStateBackend(new MemeoryStateBackend(10*1024*1024, false));

// 使用 FsStateBackend的方式進(jìn)行存儲, 并且是同步方式進(jìn)行快照
env.setStateBackend(new FsStateBackend("hdfs://namenode....", false));

// 使用 RocksDBStateBackend方式存儲, 并采用增量的快照方式進(jìn)行存儲。后面的true表示增量
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode....", true));

當(dāng)前題目:六、flink--容錯(cuò)機(jī)制
當(dāng)前路徑:http://muchs.cn/article6/jehgig.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供做網(wǎng)站、品牌網(wǎng)站制作、動(dòng)態(tài)網(wǎng)站、虛擬主機(jī)、、網(wǎng)站排名

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

外貿(mào)網(wǎng)站制作