ApacheFlinkTask執(zhí)行之?dāng)?shù)據(jù)流如何處理

這篇文章主要介紹Apache Flink Task執(zhí)行之?dāng)?shù)據(jù)流如何處理,文中介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們一定要看完!

站在用戶的角度思考問題,與客戶深入溝通,找到黃陵網(wǎng)站設(shè)計與黃陵網(wǎng)站推廣的解決方案,憑借多年的經(jīng)驗,讓設(shè)計與互聯(lián)網(wǎng)技術(shù)結(jié)合,創(chuàng)造個性化、用戶體驗好的作品,建站類型包括:成都網(wǎng)站制作、成都做網(wǎng)站、外貿(mào)營銷網(wǎng)站建設(shè)、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣、主機(jī)域名、網(wǎng)絡(luò)空間、企業(yè)郵箱。業(yè)務(wù)覆蓋黃陵地區(qū)。

獲取流數(shù)據(jù)

用戶提交的代碼最終被封裝成了org.apache.flink.runtime.taskmanager.Task,Task是一個Runnable因此核心代碼就在run方法,run方法調(diào)用了doRun方法,在doRun中調(diào)用了invokable.invoke(),Task的整個處理流程其實就在這里面。org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable是一個抽象類,它的子類是不同類型的Task,這里我們主要關(guān)注流處理任務(wù)相關(guān)的org.apache.flink.streaming.runtime.tasks.StreamTask,StreamTask的invoke方法執(zhí)行了runMailboxLoop()方法。

runMailboxLoop()方法就是執(zhí)行org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor的runMailboxLoop方法。MailboxProcessor是一種線程模型,runMailboxLoop就是在while輪詢中不斷執(zhí)行任務(wù)和默認(rèn)動作,其中默認(rèn)動作就是StreamTask的processInput方法,該方法調(diào)用了StreamInputProcessor的inputProcessor方法,在這個方法中獲取并處理了流數(shù)據(jù)。StreamInputProcessor的子類StreamOneInputProcessor和StreamTwoInputProcessor分別用來處理有1個和2個入度的Task(StreamMultipleInputProcessor先不管)。StreamOneInputProcessor中有1個StreamTaskInput用來獲取數(shù)據(jù),1個DataOutput用來收集從StreamTaskInput獲取的數(shù)據(jù);同理,StreamTwoInputProcessor有2個StreamTaskInput和2個DataOutput。StreamTaskInput的子類StreamTaskNetworkInput用來從網(wǎng)絡(luò)中獲取流數(shù)據(jù),通過調(diào)用他它的emitNext不僅處理流數(shù)據(jù)還處理了checkpoint barrier,本篇文章只關(guān)注數(shù)據(jù)流的處理流程。StreamTaskNetworkInput從反序列化器中獲取到完整流數(shù)據(jù)后把數(shù)據(jù)交給DataOutput。DataOutput也有處理1個入度和2個入度的子類,它們都持有OperatorChain中第一個operator的引用,稱為headOperator,DataOutput從StreamTaskInput那里獲取到數(shù)據(jù)后會交給headOperator來處理。到此為止,流數(shù)據(jù)被獲取并傳入了OperatorChain。 這里總結(jié)一下:StreamTask的processInput方法在MailboxProcessor中被反復(fù)調(diào)用,在processInput方法中StreamTask使用StreamInputProcessor來獲取并處理流數(shù)據(jù)。StreamInputProcessor中的StreamTaskInput用來獲取數(shù)據(jù),獲取的數(shù)據(jù)交給DataOutput,DataOutput將數(shù)據(jù)傳入OperatorChain的第一個operator。其中StreamTask,StreamInputProcessor和DataOutput都有處理1個入度和2個入度的子類。

Apache Flink Task執(zhí)行之?dāng)?shù)據(jù)流如何處理

數(shù)據(jù)流過OperatorChain

OperatorChain的第一個operator獲取數(shù)據(jù)后,數(shù)據(jù)是怎樣在OperatorChain中流動的呢?首先說說OperatorChain,StreamOperatorWrapper是chain的每個節(jié)點,每個節(jié)點都有指向下一個或上一個節(jié)點的引用,因此OperatorChain是一個雙向鏈表。但是數(shù)據(jù)的流動并不依靠這個鏈?zhǔn)浇Y(jié)構(gòu)。上文我們提到DataOutput將數(shù)據(jù)交給了headOperator,OperatorChain的第一個節(jié)點都是StreamOperator的子類,我們編寫的filer算子,map算子等最終都會被封裝成StreamOperator,例如子類StreamFlatMap就是執(zhí)行flatMap方法,StreamFilter就是執(zhí)行fliter方法等。這些方法執(zhí)行的時候用org.apache.flink.streaming.api.operators.Output對處理后的結(jié)果進(jìn)行收集。例如StreamFilter當(dāng)FilterFunction返回true時收集數(shù)據(jù),而StreamFlatMap將Output傳入flatMap方法中由用戶代碼進(jìn)行收集數(shù)據(jù)。收集的數(shù)據(jù)是怎樣向OperatorChain的下一個節(jié)點傳遞的呢?原來Output中持有OneInputStreamOperator變量指向了chain中下一個節(jié)點的算子,調(diào)用Output的collect方法會調(diào)用下一個算子的processElement,數(shù)據(jù)就這樣在整個OperatorChain中傳遞了。

Apache Flink Task執(zhí)行之?dāng)?shù)據(jù)流如何處理

發(fā)向下游Task

當(dāng)數(shù)據(jù)傳到OperatorChain的最后一個算子時數(shù)據(jù)是怎樣發(fā)向下個Task的呢?最后一個算子擁有的Output實現(xiàn)類是org.apache.flink.streaming.runtime.io.RecordWriterOutput。RecordWriterOutput的collect方法會調(diào)用的org.apache.flink.runtime.io.network.api.writer.RecordWriter#emit方法用來發(fā)送數(shù)據(jù),該方法會將序列化器中的數(shù)據(jù)復(fù)制到BufferBuilder中。BufferBuilder維護(hù)了一個內(nèi)存片段MemorySegment并且可以創(chuàng)建相應(yīng)的消費者。RecordWriter有2個實現(xiàn)類ChannelSelectorRecordWriter和BroadcastRecordWriter。Task向下游節(jié)點的多個并行度發(fā)送數(shù)據(jù),每個并行度都對應(yīng)一個channel。ChannelSelectorRecordWriter為每個chanel都保存一個BufferBuilder并分別添加BufferConsumer:

BufferBuilder bufferBuilder = super.requestNewBufferBuilder(targetChannel);//按channel獲取BufferBuilder
addBufferConsumer(bufferBuilder.createBufferConsumer(), targetChannel);//按channel添加BufferConsumer
bufferBuilders[targetChannel] = bufferBuilder;

BroadcastRecordWriter只有一個BufferBuilder,使用同一個BufferBuilder給所有的channel添加BufferConsumer:

try (BufferConsumer bufferConsumer = builder.createBufferConsumer()) {
    for (int channel = 0; channel < numberOfChannels; channel++) {
        addBufferConsumer(bufferConsumer.copy(), channel);//所有channel用同一個BufferBuilder達(dá)到廣播的目的
    }
}

RecordWriter#requestNewBufferBuilder方法會獲取BufferBuilder,如果獲取失敗會導(dǎo)致Task執(zhí)行線程阻塞造成反壓。

public BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {
    BufferBuilder builder = targetPartition.tryGetBufferBuilder(targetChannel);//嘗試獲取,獲取不到返回null
    if (builder == null) {
        long start = System.currentTimeMillis();
        builder = targetPartition.getBufferBuilder(targetChannel);//阻塞獲取,導(dǎo)致反壓
        idleTimeMsPerSecond.markEvent(System.currentTimeMillis() - start);
    }
    return builder;
}

BufferBuilder最終來自LocalBufferPool,LocalBufferPool有幾個重要的屬性:

//taskmanager的網(wǎng)絡(luò)緩存池,MemorySegment從這里獲取
private final NetworkBufferPool networkBufferPool;
//已經(jīng)獲取的MemorySegment被組織成一個隊列
private final ArrayDeque<MemorySegment> availableMemorySegments = new ArrayDeque<MemorySegment>();
//當(dāng)前l(fā)ocalBufferPool的大小
private int currentPoolSize;
//已經(jīng)獲取的MemorySegment
private int numberOfRequestedMemorySegments;
//每個channel能同時獲取的最大BufferBuilder數(shù)
private final int maxBuffersPerChannel;
//subpartition就是channel,數(shù)組存儲了每個channel同時使用的BufferBuilder數(shù)
private final int[] subpartitionBuffersCount;

BufferBuilder由requestMemorySegment方法和requestMemorySegmentBlocking方法獲取,requestMemorySegmentBlocking方法也是調(diào)用requestMemorySegment方法并在沒有獲取到MemorySegment時通過AvailableFuture的get方法來阻塞直到獲取成功為止,AvailableFuture是一個用CompletableFuture表示的狀態(tài)位,這里用到了CompletableFuture的get方法會阻塞直到complete的特性,沒有完成的future表示unavailable,完成了的表示available。requestMemorySegment方法中如果已經(jīng)獲取的MemorySegment(numberOfRequestedMemorySegments)大于了localBufferPool的大小(currentPoolSize)需要將多余的MemorySegment先歸還給networkBufferPool。之后獲取MemorySegment,如果獲取不到就設(shè)置AvailableFuture為不可用,否則記錄channel使用的MemorySegment數(shù)量,如果大于maxBuffersPerChannel,也設(shè)置AvailableFuture為不可用。

@Nullable
private MemorySegment requestMemorySegment(int targetChannel) throws IOException {
    MemorySegment segment = null;
    synchronized (availableMemorySegments) {
        returnExcessMemorySegments();//將多余的segment歸還給networkBufferPool

        if (availableMemorySegments.isEmpty()) {
            segment = requestMemorySegmentFromGlobal();//全局獲取
        }
        // segment may have been released by buffer pool owner
        if (segment == null) {
            segment = availableMemorySegments.poll();//局部獲取
        }
        if (segment == null) {
            availabilityHelper.resetUnavailable();//獲取不到設(shè)置為不可用
        }

        //記錄channel正在使用segment數(shù),如果超了設(shè)置為不可用
        if (segment != null && targetChannel != UNKNOWN_CHANNEL) {
            if (subpartitionBuffersCount[targetChannel]++ == maxBuffersPerChannel) {
                unavailableSubpartitionsCount++;
                availabilityHelper.resetUnavailable();
            }
        }
    }
    return segment;
}

反壓的采集

上面說的AvailableFuture設(shè)置為不可用其實和反壓有關(guān),Task的isBackPressured方法返回了該Task是否產(chǎn)生了反壓。

public boolean isBackPressured() {
    if (invokable == null || consumableNotifyingPartitionWriters.length == 0 || !isRunning()) {
        return false;
    }
    //獲取所有的AvailableFuture,如果有沒完成了則有反壓
    final CompletableFuture<?>[] outputFutures = new CompletableFuture[consumableNotifyingPartitionWriters.length];
    for (int i = 0; i < outputFutures.length; ++i) {
        outputFutures[i] = consumableNotifyingPartitionWriters[i].getAvailableFuture();
    }
    return !CompletableFuture.allOf(outputFutures).isDone();
}

以上是“Apache Flink Task執(zhí)行之?dāng)?shù)據(jù)流如何處理”這篇文章的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對大家有幫助,更多相關(guān)知識,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!

網(wǎng)站名稱:ApacheFlinkTask執(zhí)行之?dāng)?shù)據(jù)流如何處理
分享鏈接:http://muchs.cn/article46/ipgehg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站導(dǎo)航、定制網(wǎng)站、自適應(yīng)網(wǎng)站、網(wǎng)頁設(shè)計公司、微信小程序、面包屑導(dǎo)航

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

h5響應(yīng)式網(wǎng)站建設(shè)