怎么使用ApachePulsarFunctions進(jìn)行簡單事件處理

這篇文章的內(nèi)容主要圍繞怎么使用Apache Pulsar Functions進(jìn)行簡單事件處理進(jìn)行講述,文章內(nèi)容清晰易懂,條理清晰,非常適合新手學(xué)習(xí),值得大家去閱讀。感興趣的朋友可以跟隨小編一起閱讀吧。希望大家通過這篇文章有所收獲!

專注于為中小企業(yè)提供網(wǎng)站建設(shè)、成都網(wǎng)站制作服務(wù),電腦端+手機(jī)端+微信端的三站合一,更高效的管理,為中小企業(yè)啟東免費(fèi)做網(wǎng)站提供優(yōu)質(zhì)的服務(wù)。我們立足成都,凝聚了一批互聯(lián)網(wǎng)行業(yè)人才,有力地推動(dòng)了上千多家企業(yè)的穩(wěn)健成長,幫助中小企業(yè)通過網(wǎng)站建設(shè)實(shí)現(xiàn)規(guī)模擴(kuò)充和轉(zhuǎn)變。

 基于事件的編程 

事件驅(qū)動(dòng)架構(gòu)(Event-driven Architecture,EDA)的關(guān)鍵特征是事件的核心重要性。在 EDA 中,事件 consumer 遵循基于事件編程(EBP)的編程樣式對事件到達(dá)作出反應(yīng)。與面向批處理或過程式編程不同,在 EBP 中,軟件系統(tǒng)響應(yīng)于接收一個(gè)或多個(gè)事件通知并執(zhí)行處理,且完全通過事件以異步方式與其他軟件組件通信。

盡管所有基于事件的應(yīng)用程序不盡相同,但通常都遵循下圖中的結(jié)構(gòu),事件 producer 將事件引入中間的事件處理框架,該框架負(fù)責(zé)持久化事件并將事件交付給事件 consumer。

怎么使用Apache Pulsar Functions進(jìn)行簡單事件處理

圖 1 基于事件的架構(gòu)

中間的事件處理框架除了負(fù)責(zé)事件路由外,還托管事件處理器組件。事件處理器獲取事件,還可以轉(zhuǎn)發(fā)或發(fā)布新事件,因此在某種意義上它們既是事件 consumer 也是事件 producer。但是我們不會(huì)將這些事件處理器稱為事件 producer 或事件 consumer,因?yàn)槲覀兿M麑⑺鼈兣c事件處理框架之外的實(shí)體區(qū)分開來。

 事件處理器類型

在 EDA 中,事件處理器通常可分為以下幾類:

  • 簡單事件處理器:事件到達(dá)立即觸發(fā)事件處理器中的操作。一般來說,如果這些處理器是無狀態(tài)的,則僅根據(jù)當(dāng)前事件的內(nèi)容執(zhí)行所有邏輯;如果是有狀態(tài)的,則可以跨調(diào)用保留消息,以便執(zhí)行稍微復(fù)雜一點(diǎn)的邏輯。

  • 復(fù)雜事件處理器:此類事件處理器處理一系列事件,并執(zhí)行更為復(fù)雜的模式分析,以識別有意義的模式或關(guān)系,如檢測事件相關(guān)性、因果關(guān)系或定時(shí)。典型用例一般用于電子商務(wù)、欺詐檢測、網(wǎng)絡(luò)安全、金融交易和其他需要立即響應(yīng)的環(huán)境中。

 事件處理網(wǎng)絡(luò)

基于事件的應(yīng)用程度通常由許多按特定順序或流排列的事件處理器組成。我們將事件 producer、事件處理器、事件 consumer 的集合稱為事件處理網(wǎng)絡(luò)。事件處理網(wǎng)絡(luò)用于解決一個(gè)或多個(gè)特定的業(yè)務(wù)問題。

怎么使用Apache Pulsar Functions進(jìn)行簡單事件處理

圖 2 事件處理網(wǎng)絡(luò)

如圖 2 所示,外部事件 producer 和事件 consumer 處于邊緣,中間是多個(gè)事件處理器。圖 2 展示了事件處理器之間的事件流,這些箭頭也稱為隱式通道,用于將事件直接從一個(gè)事件處理器推到另一個(gè)事件處理器。當(dāng)使用 Apache Pulsar 實(shí)現(xiàn)時(shí),topic 就是這些隱式通道。

圖 2 中還展示了另一種事件處理器之間通信的方式:共享狀態(tài)管理。事件處理器一般需要保留多個(gè)事件之間的計(jì)算狀態(tài),因此事件處理架構(gòu)需要提供一種機(jī)制以持久化狀態(tài)信息,并允許事件處理器直接訪問。共享狀態(tài)提供了另一種在事件處理器之間共享信息的機(jī)制,并支持有狀態(tài)事件處理,我們將在下一部分詳述相關(guān)內(nèi)容。

多個(gè)基于事件的應(yīng)用程序可以與單個(gè)事件類型相關(guān)聯(lián)。上圖展示了基于事件的應(yīng)用程序(藍(lán)色)到另一個(gè)應(yīng)用程序的過程。在將第一個(gè)應(yīng)用程序發(fā)送到事件 consumer 2 前,同時(shí)將其輸出到事件 consumer 1 和另一個(gè)事件處理器,以進(jìn)行進(jìn)一步處理。

將基于事件的應(yīng)用程序鏈接在一起的原因有很多,如在某一場景中,需要監(jiān)視物聯(lián)網(wǎng)傳感器讀取模式或異常,同時(shí)也希望將這些事件長期存儲(存儲平臺如 HDFS 或 Amazon S3),以便用于訓(xùn)練數(shù)據(jù)模型。

一級事件處理器序列首先進(jìn)行事件的 ETL-類型處理,即將事件轉(zhuǎn)換為可消費(fèi)的格式。這些記錄將被發(fā)送到事件 consumer 1,在本例中即 HDFS。同時(shí),我們還希望將清理后的事件轉(zhuǎn)發(fā)到實(shí)現(xiàn)異常檢測工作流的二級事件處理器序列。我們將在下一部分討論如何使用 Apache Pulsar Functions 作為框架,此框架采用簡單編程邏輯 functions 來實(shí)現(xiàn)基于事件的處理。

 使用 Apache Pulsar Functions 進(jìn)行基于事件的編程

Apache Pulsar Functions 提供了一個(gè)易于使用的框架,開發(fā)者可以使用 Functions 創(chuàng)建或部署處理邏輯,這些處理邏輯由 Apache Pulsar 執(zhí)行。你可以用 Java 或 Python 編寫簡單或復(fù)雜的 function,并將這些 function 部署到 Pulsar 集群中,而無需運(yùn)行單獨(dú)的流處理引擎。Pulsar Functions 是輕量級計(jì)算框架,具有以下特點(diǎn):

  • 在消息發(fā)送至指定 input topic 時(shí)執(zhí)行。

  • 將用戶自定義的處理邏輯應(yīng)用于每條消息。

  • 將計(jì)算結(jié)果發(fā)布到一個(gè)或多個(gè) topic。

怎么使用Apache Pulsar Functions進(jìn)行簡單事件處理

圖 3 Pulsar Functions 編程模型

可以使用 Java 和 Python 編寫 Pulsar Functions,編寫方式有兩種:

  • 使用原生語言接口 ,不需要 Pulsar 特定的庫或特殊依賴。例如,要在 Java 中實(shí)現(xiàn)一個(gè) Pulsar Function,只需要編寫一個(gè)實(shí)現(xiàn) java.util.Function 接口的類,如下所示:

import java.util.Function;public class EchoFunction implements Function<String, String> {    public String apply(String input) {        // Logic Here    }}
  • 使用 Pulsar Functions SDK ,利用特定的 Pulsar 庫,這些庫提供原生接口中無法提供的一系列功能,如 org.apache.pulsar.functions.api.Context 對象提供的狀態(tài)管理功能。

import org.apache.pulsar.functions.api.Context;    import org.apache.pulsar.functions.api.Function;
   public interface Function<I, O> {        O process(I input, Context context) throws Exception;    }

原生語言方法提供了一種清晰的、無 API 的 Pulsar Functions 編寫方法,非常適合無狀態(tài)事件處理器的開發(fā)。但是,這種方法不支持訪問先前的狀態(tài)信息。

 部署 Apache Pulsar Functions



編譯并測試 Pulsar Functions 后,需要將 Pulsar Functions 部署到 Pulsar 集群。Pulsar Functions 旨在支持多種部署場景。目前,運(yùn)行 Pulsar Functions 的方式有兩種:

  • 本地運(yùn)行模式:在此模式下運(yùn)行時(shí),Pulsar Function 將在執(zhí)行命令的機(jī)器上運(yùn)行(如筆記本電腦、邊緣節(jié)點(diǎn)等)。下面是一個(gè)本地運(yùn)行命令的例子:

$ bin/pulsar-admin functions localrun \    —py myfunc.py \    —className myfunc.SomeFunction    —inputs input-topic-1    —outputs output-topic-1
  • 集群模式: 在集群模式下運(yùn)行時(shí),Pulsar Function 代碼將被上傳到 Pulsar 集群中的 broker 中,并與 broker 一起運(yùn)行,而不是在本地環(huán)境中運(yùn)行??梢允褂萌缦滤镜拿钤诩耗J街袆?chuàng)建 function,該命令在 Pulsar broker 節(jié)點(diǎn)上執(zhí)行。

$ bin/pulsar-admin functions create \    —jar target/my-functions.jar \    —className org.example.functions.MyFunction \    —inputs input-topic-1 \    —outputs output-topic-1 \    —parallelism 4 \    —cpu 2 \    —ram 8589934592 \    —disk 10737418240

上面的命令將啟用 4 個(gè) org.example.functions.MyFunction 實(shí)例,每個(gè)實(shí)例有 2 個(gè) CPU 內(nèi)核、8 GB RAM、10 GB 磁盤空間。(注意,需以字節(jié)為單位設(shè)置 RAM 和磁盤,且在 Docker 環(huán)境中必須設(shè)置 CPU 和磁盤。)

還有一種方法可以在創(chuàng)建 Pulsar Function 時(shí)提供用戶配置屬性,此方法在需要復(fù)用 function 時(shí)非常有用。我們通過為 userConfig 屬性指定一個(gè) JSON 字符串,在下面的命令中傳入一組鍵-值對。在運(yùn)行時(shí),可以通過使用 Pulsar Functions SDK 的 Pulsar Functions Context 對象訪問傳入的值,我們將在下一部分詳述相關(guān)內(nèi)容。

$ bin/pulsar-admin functions create \    —jar target/my-functions.jar \    —className org.example.functions.MyFunction \    —inputs input-topic-1 \    —outputs output-topic-1 \    —parallelism 4 \    —cpu 2 \    —ram 8589934592 \    —disk 10737418240 \    —userConfig ‘{“key-1”: “value-1”, “key-2”, “value-2”}’

 使用 Apache Pulsar Functions SDK 的最佳實(shí)踐


Java 和 Python SDK 中定義的 Context 對象為 function 提供了各種各樣的信息和功能,包括保留可用于提供有狀態(tài)事件處理的中間結(jié)果的能力。以下示例是 Context 對象中所包含的信息:

  • Pulsar Function 的名稱和 ID

  • 每條消息的消息 ID。自動(dòng)為每條 Pulsar 消息分配一個(gè) ID。

  • 發(fā)送消息的 topic 的名稱

  • 與 function 相關(guān)聯(lián)的所有 input topic、output topic 的名稱

  • SerDe 的類名稱

  • 與 function 相關(guān)聯(lián)的租戶和命名空間

  • 運(yùn)行 function 的 Pulsar Functions 實(shí)例的 ID

  • Function 的版本

  • Function 使用的 logger 對象,可用于創(chuàng)建 function 日志消息

  • 訪問通過 CLI 提供的任意用戶配置值

  • 記錄 metric 的接口

接下來,我們將介紹一些利用了 Context 對象特性的使用模式。

最佳實(shí)踐 1:動(dòng)態(tài)配置

運(yùn)行或更新使用 SDK 創(chuàng)建的 Pulsar Functions 時(shí),可以使用 -userConfig flag 通過命令行傳入任意鍵/值。鍵/值必須指定為 JSON。以下示例創(chuàng)建 function 并傳入用戶鍵/值。

$ bin/pulsar-admin functions create \    —name word-filter \    —userConfig ‘{“filter”, “$.Sensors{?(@.Type==‘Temp’)]”}’ \     # Other function configs

這個(gè)特性允許我們編寫可以多次使用的通用 function,但是配置略有不同。例如,假設(shè)你想編寫一個(gè)基于 JSON 路徑表達(dá)式過濾 JSON 事件的 function。當(dāng)事件到達(dá)時(shí),將其內(nèi)容與配置的表達(dá)式進(jìn)行比較,并過濾掉不匹配的 entry。

顯然該 function 的行為完全依賴于它所過濾的 JSON 路徑表達(dá)式。為了可以多次使用 function,我們使用 Pulsar SDK,直到部署 function 后再指定此路徑表達(dá)式。

如上例所示,要使用的 JSON 路徑過濾器的值在編譯時(shí)未知,需使用 getUserConfigValueOrDefault 方法從 Context 中獲取。

import org.apache.pulsar.functions.api.Context;import org.apache.pulsar.functions.api.Function;import com.jayway.jsonpath.JsonPath;
public JsonPathFilterFunction implements Function<String, String> {
   String process(String input, Context context) throws Exception {        // Get the filter from the context        String filter = context.getUserConfigValueOrDefault(“filter”, “$”)                          .toString();        Object filtered = JsonPath.read(input, filter);        Return filtered.toString();    }}

最佳實(shí)踐 2:有狀態(tài)事件處理器

有狀態(tài)事件處理器使用先前事件的內(nèi)存生成輸出。存儲狀態(tài)的能力是處理多個(gè)事件的關(guān)鍵構(gòu)件。在 Apache Pulsar Function 框架中,狀態(tài)信息存儲在基于 Apache BookKeeper 的專用鍵-值存儲中。Pulsar SDK 通過 Context 對象訪問狀態(tài)信息。

怎么使用Apache Pulsar Functions進(jìn)行簡單事件處理

圖 4 Apache Pulsar 狀態(tài)管理

我們來舉例解釋一下狀態(tài) agent。假設(shè)有一個(gè)應(yīng)用程序,用于從物聯(lián)網(wǎng)傳感器獲取溫度讀取事件,我們想知道傳感器的平均溫度,則可以使用事件處理 agent 通過以下 function 持續(xù)更新溫度平均值:

import org.apache.pulsar.functions.api.Context;import org.apache.pulsar.functions.api.Function;
public AvgTempFunction implements Function<Float, Float> {
   Float process(Float currentTemp, Context context) {        // Increment and get counter        context.incrCounter(“num-measurements”);        Integer n = context.getCounter(“num-measurements”);        // Calculate new average based on old average and count        Float old_average = context.getState(“avg-temp”);        Float new_average = (old_average * (n-1) + currentTemp) / n;        context.putState(“avg-temp”, new_average);        return new_average;    }}

最佳實(shí)踐 3:Void Funtions


Pulsar Functions 可以將結(jié)果發(fā)布到一個(gè)或多個(gè) output topic,但可以不發(fā)布結(jié)果。也可以使用 function 僅生成日志,并將結(jié)果寫入外部數(shù)據(jù)庫,或僅用于監(jiān)視流中的異常。以下示例中的 function 只會(huì)將接收到的事件存入日志:

import org.apache.pulsar.functions.api.Context;import org.apache.pulsar.functions.api.Function;import org.slf4j.Logger;
public LogFunction implements Function<String, Void> {
   Void process(String input, Context context) throws Exception {        Logger LOG = context.getLogger();        LOG.info("Received {}”, input);        return null;    }}
     

     
在使用輸出類型為 Void 的 Java function 時(shí),function 必須始終返回 null。在不想生成輸出事件時(shí),輸出類型沒有 Void 的 function 可以返回 null,例如,當(dāng)你在使用過濾器,但不希望某一事件被處理時(shí)。      

     
最佳實(shí)踐 4:處理來自多個(gè) input topic 的事件      

     
如圖 3 所示,Pulsar Functions 可以消費(fèi)多個(gè) topic 中的事件,下面我們來看一下如何編寫一個(gè)這樣的 function:      
import org.apache.pulsar.functions.api.Context;import org.apache.pulsar.functions.api.Function;
public MultiTopicFunction implements Function<String, String> {
   String process(String input, Context context) throws Exception {        String sourceTopic = context.getSourceTopic();        if (sourceTopic.equals(“TopicA”) {           // parse as TopicA Object        } else if (sourceTopic.equals(“TopicB”) {           // parse as Topic B Object        } else if (sourceTopic.equals(“TopicC”) {           // parse as Topic C Object        }        ….    }}
     

從代碼中可以看出,我們首先要從 Context 對象獲取 input topic 的名稱,然后根據(jù) input topic 的名稱相應(yīng)地解析/處理事件。


     

最佳實(shí)踐 5:Metric 收集


     

Apache Pulsar SDK 提供了 metric 收集機(jī)制,可用于記錄所選擇的任何用戶定義的 metric。在下面的示例中,我們使用單獨(dú)的 metric 來跟蹤調(diào)用該 function 的總次數(shù),使用另一個(gè) metric 來跟蹤使用無效輸入調(diào)用該 function 的次數(shù)。更多關(guān)于讀取和使用 metric 的說明,請參閱監(jiān)控指南。

import org.apache.pulsar.functions.api.Context;import org.apache.pulsar.functions.api.Function;
public MetricFunction implements Function<Integer, Void> {
   Void process(String input, Context context) throws Exception {        context.recordMetric(“invocation count”, 1);        if (input < 0) {           context.recordMetric(“Invalid data”, 1);        }        return null;    }}
     

感謝你的閱讀,相信你對“怎么使用Apache Pulsar Functions進(jìn)行簡單事件處理”這一問題有一定的了解,快去動(dòng)手實(shí)踐吧,如果想了解更多相關(guān)知識點(diǎn),可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站!小編會(huì)繼續(xù)為大家?guī)砀玫奈恼拢?/p>

當(dāng)前題目:怎么使用ApachePulsarFunctions進(jìn)行簡單事件處理
網(wǎng)頁路徑:http://muchs.cn/article12/pdpggc.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供品牌網(wǎng)站制作、外貿(mào)網(wǎng)站建設(shè)、網(wǎng)站設(shè)計(jì)公司、微信公眾號搜索引擎優(yōu)化、關(guān)鍵詞優(yōu)化

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

商城網(wǎng)站建設(shè)