Chapter 5: Flink Complex Event Processing



Complex Event Processing

In the previous chapter, we talked about the Table API provided by Apache Flink and how we can use it to process relational data structures. From this chapter onwards, we will start learning more about the libraries provided by Apache Flink and how we can use them for specific use cases. To start with, let's try to understand a library called Complex Event Processing (CEP). CEP is a very interesting but complex topic that has its value in various industries. Wherever a stream of events is expected, people naturally want to perform complex event processing in all such use cases. Let's try to understand what CEP is all about.

 

What is complex event processing?

CEP analyzes streams of disparate events occurring with high frequency and low latency. These days, streaming events can be found in various industries, for example:

 

In the oil and gas domain, sensor data comes from various drilling tools or from upstream oil pipeline equipment

In the security domain, activity data, malware information, and usage pattern data come from various end points

In the wearable domain, data comes from various wrist bands with information about your heart beat rate, your activity, and so on

In the banking domain, data comes from credit card usage, banking activities, and so on


 

 

It is very important to analyze variation patterns to get notified in real time about any change in the regular assembly. CEP can understand patterns across the streams of events, sub-events, and their sequences. CEP helps to identify meaningful patterns and complex relationships among unrelated events, and sends notifications in real and near real time to prevent damage:

 


 

The preceding diagram shows how the CEP flow works. Even though the flow looks simple, CEP has various abilities, such as:

The ability to produce results as soon as the input event stream is available

The ability to provide computations such as aggregation over time and timeouts between two events of interest

The ability to provide real-time/near real-time alerts and notifications on detection of complex event patterns

The ability to connect and correlate heterogeneous sources and analyze patterns in them

The ability to achieve high-throughput, low-latency processing

 

There are various solutions available on the market. With big data technology advancements, we have multiple options like Apache Spark, Apache Samza, Apache Beam, among others, but none of them have a dedicated library to fit all solutions. Now let us try to understand what we can achieve with Flink’s CEP library.



 

 

Flink CEP

Apache Flink provides the Flink CEP library, which provides APIs to perform complex event processing. The library consists of the following core components:

Event stream

Pattern definition

Pattern detection

Alert generation

 

 

 


 

Flink CEP works on Flink's streaming API called DataStream. A programmer needs to define the pattern to be detected from the stream of events, and then Flink's CEP engine detects the pattern and takes the appropriate action, such as generating alerts.

In order to get started, we need to add the following Maven dependency:

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-cep-scala_2.11 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep-scala_2.11</artifactId>
    <version>1.1.4</version>
</dependency>


 

 

Event streams

A very important component of CEP is its input event stream. In earlier chapters, we have seen details of the DataStream API. Now let's use that knowledge to implement CEP. The very first thing we need to do is define a Java POJO for the event. Let's assume we need to monitor a temperature sensor event stream.

First we define an abstract class and then extend this class.

 


 

The following code snippets demonstrate this. First, we write an abstract class as shown here:

package com.demo.chapter05;

public abstract class MonitoringEvent {

    private String machineName;

    public MonitoringEvent(String machineName) {
        super();
        this.machineName = machineName;
    }

    public String getMachineName() {
        return machineName;
    }

    public void setMachineName(String machineName) {
        this.machineName = machineName;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((machineName == null) ? 0 : machineName.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        MonitoringEvent other = (MonitoringEvent) obj;
        if (machineName == null) {
            if (other.machineName != null)
                return false;
        } else if (!machineName.equals(other.machineName))
            return false;
        return true;
    }
}

 

Then we create a POJO for the actual temperature event:

package com.demo.chapter05;

public class TemperatureEvent extends MonitoringEvent {

    private double temperature;

    public TemperatureEvent(String machineName) {
        super(machineName);
    }

    public TemperatureEvent(String machineName, double temperature) {
        super(machineName);
        this.temperature = temperature;
    }

    public double getTemperature() {
        return temperature;
    }

    public void setTemperature(double temperature) {
        this.temperature = temperature;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = super.hashCode();
        long temp = Double.doubleToLongBits(temperature);
        result = prime * result + (int) (temp ^ (temp >>> 32));
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (!super.equals(obj))
            return false;
        if (getClass() != obj.getClass())
            return false;
        TemperatureEvent other = (TemperatureEvent) obj;
        if (Double.doubleToLongBits(temperature) != Double.doubleToLongBits(other.temperature))
            return false;
        return true;
    }

    @Override
    public String toString() {
        return "TemperatureEvent [getTemperature()=" + getTemperature()
                + ", getMachineName()=" + getMachineName() + "]";
    }
}
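The equals/hashCode pair above compares the temperature field through Double.doubleToLongBits rather than with ==. A small standalone sketch (the class name here is hypothetical, not part of the chapter's code) of why that is the conventional choice:

```java
// Sketch: why equals/hashCode on a double field should go through
// Double.doubleToLongBits, as the generated TemperatureEvent code does.
public class DoubleBitsDemo {

    // Bit-level equality, as used by the POJO's equals().
    static boolean bitEquals(double a, double b) {
        return Double.doubleToLongBits(a) == Double.doubleToLongBits(b);
    }

    public static void main(String[] args) {
        // NaN != NaN under ==, but a POJO holding NaN should equal itself;
        // doubleToLongBits canonicalizes all NaNs to one bit pattern.
        System.out.println(Double.NaN == Double.NaN);          // false
        System.out.println(bitEquals(Double.NaN, Double.NaN)); // true

        // 0.0 == -0.0 under ==, but their bit patterns (and hashes) differ.
        System.out.println(0.0 == -0.0);          // true
        System.out.println(bitEquals(0.0, -0.0)); // false
    }
}
```

This is exactly the contract IDE-generated equals/hashCode methods follow for double fields.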

 

Now we can define the event source as follows.

In Java:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<TemperatureEvent> inputEventStream = env.fromElements(
        new TemperatureEvent("xyz", 22.0), new TemperatureEvent("xyz", 20.1),
        new TemperatureEvent("xyz", 21.1), new TemperatureEvent("xyz", 22.2),
        new TemperatureEvent("xyz", 22.1), new TemperatureEvent("xyz", 22.3),
        new TemperatureEvent("xyz", 22.1), new TemperatureEvent("xyz", 22.4),
        new TemperatureEvent("xyz", 22.7), new TemperatureEvent("xyz", 27.0));


 

 

In Scala:

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

val input: DataStream[TemperatureEvent] = env.fromElements(
  new TemperatureEvent("xyz", 22.0), new TemperatureEvent("xyz", 20.1),
  new TemperatureEvent("xyz", 21.1), new TemperatureEvent("xyz", 22.2),
  new TemperatureEvent("xyz", 22.1), new TemperatureEvent("xyz", 22.3),
  new TemperatureEvent("xyz", 22.1), new TemperatureEvent("xyz", 22.4),
  new TemperatureEvent("xyz", 22.7), new TemperatureEvent("xyz", 27.0))

 

 

Pattern API

The Pattern API allows you to define complex event patterns very easily. Each pattern consists of multiple states. To go from one state to another, we generally need to define conditions. The conditions can be based on continuity or on filtering out events.

 


 

Let’s try to understand each pattern operation in detail.


 

 

Begin

The initial state can be defined as follows.

In Java:

Pattern<Event, ?> start = Pattern.<Event>begin("start");

 

In Scala:

val start : Pattern[Event, _] = Pattern.begin("start")

 

 

Filter

We can also specify a filter condition for the initial state.

In Java:

start.where(new FilterFunction<Event>() {
    @Override
    public boolean filter(Event value) {
        return ...; // condition
    }
});

 

In Scala:

 

start.where(event => ... /* condition */)

 

 

Subtype

We can also filter out events based on their subtype, using the subtype() method.

In Java:

start.subtype(SubEvent.class).where(new FilterFunction<SubEvent>() {
    @Override
    public boolean filter(SubEvent value) {
        return ...; // condition
    }
});

 

In Scala:

 

start.subtype(classOf[SubEvent]).where(subEvent => ... /* condition */)


 

 

OR

The Pattern API also allows us to define multiple conditions together, using the OR and AND operators.

In Java:

pattern.where(new FilterFunction<Event>() {
    @Override
    public boolean filter(Event value) {
        return ...; // condition
    }
}).or(new FilterFunction<Event>() {
    @Override
    public boolean filter(Event value) {
        return ...; // or condition
    }
});

 

In Scala:

 

pattern.where(event => ... /* condition */).or(event => ... /* or condition */)

 

 

Continuity

As stated earlier, we do not always need to filter out events. There can be patterns where we need continuity instead of filters.

Continuity can be of two types: strict continuity and non-strict continuity.

Strict continuity

Strict continuity requires two matching events to follow each other directly, which means there should be no other event in between. This pattern can be defined by next().

In Java:

Pattern<Event, ?> strictNext = start.next("middle");

 

In Scala:

val strictNext: Pattern[Event, _] = start.next("middle")


 

 

Non-strict continuity

Non-strict continuity allows other events to occur in between the two specific events. This pattern can be defined by followedBy().

In Java:

Pattern<Event, ?> nonStrictNext = start.followedBy("middle");

 

In Scala:

val nonStrictNext : Pattern[Event, _] = start.followedBy("middle")
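The difference between the two continuity modes can be pictured with a hand-rolled matcher over a plain event list. This is illustrative only: Flink's engine is far more general; the sketch just shows which sequences next() and followedBy() semantics accept:

```java
import java.util.List;

// Illustrative only: naive matchers mimicking next() (strict) vs
// followedBy() (non-strict) semantics over a list of event names.
public class ContinuityDemo {

    // next("second") after "first": second must directly follow first.
    static boolean strict(List<String> events, String first, String second) {
        for (int i = 0; i + 1 < events.size(); i++) {
            if (events.get(i).equals(first) && events.get(i + 1).equals(second)) {
                return true;
            }
        }
        return false;
    }

    // followedBy("second") after "first": other events may sit in between.
    static boolean nonStrict(List<String> events, String first, String second) {
        int i = events.indexOf(first);
        return i >= 0 && events.subList(i + 1, events.size()).contains(second);
    }

    public static void main(String[] args) {
        List<String> events = List.of("a", "c", "b");
        System.out.println(strict(events, "a", "b"));    // false: "c" breaks strict continuity
        System.out.println(nonStrict(events, "a", "b")); // true: gaps are allowed
    }
}
```

So for the event sequence a, c, b, a strict pattern a next b does not match, while a followedBy b does.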

 

 

Within

The Pattern API also allows us to do pattern matching based on time intervals. We can define a time-based temporal constraint as follows.

In Java:

next.within(Time.seconds(30));

 

In Scala:

next.within(Time.seconds(10))

 

 

Detecting patterns

To detect patterns against a stream of events, we need to run the stream through the pattern. CEP.pattern() returns a PatternStream.

The following code snippet shows how we can detect a pattern. First, the pattern is defined to check whether the temperature value is greater than 26.0 degrees within 10 seconds.

In Java:

Pattern<TemperatureEvent, ?> warningPattern = Pattern.<TemperatureEvent> begin("first")
        .subtype(TemperatureEvent.class).where(new FilterFunction<TemperatureEvent>() {
            public boolean filter(TemperatureEvent value) {
                if (value.getTemperature() >= 26.0) {
                    return true;
                }
                return false;
            }
        }).within(Time.seconds(10));

PatternStream<TemperatureEvent> patternStream = CEP.pattern(inputEventStream, warningPattern);

In Scala:

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

 

val input = // data

 

val pattern: Pattern[TempEvent, _] = Pattern.begin("start").where(event => event.temp >= 26.0)

val patternStream: PatternStream[TempEvent] = CEP.pattern(input, pattern)

 

 

Selecting from patterns

Once the pattern stream is available, we need to select the pattern from it and then take appropriate actions based on it. We can use the select or flatSelect method to select data from the pattern.

 

Select

The select method needs a PatternSelectFunction implementation. It has a select method which is called for each event sequence. The select method receives a map of string/event pairs of matched events. The string is defined by the name of the state. The select method returns exactly one result.

To collect the results, we need to define an output POJO. In our case, let's say we need to generate alerts as output. Then we need to define the POJO as follows:

 

package com.demo.chapter05;

public class Alert {

    private String message;

    public Alert(String message) {
        super();
        this.message = message;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    @Override
    public String toString() {
        return "Alert [message=" + message + "]";
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((message == null) ? 0 : message.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        Alert other = (Alert) obj;
        if (message == null) {
            if (other.message != null)
                return false;
        } else if (!message.equals(other.message))
            return false;
        return true;
    }
}


 

 

Next we define the select functions.

In Java:

class MyPatternSelectFunction<IN, OUT> implements PatternSelectFunction<IN, OUT> {
    @Override
    public OUT select(Map<String, IN> pattern) {
        IN startEvent = pattern.get("start");
        IN endEvent = pattern.get("end");
        return new OUT(startEvent, endEvent);
    }
}

In Scala:

def selectFn(pattern : mutable.Map[String, IN]): OUT = {
    val startEvent = pattern.get("start").get
    val endEvent = pattern.get("end").get
    OUT(startEvent, endEvent)
}

 

 

flatSelect

The flatSelect method is similar to the select method. The only difference between the two is that flatSelect can return an arbitrary number of results. The flatSelect method has an additional Collector parameter which is used for the output elements.
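Before the Flink example, the cardinality difference can be pictured with plain collections. The names below are illustrative only, not Flink API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative only: the cardinality difference between a select()-style
// function (exactly one output per match) and a flatSelect()-style
// function (any number of outputs, emitted through a collector).
public class SelectCardinalityDemo {

    // select-style: one result per matched pattern.
    static String selectOne(Map<String, String> match) {
        return match.get("start") + "->" + match.get("end");
    }

    // flatSelect-style: emit as many results as we like into a collector.
    static void flatSelectMany(Map<String, String> match, List<String> out, int copies) {
        for (int i = 0; i < copies; i++) {
            out.add(match.get("start") + "->" + match.get("end"));
        }
    }

    public static void main(String[] args) {
        Map<String, String> match = Map.of("start", "e1", "end", "e2");
        System.out.println(selectOne(match)); // e1->e2

        List<String> out = new ArrayList<>();
        flatSelectMany(match, out, 3); // three outputs for one match
        System.out.println(out.size()); // 3
    }
}
```

In Flink, the collector role is played by the Collector parameter of PatternFlatSelectFunction.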

The following example shows how we can use the flatSelect method.

In Java:

class MyPatternFlatSelectFunction<IN, OUT> implements PatternFlatSelectFunction<IN, OUT> {
    @Override
    public void flatSelect(Map<String, IN> pattern, Collector<OUT> collector) {
        IN startEvent = pattern.get("start");
        IN endEvent = pattern.get("end");

        for (int i = 0; i < startEvent.getValue(); i++) {
            collector.collect(new OUT(startEvent, endEvent));
        }
    }
}


 

 

In Scala:

def flatSelectFn(pattern : mutable.Map[String, IN], collector : Collector[OUT]) = {
    val startEvent = pattern.get("start").get
    val endEvent = pattern.get("end").get
    for (i <- 0 to startEvent.getValue) {
        collector.collect(OUT(startEvent, endEvent))
    }
}

 

 

Handling timed-out partial patterns

Sometimes we may miss certain events if we have constrained the patterns with a time boundary. It is possible that events are discarded because they exceed the time window. In order to take actions on timed-out events, the select and flatSelect methods allow a timeout handler. This handler is called for each timed-out event pattern.

In this case, the select method takes two parameters: a PatternSelectFunction and a PatternTimeoutFunction. The return type of the timed-out function can be different from that of the select pattern function. The timed-out event and the select event are wrapped in the classes Either.Right and Either.Left.

The following code snippets show how we do things in practice.

In Java:

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

DataStream<Either<TimeoutEvent, ComplexEvent>> result = patternStream.select(
    new PatternTimeoutFunction<Event, TimeoutEvent>() {...},
    new PatternSelectFunction<Event, ComplexEvent>() {...}
);

DataStream<Either<TimeoutEvent, ComplexEvent>> flatResult = patternStream.flatSelect(
    new PatternFlatTimeoutFunction<Event, TimeoutEvent>() {...},
    new PatternFlatSelectFunction<Event, ComplexEvent>() {...}
);


 

 

In Scala, the select API:

val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)

val result: DataStream[Either[TimeoutEvent, ComplexEvent]] = patternStream.select{
    (pattern: mutable.Map[String, Event], timestamp: Long) => TimeoutEvent()
} {
    pattern: mutable.Map[String, Event] => ComplexEvent()
}

 

The flatSelect API is called with a Collector, as it can emit an arbitrary number of events:

val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)

val flatResult: DataStream[Either[TimeoutEvent, ComplexEvent]] = patternStream.flatSelect{
    (pattern: mutable.Map[String, Event], timestamp: Long, out: Collector[TimeoutEvent]) =>
        out.collect(TimeoutEvent())
} {
    (pattern: mutable.Map[String, Event], out: Collector[ComplexEvent]) =>
        out.collect(ComplexEvent())
}

 

 

Use case - complex event processing on a temperature sensor

In earlier sections, we learnt about the various features provided by the Flink CEP engine. Now it's time to understand how we can use it in real-world solutions. For that, let's assume we work for a mechanical company which produces some products. In the product factory, there is a need to constantly monitor certain machines. The factory has already set up sensors which keep sending the temperature of the machines at a given time.

 

Now we will be setting up a system that constantly monitors the temperature value and generates an alert if the temperature exceeds a certain value.



 

 

We can use the following architecture:

 


 

Here we will be using Kafka to collect events from the sensors. In order to write a Java application, we first need to create a Maven project and add the following dependencies:

 

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-cep-scala_2.11 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep-scala_2.11</artifactId>
    <version>1.1.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java_2.11 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.1.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala_2.11 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.1.4</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
    <version>1.1.4</version>
</dependency>

 

Next we need to do the following things to use Kafka.

First, we need to define a custom Kafka deserializer. This will read bytes from a Kafka topic and convert them into a TemperatureEvent. The following is the code to do this.

EventDeserializationSchema.java:

package com.demo.chapter05;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public class EventDeserializationSchema implements DeserializationSchema<TemperatureEvent> {

    public TypeInformation<TemperatureEvent> getProducedType() {
        return TypeExtractor.getForClass(TemperatureEvent.class);
    }

    public TemperatureEvent deserialize(byte[] arg0) throws IOException {
        String str = new String(arg0, StandardCharsets.UTF_8);
        String[] parts = str.split("=");
        return new TemperatureEvent(parts[0], Double.parseDouble(parts[1]));
    }

    public boolean isEndOfStream(TemperatureEvent arg0) {
        return false;
    }
}
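The deserializer above parses "machine=temperature" strings. That parsing step can be exercised on its own, without the Kafka plumbing (the class and method names here are illustrative):

```java
// Standalone sketch of the parsing done inside
// EventDeserializationSchema.deserialize(): split on "=" to get the
// machine name and the temperature reading.
public class EventParseDemo {

    static String machineOf(String raw) {
        return raw.split("=")[0];
    }

    static double temperatureOf(String raw) {
        return Double.parseDouble(raw.split("=")[1]);
    }

    public static void main(String[] args) {
        String raw = "Boiler=27.0"; // same shape as the sample Kafka input later
        System.out.println(machineOf(raw));     // Boiler
        System.out.println(temperatureOf(raw)); // 27.0
    }
}
```

Note that the real deserializer would fail on malformed input (no "=", or a non-numeric value); in production you would want to guard against that.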

 

Next we create a topic in Kafka called temperature:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic temperature


 

 

Now we move to the Java code which will listen to these events in Flink streams:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

DataStream<TemperatureEvent> inputEventStream = env.addSource(
        new FlinkKafkaConsumer09<TemperatureEvent>("temperature", new EventDeserializationSchema(), properties));

Next we will define the pattern to check if the temperature is greater than 26.0 degrees Celsius within 10 seconds:

 

Pattern<TemperatureEvent, ?> warningPattern = Pattern.<TemperatureEvent> begin("first")
        .subtype(TemperatureEvent.class).where(new FilterFunction<TemperatureEvent>() {
            private static final long serialVersionUID = 1L;

            public boolean filter(TemperatureEvent value) {
                if (value.getTemperature() >= 26.0) {
                    return true;
                }
                return false;
            }
        }).within(Time.seconds(10));

 

Next we match this pattern against the stream of events and select matching events. We will also add the alert messages into the results stream, as shown here:

 

DataStream<Alert> patternStream = CEP.pattern(inputEventStream, warningPattern)
        .select(new PatternSelectFunction<TemperatureEvent, Alert>() {
            private static final long serialVersionUID = 1L;

            public Alert select(Map<String, TemperatureEvent> event) throws Exception {
                return new Alert("Temperature Rise Detected:" + event.get("first").getTemperature()
                        + " on machine name:" + event.get("first").getMachineName());
            }
        });


 

 

In order to know what alerts were generated, we will print the results:

patternStream.print();

 

And we execute the stream:

env.execute("CEP on Temperature Sensor");

 

Now we are all set to execute the application. As and when we get messages in the Kafka topic, CEP will keep executing.

 

The actual execution will look like the following. Here is how we can provide sample input:

 

xyz=21.0
xyz=30.0
LogShaft=29.3
Boiler=23.1
Boiler=24.2
Boiler=27.0
Boiler=29.0

Here is how the sample output will look:

Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1010488393]
10/09/2016 18:15:55 Job execution switched to status RUNNING.
10/09/2016 18:15:55 Source: Custom Source(1/4) switched to SCHEDULED
10/09/2016 18:15:55 Source: Custom Source(1/4) switched to DEPLOYING
10/09/2016 18:15:55 Source: Custom Source(2/4) switched to SCHEDULED
10/09/2016 18:15:55 Source: Custom Source(2/4) switched to DEPLOYING
10/09/2016 18:15:55 Source: Custom Source(3/4) switched to SCHEDULED
10/09/2016 18:15:55 Source: Custom Source(3/4) switched to DEPLOYING
10/09/2016 18:15:55 Source: Custom Source(4/4) switched to SCHEDULED
10/09/2016 18:15:55 Source: Custom Source(4/4) switched to DEPLOYING
10/09/2016 18:15:55 CEPPatternOperator(1/1) switched to SCHEDULED
10/09/2016 18:15:55 CEPPatternOperator(1/1) switched to DEPLOYING
10/09/2016 18:15:55 Map -> Sink: Unnamed(1/4) switched to SCHEDULED
10/09/2016 18:15:55 Map -> Sink: Unnamed(1/4) switched to DEPLOYING
10/09/2016 18:15:55 Map -> Sink: Unnamed(2/4) switched to SCHEDULED
10/09/2016 18:15:55 Map -> Sink: Unnamed(2/4) switched to DEPLOYING
10/09/2016 18:15:55 Map -> Sink: Unnamed(3/4) switched to SCHEDULED
10/09/2016 18:15:55 Map -> Sink: Unnamed(3/4) switched to DEPLOYING
10/09/2016 18:15:55 Map -> Sink: Unnamed(4/4) switched to SCHEDULED
10/09/2016 18:15:55 Map -> Sink: Unnamed(4/4) switched to DEPLOYING
10/09/2016 18:15:55 Source: Custom Source(2/4) switched to RUNNING
10/09/2016 18:15:55 Source: Custom Source(3/4) switched to RUNNING
10/09/2016 18:15:55 Map -> Sink: Unnamed(1/4) switched to RUNNING
10/09/2016 18:15:55 Map -> Sink: Unnamed(2/4) switched to RUNNING
10/09/2016 18:15:55 Map -> Sink: Unnamed(3/4) switched to RUNNING
10/09/2016 18:15:55 Source: Custom Source(4/4) switched to RUNNING
10/09/2016 18:15:55 Source: Custom Source(1/4) switched to RUNNING
10/09/2016 18:15:55 CEPPatternOperator(1/1) switched to RUNNING
10/09/2016 18:15:55 Map -> Sink: Unnamed(4/4) switched to RUNNING
1> Alert [message=Temperature Rise Detected:30.0 on machine name:xyz]
2> Alert [message=Temperature Rise Detected:29.3 on machine name:LogShaft]
3> Alert [message=Temperature Rise Detected:27.0 on machine name:Boiler]
4> Alert [message=Temperature Rise Detected:29.0 on machine name:Boiler]
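The four alerts above can be cross-checked with plain Java: with the >= 26.0 threshold, exactly four of the seven sample readings should fire (ignoring the 10-second window, which all of the samples fall inside here). The class name is illustrative:

```java
import java.util.List;
import java.util.stream.Collectors;

// Cross-check of the sample run: which "machine=temperature" readings
// pass the >= 26.0 threshold used by the warning pattern.
public class ThresholdCheck {

    static List<String> alerts(List<String> readings, double threshold) {
        return readings.stream()
                .filter(r -> Double.parseDouble(r.split("=")[1]) >= threshold)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> input = List.of("xyz=21.0", "xyz=30.0", "LogShaft=29.3",
                "Boiler=23.1", "Boiler=24.2", "Boiler=27.0", "Boiler=29.0");
        System.out.println(alerts(input, 26.0));
        // [xyz=30.0, LogShaft=29.3, Boiler=27.0, Boiler=29.0]
    }
}
```

These are the same four readings reported in the sample output.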

 

We can also configure a mail client and use some external web hook to send e-mail or messenger notifications.

 


 

 

Summary

In this chapter, we learnt about CEP. We discussed the challenges involved and how we can use the Flink CEP library to solve CEP problems. We also learnt about the Pattern API and the various operators we can use to define patterns. In the final section, we tried to connect the dots and see one complete use case. With some changes, this setup can also be used as-is in various other domains.

In the next chapter, we will see how to use Flink’s built-in Machine Learning library to solve complex problems.
