大數(shù)據(jù)(9g)FlinkCEP-創(chuàng)新互聯(lián)

文章目錄
  • 概述
  • 示例代碼
    • 環(huán)境和依賴
    • Java代碼
      • 上面代碼可改成下面

堅守“ 做人真誠 · 做事靠譜 · 口碑至上 · 高效敬業(yè) ”的價值觀,專業(yè)網(wǎng)站建設(shè)服務(wù)10余年為成都成都墻體彩繪小微創(chuàng)業(yè)公司專業(yè)提供企業(yè)網(wǎng)站建設(shè)營銷網(wǎng)站建設(shè)商城網(wǎng)站建設(shè)手機網(wǎng)站建設(shè)小程序網(wǎng)站建設(shè)網(wǎng)站改版,從內(nèi)容策劃、視覺設(shè)計、底層架構(gòu)、網(wǎng)頁布局、功能開發(fā)迭代于一體的高端網(wǎng)站建設(shè)服務(wù)。
概述
  • CEP
    Complex Event Processing:復合事件處理
    通過分析事件間的關(guān)系,從事件流中查詢出符合要求的事件序列
  • 例如【切菜=>洗菜=>炒菜】3個事件按時間序串聯(lián),是正常的事件流
    當發(fā)現(xiàn)【切菜=>炒菜】忽略洗菜的事件流,可認為是異常事件
示例代碼 環(huán)境和依賴

WIN10+JDK1.8+IDEA2021+Maven3.6.3
CEP額外依賴為flink-cep

881.14.62.121.18.24org.apache.flinkflink-java${flink.version}org.apache.flinkflink-streaming-java_${scala.binary.version}${flink.version}org.apache.flinkflink-clients_${scala.binary.version}${flink.version}org.apache.flinkflink-runtime-web_${scala.binary.version}${flink.version}org.apache.flinkflink-cep_${scala.binary.version}${flink.version}
Java代碼

監(jiān)測 嚴格近鄰的連續(xù)三次a的事件流

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CepPractice {public static void main(String[] args) throws Exception {//創(chuàng)建環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        //添加數(shù)據(jù)源,確定水位線策略
        SingleOutputStreamOperatord = env.fromElements("c", "a", "a", "a", "a", "b", "a", "a")
                .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()
                        .withTimestampAssigner((element, recordTimestamp) ->1L));
        //定義模式
        Patternp = Pattern
                .begin("first")
                .where(new SimpleCondition() {@Override
                    public boolean filter(String value) {return value.equals("a");
                    }
                })
                .next("second")
                .where(new SimpleCondition() {@Override
                    public boolean filter(String value) {return value.equals("a");
                    }
                })
                .next("third")
                .where(new SimpleCondition() {@Override
                    public boolean filter(String value) {return value.equals("a");
                    }
                });
        //在流上匹配模型
        PatternStreampatternStream = CEP.pattern(d, p);
        //使用select方法將匹配到的事件流取出
        patternStream.select((PatternSelectFunction) map ->{//Map的key是事件名稱(上面的first、second和third)
            //Map的key對應(yīng)的value是列表,儲存匹配到的事件
            String first = map.get("first").toString();
            String second = map.get("second").toString();
            String third = map.get("third").toString();
            return first + "->" + second + "->" + third;
        }).print();
        //執(zhí)行
        env.execute();
    }
}

打印結(jié)果

[a]->[a]->[a]
[a]->[a]->[a]
上面代碼可改成下面

留意.times(3).consecutive()

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.List;

public class CepPractice2 {public static void main(String[] args) throws Exception {//創(chuàng)建環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        //添加數(shù)據(jù)源,確定水位線策略
        SingleOutputStreamOperator>d = env.fromElements(
                Tuple2.of("a", 1000L), Tuple2.of("a", 2000L), Tuple2.of("a", 3000L),
                Tuple2.of("a", 4000L), Tuple2.of("b", 5000L), Tuple2.of("a", 6000L))
                .assignTimestampsAndWatermarks(WatermarkStrategy.>forMonotonousTimestamps()
                        .withTimestampAssigner((element, recordTimestamp) ->element.f1));
        //定義模式
        Pattern, Tuple2>p = Pattern
                .>begin("=a")
                .where(new SimpleCondition>() {@Override
                    public boolean filter(Tuple2value) {return value.f0.equals("a");
                    }
                })
                .times(3)
                .consecutive(); //嚴格連續(xù)
        //在流上匹配模型
        PatternStream>patternStream = CEP.pattern(d, p);
        //使用select方法將匹配到的事件流取出
        patternStream.select((PatternSelectFunction, String>) map ->{//Map的key是事件名稱(上面的first、second和third)
            //Map的key對應(yīng)的value是列表,儲存匹配到的事件
            List>ls = map.get("=a");
            String first = ls.get(0).f0;
            String second = ls.get(1).f0;
            String third = ls.get(2).f0;
            return String.join("=>", first, second, third);
        }).print();
        //執(zhí)行
        env.execute();
    }
}

你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機房具備T級流量清洗系統(tǒng)配攻擊溯源,準確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級服務(wù)器適合批量采購,新人活動首月15元起,快前往官網(wǎng)查看詳情吧

分享標題:大數(shù)據(jù)(9g)FlinkCEP-創(chuàng)新互聯(lián)
本文URL:http://www.muchs.cn/article40/dgicho.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供靜態(tài)網(wǎng)站、做網(wǎng)站、網(wǎng)站制作、服務(wù)器托管網(wǎng)站排名、網(wǎng)站內(nèi)鏈

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

成都定制網(wǎng)站網(wǎng)頁設(shè)計