flink中相關(guān)的知識(shí)點(diǎn)有哪些

本篇內(nèi)容主要講解“flink中相關(guān)的知識(shí)點(diǎn)有哪些”,感興趣的朋友不妨來(lái)看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來(lái)帶大家學(xué)習(xí)“flink中相關(guān)的知識(shí)點(diǎn)有哪些”吧!

10年積累的成都網(wǎng)站建設(shè)、網(wǎng)站設(shè)計(jì)經(jīng)驗(yàn),可以快速應(yīng)對(duì)客戶對(duì)網(wǎng)站的新想法和需求。提供各種問(wèn)題對(duì)應(yīng)的解決方案。讓選擇我們的客戶得到更好、更有力的網(wǎng)絡(luò)服務(wù)。我雖然不認(rèn)識(shí)你,你也不認(rèn)識(shí)我。但先制作網(wǎng)站后付款的網(wǎng)站建設(shè)流程,更有臨滄免費(fèi)網(wǎng)站建設(shè)讓你可以放心的選擇與我們合作。

1.OperatorChain

1.1 OperatorChain的優(yōu)點(diǎn)

1.1.1 減少線程切換 1,1.2 減少序列化與反序列化 1.1.3 減少數(shù)據(jù)在緩沖區(qū)的交換 1.1.4 減少延遲并且提高吞吐能力

1.2 OperatorChain組成條件

1.2.1 沒(méi)有禁用Chain 1.2.2 上下游算子并行度一致 1.2.3 下游算子的入度為1(也就是說(shuō)下游節(jié)點(diǎn)沒(méi)有其他節(jié)點(diǎn)的輸入) 1.2.4 上下游算子在同一個(gè)slot group 1.2.5 下游節(jié)點(diǎn)的chain策略為always(可以與上下游鏈接,map、flatmap、filter等默認(rèn)是always) 1.2.6 上有節(jié)點(diǎn)的chain策略為always或head(只能與下游鏈接,不能與上有鏈接,source默認(rèn)是head) 1.2.7 上下游算子之間沒(méi)有數(shù)據(jù)shuffle(數(shù)據(jù)分區(qū)方式是forward)

1.3 禁用OperatorChain幾種方式

1.3.1 DataStream的算子操作后調(diào)用startNewChain算子 1.3.2 DataStream調(diào)用disableChaining來(lái)關(guān)閉Chain 1.3.3 StreamExecutionEnvironment.getExecutionEnvironment.disableOperatorChaining() 全局關(guān)閉 1.3.4 DataStream.slotSharingGroup("name") 設(shè)置新的slotgrop名稱 1.3.5 改變并行度

2.slot

2.1 slot與parallelism的關(guān)系

默認(rèn)task slot數(shù)與join中task的最高并行度一致

2.2 共享slot

2.2.1 flink集群需要的任務(wù)槽與作業(yè)中使用的最高并行度正好相同(前提,保持默認(rèn)SlotSharingGroup)。也就是說(shuō)我們不需要再去 計(jì)算一個(gè)程序總共會(huì)起多少個(gè)task了 2.2.2 適當(dāng)設(shè)置slotSharingGroup可以減少每個(gè)slot運(yùn)行的線程數(shù),從而整體上減少機(jī)器的負(fù)載

3.累加器和計(jì)數(shù)器

3.1 計(jì)數(shù)器是最簡(jiǎn)單的累加器 3.2 內(nèi)置累加器有IntCounter,LongCounter,DoubleCounter 3.3 Histogram 柱狀圖

4.控制延遲

默認(rèn)情況下,流中的元素并不會(huì)一個(gè)一個(gè)的在網(wǎng)絡(luò)中傳輸(這會(huì)導(dǎo)致不必要的網(wǎng)絡(luò)流量消耗) ,而是緩存起來(lái),緩存的大小可以在Flink的配置文件、ExecutinEnvironment、設(shè)置某個(gè)算子 進(jìn)行配置(默認(rèn)100ms)這樣控制的 好處:提高吞吐 壞處:增加了延遲

如何把握平衡: (1)為了最大吞吐量,可以設(shè)置setBufferTimeout(-1),這會(huì)移出timeout機(jī)制,緩存中的數(shù)據(jù) 一滿就會(huì)被發(fā)送,不建議用,假如一條信息4 5個(gè)小時(shí)才來(lái)這時(shí)候延遲會(huì)非常高,會(huì)等整個(gè)buffer滿了再處理 (2)為了最小延遲,可以將超時(shí)設(shè)置為接近0的數(shù)(例如5或者10ms) (3)緩存的超時(shí)不要設(shè)置0,因?yàn)闀?huì)帶來(lái)一些性能的損耗

5.min minby max maxby

min和minby的區(qū)別是min返回一個(gè)最小值,而minby返回的是其字段中包含的最小元素

6.interval join

在給定周期內(nèi),按照指定key對(duì)兩個(gè)KeyedStream進(jìn)行join操作,把符合join條件的兩個(gè)event拉倒一起,然后怎么處理由用戶自己定義 場(chǎng)景:把一定時(shí)間內(nèi)的相關(guān)的分組數(shù)據(jù)拉成一個(gè)寬表

7.connect 和union

connect之后是connectedStreams,會(huì)對(duì)兩個(gè)流的數(shù)據(jù)應(yīng)用不同的處理方法,并且雙流之間可以 共享狀態(tài)(比如計(jì)數(shù))。這在第一個(gè)流的輸入會(huì)影響第二個(gè)流時(shí)會(huì)非常有用。 union合并多個(gè)流,新的流包含所有流的數(shù)據(jù) union是DataStream->DataStream connect只能連接兩個(gè)流,而union可以連接多余兩個(gè)流 connect兩個(gè)流類型可以不一致,而union連接的流類型必須一致

8.算子之間傳遞數(shù)據(jù)的方式

(1)One-to-one streams 保持元素的分區(qū)和順序 (2)重新分區(qū)的方式 ,重新分區(qū)策略取決于使用的算子 keyby、broadcast、rebalance

dataStream.shuffle() 按均勻分布隨機(jī)劃分元素,網(wǎng)絡(luò)開(kāi)銷往往比較大 dataStream.rebalance() 循環(huán)對(duì)元素進(jìn)行分區(qū),為每各分區(qū)創(chuàng)建相等負(fù)載,解決數(shù)據(jù)傾斜時(shí)非常有用 dataStream.rescale() 跟rebalance類似,但不是全局的,通過(guò)輪詢調(diào)度將元素從上游的task一個(gè)子集發(fā)送到下游task的一個(gè)子集 dataStream.broadcast() 將元素廣播到每個(gè)分區(qū)上

9.flink三個(gè)時(shí)間的比較

9.1 EventTime

9.1.1 事件生成的時(shí)間,在進(jìn)入Flink之氣就存在,可以從event的字段中抽取 9.1.2 必須指定watermarks的生產(chǎn)方式 9.1.3 優(yōu)勢(shì):確定性,亂序、延時(shí)、或者數(shù)據(jù)重放等情況,都能給出正確結(jié)果 9.1.4 弱點(diǎn):處理無(wú)序事件時(shí)性能和延遲受到影響

9.2 IngerstTime

9.2.1 事件進(jìn)入flink的時(shí)間,即source里獲取的當(dāng)前系統(tǒng)時(shí)間,后續(xù)統(tǒng)一使用該時(shí)間 9.2.2 不需要指定watermarks的生產(chǎn)范式(自動(dòng)生成) 9.2.3 弱點(diǎn):不能處理無(wú)序事件和延遲數(shù)據(jù)

9.3 ProcessingTime

9.3.1 執(zhí)行操作的機(jī)器的當(dāng)前系統(tǒng)時(shí)間(每個(gè)算子都不一樣) 9.3.2 不需要流和機(jī)器之間的協(xié)調(diào) 9.3.3 優(yōu)勢(shì):最佳的性能和最低的延遲 9.3.4 弱點(diǎn):不確定性,容易受到各種因素影響(event產(chǎn)生的速度、到達(dá)flink的速度、算子之間傳輸速度),壓根不管順序和延遲

9.4 比較

性能:ProcessingTime>IngestTime>EventTime 延遲:ProcessingTime<IngestTime<EventTime 確定性:EventTime>IngestTime>ProcessIngTime

不設(shè)置time類型,默認(rèn)是processingTime 通過(guò) env.setStreamTimeCharacteristic()方法設(shè)置time類型

10.watermark

10.1 說(shuō)明

10.1.1 通常情況下,watermark在source函數(shù)中生成,但也可以在source后任何階段,如果指定多次 后面指定的會(huì)覆蓋前面的值。source的每個(gè)sub task獨(dú)立生成水位線。 10.1.2 watermark通過(guò)操作時(shí)會(huì)推進(jìn)算子操作時(shí)的event time,同時(shí)會(huì)為下游生成一個(gè)新的watermark 10.1.3 多輸入operator(union、keyby、partition)的當(dāng)前event time是其輸入流event time最小值

10.2 兩種watermark

10.2.1 周期性 watermark

(1)基于時(shí)間 (2)ExecutionConfig.setAutoWatermarkInterval(msec) (默認(rèn)200ms,設(shè)置watermarker發(fā)生的周期) (3)實(shí)現(xiàn)AssignerWithPeriodicWatermarks接口

10.2.2 間斷的watermark

(1)基于某些時(shí)間出發(fā)watermark的生產(chǎn)和發(fā)送(由用戶代碼實(shí)現(xiàn),例如遇到特殊情況) (2)實(shí)現(xiàn)AssignerWithPeriodicWatermarks接口

11 處理延遲數(shù)據(jù)

方式一:allowedLateness(),設(shè)定最大延遲時(shí)間,觸發(fā)被延遲,不宜設(shè)置太大 方式二:sideOutputTag,提供了延遲數(shù)據(jù)獲取的一種方式,這樣就不會(huì)丟棄數(shù)據(jù)了,延遲數(shù)據(jù)單獨(dú)處理。

同時(shí)側(cè)輸出流也是進(jìn)行分流的一種方式,比如一個(gè)流可以分成多個(gè)不同的流sink到不同的目標(biāo)端。

12 窗口

12.1 窗口的類型

12.1.1 Keyed Windows(在已經(jīng)安裝keyby分組的基礎(chǔ)上(KeyedStream),再構(gòu)建多任務(wù)并行window) stream.keyBy().window() 12.1.2 Non-Keyed Windwos(在未分組的DataStream上構(gòu)建單任務(wù)Window,并行度是1,API都帶ALL后綴) stream.windowAll()

12.2 窗口的生命周期

創(chuàng)建:當(dāng)屬于第一個(gè)元素到達(dá)時(shí)就會(huì)創(chuàng)建該窗口 銷毀:當(dāng)時(shí)間(event/process time)超過(guò)窗口的結(jié)束時(shí)間戳+用戶指定的延遲時(shí)(allowedLateness<time>),窗口將會(huì)移除

13 觸發(fā)器與驅(qū)逐器

13.1 觸發(fā)器

觸發(fā)器決定了一個(gè)窗口何時(shí)可以被窗口函數(shù)處理(條件滿足時(shí)觸發(fā)并發(fā)出信號(hào)) 每一個(gè)WindowAssigner都有一個(gè)默認(rèn)的觸發(fā)器,如果默認(rèn)觸發(fā)器不滿足需要可以通過(guò)trigger()來(lái)指定

觸發(fā)器有5個(gè)方法來(lái)允許觸發(fā)器處理不同的事件(trigger) onElement()方法每個(gè)元素被添加到窗口是調(diào)用 onEvenTime() 當(dāng)一個(gè)已注冊(cè)的事件時(shí)間計(jì)時(shí)器啟動(dòng)時(shí)調(diào)用 onProcessingTime 當(dāng)一個(gè)已注冊(cè)的處理時(shí)間計(jì)時(shí)器啟動(dòng)時(shí)調(diào)用 onMerge 與狀態(tài)觸發(fā)器相關(guān), 當(dāng)使用session window時(shí)兩個(gè)觸發(fā)器對(duì)應(yīng)的窗口合并,合并兩個(gè)觸發(fā)器的狀態(tài) clear相應(yīng)窗口被清除時(shí)觸發(fā)

13.2 驅(qū)逐器

evictor是可選的,WindowAssigner默認(rèn)沒(méi)有evictor evictor能夠在Trigger觸發(fā)之后以及在應(yīng)用窗口函數(shù)執(zhí)行前和/或后從窗口中刪除無(wú)用的元素,類似filter作用 evictBefore在窗口之前應(yīng)用 evictAfter在窗口后應(yīng)用

14 如何允許延遲

14.1 當(dāng)處理event-time的windwo時(shí),可能會(huì)出現(xiàn)元素晚到的情況,即flink用來(lái)跟蹤event-time進(jìn)度的 watermark已經(jīng)過(guò)了元素所屬窗口的最后時(shí)間,屬于當(dāng)前窗口的數(shù)據(jù)才到達(dá)) 14.2 默認(rèn)情況下,當(dāng)watermark已經(jīng)過(guò)了窗口的最后時(shí)間時(shí),晚到的元素會(huì)被丟棄 14.3 Flink允許為窗口操作指定一個(gè)最大允許延時(shí)時(shí)長(zhǎng),Allowed lateness指定,默認(rèn)情況是0 14.4 水位線已過(guò)了窗口最后時(shí)間才來(lái)的元素,如果還在未到窗口最后時(shí)間加延遲時(shí)間,任然可以在窗口中計(jì)算

特例:在使用GlobalWindows(全局window),不會(huì)考慮延遲,因?yàn)榇翱诘慕Y(jié)束時(shí)間戳是Long.MAX_VALUE

15 state狀態(tài)

Flink的狀態(tài):一般指一個(gè)具體的task/operator某時(shí)刻在內(nèi)存中的的狀態(tài)(例如某屬性的值) 注意:State和checkpointing不要搞混 checkpoint 則表示了一個(gè)flink job ,在一個(gè)特定時(shí)一份全局狀態(tài)快照,即包含了一個(gè)job下所有task/operator某時(shí)刻的狀態(tài)

15.1 狀態(tài)的錯(cuò)用

15.1.1 增量計(jì)算 a)聚合操作 b)機(jī)器學(xué)習(xí)訓(xùn)練模型迭代運(yùn)算時(shí)保持當(dāng)前模型 15.1.2 容錯(cuò) a)job故障重啟 b)flink程序升級(jí)

15.2 狀態(tài)的分類

15.2.1 Operator State 每個(gè)流普通的Operator的狀態(tài) 15.2.2 Keyed State Keyed Streaming的狀態(tài) 15.2.3 特殊的:Broadcast State(1.5開(kāi)始)

Keyed State支持的數(shù)據(jù)結(jié)構(gòu) (1)ValueState (2)ListState (3)ReducingState (4)AggregatingState (5)FoldingState (6)MapState

注意: (1)狀態(tài)不一定存儲(chǔ)在內(nèi)部,可能駐留在磁盤(pán)或其他地方 (2)狀態(tài)是使用RunntimContext方法的,因此只能在Rich函數(shù)中訪問(wèn)

16 checkpoint狀態(tài)容錯(cuò)

有了狀態(tài)自然需要狀態(tài)容錯(cuò),否則就失去意義了,flink狀態(tài)容錯(cuò)機(jī)制就是checkpoint checkpoint是通過(guò)分布式snapshot實(shí)現(xiàn)的,沒(méi)有特殊聲明時(shí)snapshot和checkpoint和back-up是一個(gè)意思

16.1 特點(diǎn)

(1)異步 (2)全量和增量都可以設(shè)置 (3)Barrier機(jī)制 (4)失敗情況下可回滾到最近成功一次的checkpoint (5)周期性

16.2 使用checkpoint前置條件

(1)在一定時(shí)間內(nèi)可回溯的datasource 例如:kafka、rabiitma、hdfs (2)可持久化存儲(chǔ)state的存儲(chǔ)系統(tǒng),通常使用分布式文件系統(tǒng),一般是hdfs,s3,nfs

checkmode:一般選擇EXACTLY_ONCE,除非場(chǎng)景要求極低會(huì)選擇AT_LEAST_ONCE(幾毫秒)

16.3 checkpoint高級(jí)選項(xiàng)值保留策略

默認(rèn)情況下檢查點(diǎn)不會(huì)被保留,僅用于從故障中恢復(fù)作業(yè)??梢詥⒂猛獠砍志没瘷z查點(diǎn),同時(shí)指定保留策略 checkpointConfg.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) (1)CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION 在作業(yè)被取消時(shí)保留檢查點(diǎn)。這種情況取消后必須手動(dòng)清除檢查點(diǎn) (2)CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION 在作業(yè)被取消(cancel)時(shí)會(huì)刪除檢查點(diǎn),等于不啟用。

setCheckpointTimeout 設(shè)置超時(shí)時(shí)間,超過(guò)時(shí)間沒(méi)有完成checkpoint則被終止 setMinPauseBetweenCheckpoints 最小間隔,上一個(gè)checkpoint完成最少等待多久發(fā)出下一個(gè)checkpoint請(qǐng)求 setMaxConcurrentCheckpoints 指定運(yùn)行中多少并行度進(jìn)行checkpoint

16.4 使用checkpoint第二步 選擇合適的State Backed

16.4.1 默認(rèn)State保存在taskmanager的內(nèi)存中 16.4.2 checkpoint機(jī)制會(huì)持久化所有狀態(tài)的一致性快照 快照保存由State Backend來(lái)決定,目前flink自帶三個(gè)State Backed: (1)MemoryStateBackend(默認(rèn)) (2)FsStateBackend (3)RocksDBStateBackend

16.5 MemoryStateBackend

16.5.1 MemoryStateBackend是一個(gè)內(nèi)部狀態(tài)backend,用于維護(hù)Java堆上的狀態(tài)。Key/value狀態(tài)和窗口運(yùn)算符包含存儲(chǔ)值和計(jì)時(shí)器的哈希表 16.5.2 Checkpoint時(shí),MemoryStateBackend會(huì)對(duì)state做一次快照,并像jobManager發(fā)送checkpoint確認(rèn)完成的消息中帶上此快照數(shù)據(jù),然后快照會(huì)存儲(chǔ)在JobManager的堆內(nèi)存中 16.5.3 MemoeyStateBackend默認(rèn)開(kāi)啟異步方式進(jìn)行快照,推薦使用異步避免阻塞。如果要阻塞可以傳false,如下 val memoryStateBackend:StateBackend=new MemoryStateBackend(1010241024,false) env.setStateBackend(memoryStateBackend) 16.5.4 限制:?jiǎn)蝹€(gè)state默認(rèn)5mb,可以在MemoryStateBackend的構(gòu)造函數(shù)指定。不論如何設(shè)置,State大小無(wú)法大于akka.framesize(JobManager和TaskManager之間發(fā)送的最大消息的大小默認(rèn)10mb)。Job Manager必須有足夠內(nèi)存 16.5.5 適用場(chǎng)景:本地開(kāi)發(fā)和測(cè)試 小狀態(tài)job,如只使用Map FlatMap Fliter或Kaka Consumer

16.6 FsStateBackend

16.6.1 FsStateBackend需要配置一個(gè)文件系統(tǒng)URL來(lái),如hdfs://namenode:8080/flink/checkpoint 16.6.2 FsStateBackend在TaskManager的內(nèi)存中持有正在處理的數(shù)據(jù)。checkpoint時(shí)將state snapshot寫(xiě)入文件系統(tǒng)目錄下的文件中。 16.6.3 FsStateBackend默認(rèn)開(kāi)啟異步方式進(jìn)行快照,構(gòu)造方法如下 val stateBackend:StateBackend=new FsStateBackend("hdfs://namenode:9000/flink/checkpoint",false) env.setStateBackend(stateBackend) 16.6.4 適用場(chǎng)景:大狀態(tài)、長(zhǎng)窗口、大鍵/值狀態(tài)的job

16.7、RocksDBStateBackend

16.7.1 RocksDBStateBackend需要配置一個(gè)文件系統(tǒng)的URL。如hdfs://namenode:8080/flink/checkpoint 16.7.2 RocksDBStateBackend運(yùn)行中的數(shù)據(jù)保存在RockDB數(shù)據(jù)庫(kù)中,默認(rèn)情況下存儲(chǔ)在TaskManager數(shù)據(jù)目錄中。 在Checkpoint時(shí),整個(gè)RocksDB數(shù)據(jù)庫(kù)將被checkpointed到配置的文件系統(tǒng)和目錄中 16.7.3 RocksDBSateBackend 始終是異步 16.7.4 RocksDB JNI API是基于Byte[],因此key和value最大支持2^31個(gè)字節(jié)(2GB) 16.7.5 適用場(chǎng)景:超大窗口,超大狀態(tài),大鍵/值狀態(tài)的job 16.7.6 只有RockDBStateBackend支持增量checkpoint 16.7.7 狀態(tài)保存在數(shù)據(jù)塊中,只受可用磁盤(pán)空間量限制,但開(kāi)銷更大(讀/寫(xiě)需要反序列化與序列化),吞吐收到限制 使用需要導(dǎo)包:

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
    val stateBackend:StateBackend=new RocksDBStateBackend("hdfs://namenode:9000/flink/checkpoint",true)
    env.setStateBackend(stateBackend)

配置重啟策略 Flink支持不同的重啟策略,這些策略控制在出現(xiàn)故障時(shí)如何重新啟動(dòng)job env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS))) (1)如果沒(méi)用啟動(dòng)checkpoint,則使用無(wú)重啟方案 (2)如果啟用了checkpoint,但是沒(méi)有配重啟方案,則使用固定延遲策略,嘗試次數(shù)是Integer.MAX_VALUE

到此,相信大家對(duì)“flink中相關(guān)的知識(shí)點(diǎn)有哪些”有了更深的了解,不妨來(lái)實(shí)際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

本文標(biāo)題:flink中相關(guān)的知識(shí)點(diǎn)有哪些
轉(zhuǎn)載來(lái)于:http://muchs.cn/article32/gciopc.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供企業(yè)建站微信小程序、網(wǎng)站設(shè)計(jì)公司、定制網(wǎng)站、面包屑導(dǎo)航網(wǎng)站收錄

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

小程序開(kāi)發(fā)