Reactive(2)響應(yīng)式流與制奶廠(chǎng)業(yè)務(wù)-創(chuàng)新互聯(lián)

再談響應(yīng)式

在前一篇文章從Reactive編程到“好萊塢”中,談到了響應(yīng)式的一些概念,講的有些發(fā)散。 但僅僅還是停留在概念的層面,對(duì)于實(shí)戰(zhàn)性的東西并沒(méi)有涉及。
所以大家看了后,或許還是有些不痛不癢。

網(wǎng)站的建設(shè)創(chuàng)新互聯(lián)公司專(zhuān)注網(wǎng)站定制,經(jīng)驗(yàn)豐富,不做模板,主營(yíng)網(wǎng)站定制開(kāi)發(fā).小程序定制開(kāi)發(fā),H5頁(yè)面制作!給你煥然一新的設(shè)計(jì)體驗(yàn)!已為成都發(fā)電機(jī)回收等企業(yè)提供專(zhuān)業(yè)服務(wù)。

響應(yīng)式編程強(qiáng)調(diào)的是異步化、面向流的處理方式,這兩者也并非憑空生出,而是從大量的技術(shù)實(shí)踐中總結(jié)提煉出來(lái)的概念,就比如:

  • 我們談異步化,容易聯(lián)想到 Java 異步IO(Asynchronized IO),而且習(xí)慣于將其和 BIO、NIO等概念來(lái)做對(duì)比。 殊不知,老早出現(xiàn)的 Swing 框架(Java UI)就已經(jīng)將異步化思維玩的很溜了,不信的可以看看其內(nèi)部 Observer模式(觀察者)的實(shí)現(xiàn)。

  • 我們談流式處理,容易聯(lián)想到 時(shí)下當(dāng)紅的 Flink框架。 但幾乎所有的大數(shù)據(jù)分析、批處理應(yīng)用都是基于流式進(jìn)行處理的,比如 ETL,甚至是一個(gè)最簡(jiǎn)單的 Map Reduce 作業(yè)。

為什么Web后端開(kāi)發(fā)的,對(duì) Reactive 沒(méi)有感覺(jué)

Reactive(2) 響應(yīng)式流與制奶廠(chǎng)業(yè)務(wù)

除了前端,Reactive 概念在大數(shù)據(jù)領(lǐng)域的應(yīng)用其實(shí)非常的廣泛了。 但是對(duì)于大多數(shù)做 Web 后端開(kāi)發(fā)的人來(lái)說(shuō)或許普及程度并不高,以筆者自身的感受是,碼了這么些年頭,除了做好代碼分層之外,似乎也沒(méi)有見(jiàn)到 Reactive可以發(fā)揮重大作用的地方。 原因就在于,在Web 后端開(kāi)發(fā)領(lǐng)域基本是依托 HTTP協(xié)議機(jī)制實(shí)現(xiàn)的,這是一個(gè)相當(dāng)簡(jiǎn)單的 請(qǐng)求 -> 應(yīng)答 交互模式,客戶(hù)端在發(fā)送請(qǐng)求后,會(huì)一直等待結(jié)果返回,也就是結(jié)果的通知是由客戶(hù)端主動(dòng)獲取而非異步通知的,因此并不是 Reactive 的風(fēng)格。 但這已經(jīng)是符合用戶(hù)一貫的使用方式了,絕大多數(shù)情況下并不需要做什么樣的變化,此時(shí)我們對(duì)響應(yīng)式的感知并不深刻。

更符合Reactive 的另外一個(gè)場(chǎng)景是 富客戶(hù)端(Rich Application),假設(shè)在需要大量復(fù)雜的前端交互的場(chǎng)景下,我們可以選擇將一些邏輯放在前端代碼中實(shí)現(xiàn)。
此時(shí)的 Web 交互就不再是整個(gè)頁(yè)面的刷新,而是演變?yōu)榭蛻?hù)端與服務(wù)端的"實(shí)時(shí)"雙向通訊,這類(lèi)應(yīng)用也比較普遍了,比如基于 WebSocket 實(shí)現(xiàn)的 在線(xiàn)通信、互動(dòng)應(yīng)用 等等。

淺顯的從趨勢(shì)上看, Reactive 的前景還是很明朗的,這里并不是說(shuō)因?yàn)楝F(xiàn)在多數(shù)流行的編程語(yǔ)言中都有它的影子(比如提供了Rx風(fēng)格的框架)。
而是未來(lái)的大數(shù)據(jù)處理、實(shí)時(shí)流計(jì)算會(huì)成為主流,這是環(huán)境決定的。 而這時(shí) Reactive 這種"面向流"的編程模式無(wú)疑是很合適的。

Java 9 支持的 Reactive Stream

Java 平臺(tái)直到 JDK 9 才提供了對(duì)于 Reactive 的完整支持,而在此之前的JDK版本中,也以及存在一些有關(guān)聯(lián)性的API,比如:

  • Future 和 CompletableFuture接口,用于實(shí)現(xiàn)異步計(jì)算。 后者較前者則是完善了異步結(jié)果通知、任務(wù)串行等特性。
  • Stream 接口,可以將傳統(tǒng)的集合轉(zhuǎn)換為"流"的方式進(jìn)行處理,比如迭代、映射轉(zhuǎn)換。

這些關(guān)聯(lián)性API 并不是完整的 Reactive,Java 9所支持的 Reactive Stream API 來(lái)自于2013年的響應(yīng)式流規(guī)范(Reactive Stream Specification)。

https://www.reactive-streams.org/

基于這個(gè)規(guī)范中主要定義了下面幾個(gè)接口:

Java的響應(yīng)式流接口統(tǒng)一定義在 java.util.concurrent.Flow接口

  • Publisher
    即數(shù)據(jù)的發(fā)布者。 Publisher 接口定義了一個(gè)subscribe方法,用于添加訂閱者:

  • Subscriber
    指數(shù)據(jù)的訂閱者。 Subscriber 接口定義了4個(gè)方法,用于針對(duì)不同的事件作出響應(yīng)。

首先,在subscribe方法調(diào)用成功后,Subscriber的 onSubscribe(Subscription s) 方法會(huì)被觸發(fā)(Subscription 表示當(dāng)前的訂閱關(guān)系)。
此后,正??梢岳^續(xù)調(diào)用 Subscription 的 request(long n) 方法來(lái)向發(fā)布者請(qǐng)求數(shù)據(jù),n是指大的數(shù)據(jù)條目數(shù)。

發(fā)布者會(huì)產(chǎn)生3種不同的消息,分別對(duì)應(yīng)到 Subscriber 的3個(gè)回調(diào)方法:

數(shù)據(jù)消息:對(duì)應(yīng) onNext 方法,表示發(fā)布者產(chǎn)生的數(shù)據(jù)。
錯(cuò)誤消息:對(duì)應(yīng) onError 方法,表示發(fā)布者產(chǎn)生了錯(cuò)誤。
結(jié)束消息:對(duì)應(yīng) onComplete 方法,表示發(fā)布者已經(jīng)完成了所有數(shù)據(jù)的發(fā)布。

在上面的3種通知中,錯(cuò)誤、結(jié)束消息都表示當(dāng)前的流已經(jīng)到達(dá)了終點(diǎn),后面不再會(huì)有消息產(chǎn)生。

  • Subscription
    Subscription 表示的是一個(gè)訂閱關(guān)系。 可以通過(guò)該對(duì)象請(qǐng)求數(shù)據(jù)(request方法),或者取消訂閱(cancel方法)。

  • Processor
    Processor 表示的一種特殊的對(duì)象,既是生產(chǎn)者,又是訂閱者。

負(fù)壓的支持

負(fù)壓是響應(yīng)式流定義的一種重要的能力,在上述的接口中,實(shí)質(zhì)上已經(jīng)提供了負(fù)壓的支持。
Publisher 只有在收到請(qǐng)求之后,才會(huì)產(chǎn)生數(shù)據(jù)。 這就保證了 Subscriber 可以根據(jù)自己的處理能力,確定要向 Publisher 請(qǐng)求的數(shù)據(jù)量,以此保證自身不會(huì)被沖垮。

范例

下面,以一個(gè)簡(jiǎn)單的代碼示例來(lái)演示 Reactive Stream API 是如何使用的。

Reactive(2) 響應(yīng)式流與制奶廠(chǎng)業(yè)務(wù)

以制奶廠(chǎng)為例,為了提高營(yíng)收,工廠(chǎng)推出了一個(gè)廠(chǎng)家直銷(xiāo)的業(yè)務(wù)。 顧客可以直接向廠(chǎng)方訂購(gòu)一定天數(shù)的奶制品,每天則是由工廠(chǎng)的服務(wù)人員送貨上門(mén)。

為了模擬這個(gè)場(chǎng)景,我們實(shí)現(xiàn)的代碼如下:

  1. 制奶廠(chǎng),一個(gè)Publisher實(shí)現(xiàn):
public class MilkFactory extends SubmissionPublisher<String> {

    private final ScheduledFuture<?> periodicTask;
    private final ScheduledExecutorService scheduler;

    private static final List<String> milks = Arrays.asList("益力多", "酸牛奶", "原味奶", "低脂蛋奶", "羊奶", "甜牛奶");

    public MilkFactory() {
        super();
        //初始化定時(shí)器
        scheduler = new ScheduledThreadPoolExecutor(1);

        //每一天生產(chǎn)完牛奶并推送給消費(fèi)者
        periodicTask = scheduler.scheduleAtFixedRate(
                () -> submit(produceMilk()), 0, 1, TimeUnit.SECONDS);
    }

    //隨機(jī)生產(chǎn)牛奶
    private String produceMilk() {
        return milks.get((int) (Math.random() * milks.size()));
    }

    //關(guān)閉流
    public void close() {
        periodicTask.cancel(false);
        scheduler.shutdown();
        super.close();
    }
}

MilkFactory 集成自SubmissionPublisher(一個(gè)提供緩沖的Publisher實(shí)現(xiàn)),其內(nèi)部會(huì)啟動(dòng)一個(gè)定時(shí)器,用于模擬每天給用戶(hù)發(fā)放生產(chǎn)的牛奶。
通過(guò)submit()方法可以將數(shù)據(jù)推送給用戶(hù)。

  1. 顧客,一個(gè)Subscriber實(shí)現(xiàn):
public class MilkCustomer implements Flow.Subscriber<String> {
    private Flow.Subscription subscription;
    private AtomicInteger available = new AtomicInteger(0);
    private int dayCount;

    public MilkCustomer(int dayCount) {
         this.dayCount = dayCount;
    }
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        //設(shè)置總量
        available.set(dayCount);

        //第一天
        subscription.request(1);
    }

    @Override
    public void onNext(String milk) {
        System.out.println("今天的牛奶到了: " + milk);

        //如果還有存量,繼續(xù)請(qǐng)求
        if(available.decrementAndGet() > 0){
            subscription.request(1);
        }else{
            System.out.println("牛奶套餐已經(jīng)派完,歡迎繼續(xù)訂購(gòu)");
            this.subscription.cancel();
        }
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("closed.");
    }
}

MilkCustomer 接受一個(gè)dayCount入?yún)?,即表示訂?gòu)的數(shù)量,在首次訂閱時(shí)會(huì)請(qǐng)求第一天的奶品,此后則每次收到到奶品后再請(qǐng)求下一天的,直到將總量消費(fèi)完。

  1. 測(cè)試程序

執(zhí)行下面的代碼:

MilkFactory factory = new MilkFactory();

//訂閱1周
MilkCustomer customer = new MilkCustomer(7);

factory.subscribe(customer);

輸出:

今天的牛奶到了: 酸牛奶
今天的牛奶到了: 羊奶
今天的牛奶到了: 原味奶
牛奶套餐已經(jīng)派完,歡迎繼續(xù)訂購(gòu)

小結(jié)

在上例中,我們使用 Java 提供的 Reactive Stream API 實(shí)現(xiàn)了一個(gè)"在線(xiàn)送奶" 的業(yè)務(wù)流。
整個(gè)過(guò)程相對(duì)是比較簡(jiǎn)單的,最關(guān)鍵的地方就在于對(duì)流式處理以及訂閱關(guān)系的理解。 然而目前的 Reactive 實(shí)現(xiàn)還沒(méi)有完全的統(tǒng)一,比如 Spring WebFlux(SpringBoot 2支持) 仍然是基于 Reactor 私有API而不是 Reactive Stream API 來(lái)構(gòu)建的,后面有機(jī)會(huì)再做下介紹。

擴(kuò)展閱讀

關(guān)于Future和CompletableFuture的區(qū)別
https://juejin.im/post/5adbf8226fb9a07aac240a67

另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無(wú)理由+7*72小時(shí)售后在線(xiàn),公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國(guó)服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡(jiǎn)單易用、服務(wù)可用性高、性?xún)r(jià)比高”等特點(diǎn)與優(yōu)勢(shì),專(zhuān)為企業(yè)上云打造定制,能夠滿(mǎn)足用戶(hù)豐富、多元化的應(yīng)用場(chǎng)景需求。

分享名稱(chēng):Reactive(2)響應(yīng)式流與制奶廠(chǎng)業(yè)務(wù)-創(chuàng)新互聯(lián)
當(dāng)前地址:http://muchs.cn/article42/eishc.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供做網(wǎng)站、虛擬主機(jī)、外貿(mào)建站、營(yíng)銷(xiāo)型網(wǎng)站建設(shè)、企業(yè)建站App開(kāi)發(fā)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶(hù)投稿、用戶(hù)轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話(huà):028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

網(wǎng)站建設(shè)網(wǎng)站維護(hù)公司