PulsarFunction例子-創(chuàng)新互聯(lián)

在單機環(huán)境下實現(xiàn)字符串追加函數(shù)(Pulsar 2.4.2版本)

創(chuàng)新互聯(lián)公司主營云南網(wǎng)站建設的網(wǎng)絡公司,主營網(wǎng)站建設方案,成都app開發(fā),云南h5微信小程序開發(fā)搭建,云南網(wǎng)站營銷推廣歡迎云南等地區(qū)企業(yè)咨詢

1 啟動單機Pulsar

$ bin/pulsar-daemon start standalone

2 創(chuàng)建函數(shù)

1) 準備環(huán)境

項目引用 compile 'org.apache.pulsar:pulsar-functions-api:2.4.2'

2) 創(chuàng)建JAVA函數(shù)(此函數(shù)用于數(shù)據(jù)源來的topic schema是string,輸出的tiopic schema是string)

Pulsar Function 例子

導出jar包,放到pulsar服務器目錄下,本例子放在 /data/jar/下

3)使用命令行工具加載函數(shù)到Pulsar,? ? ? ? ? ? ? ? ? ? ?

bin/pulsar-admin functions create \

--classname test.AppStrFunction \

--jar /data/jar/pf.jar \

--inputs persistent://public/default/tlstest \

--output persistent://public/default/teststr \

--tenant public \

--namespace default \

--name appStrFunction

參數(shù)說明:

參數(shù)
說明
functions通知 pulsar broker,函數(shù)操作
create創(chuàng)建函數(shù),默認創(chuàng)建成功后啟動
classname函數(shù)類名稱,需要加上包名
jar指定 jar 包的運行路徑
inputs指定 函數(shù) 數(shù)據(jù)的來源在哪里,支持多個 topics 作為輸入
output如果該 函數(shù) 有輸出(有些情況下,function 沒有輸出),指定 function 輸出的 topic,只能有一個輸出
tenant指定該 函數(shù) 運行的租戶名
namespace指定該 函數(shù) 運行的命名空間
name指定該 函數(shù) 運行的名稱
以下是函數(shù)相關其他操作

停止函數(shù)

bin/pulsar-admin functions stop \

--tenant public \

--namespace default \

--name appStrFunction

啟動函數(shù)

bin/pulsar-admin functions start \

--tenant public \

--namespace default \

--name appStrFunction

刪除函數(shù)

bin/pulsar-admin functions delete \

--tenant public \

--namespace default \

--name appStrFunction

函數(shù)的日志在 pulsar安裝目錄 /logs/functions下

3 測試函數(shù)

根據(jù)前邊函數(shù)已成功加載啟動

1)向tlstest主題發(fā)送消息? ?

import?java.util.concurrent.TimeUnit; import?org.apache.pulsar.client.api.Producer; import?org.apache.pulsar.client.api.PulsarClient; import?org.apache.pulsar.client.api.Schema; public?class?SendMsgTest{ ??public?static?void?main(String[]?args){ ??????String?url="pulsar://192.168.1.48:6650"; ??try{ ?????PulsarClient?client?=PulsarClient.builder() ???????????.serviceUrl(url) ???????????.connectionTimeout(10,TimeUnit.SECONDS) ???????????.build(); ?????Producer<String>?producer=client.newProducer(Schema.STRING) ???????????.topic("tlstest") ???????????.sendTimeout(10,TimeUnit.SECONDS) ???????????.producerName("senduser") ???????????.create(); ???????????producer.send("this?is?a?book"); ???????????System.out.print("send?ok"); ???????????client.close(); ??????}catch(Exception?e){ ????????e.printStackTrace(); ??????} ??} }

2)讀取teststr主題消息

import?org.apache.pulsar.client.api.Consumer; import?org.apache.pulsar.client.api.Message; import?org.apache.pulsar.client.api.PulsarClient; import?org.apache.pulsar.client.api.Schema; import?org.apache.pulsar.client.api.SubscriptionInitialPosition; import?org.apache.pulsar.client.api.SubscriptionType; import?org.apache.pulsar.client.impl.schema.JSONSchema; import?schema.OrderModel; import?com.alibaba.fastjson.JSON; public?class?RecFunTest?{ public?static?void?main(String[]?args)?{ String?url?=?"http://192.168.1.48:8080"; try{ ??PulsarClient?client?=PulsarClient.builder() ????.serviceUrl(url) ????.build(); ?Consumer<String>?consumer=client.newConsumer(Schema.STRING) ????.topic("teststr") ????.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) ????.subscriptionType(SubscriptionType.Exclusive)//訂閱模式??Exclusive(獨占,默認模式)?Failover(災備)Shared(共享) ????.subscriptionName("wbq")//訂閱者名稱 ????.subscribe(); ?while?(true)?{ ???Message<String>?mondmsg?=?consumer.receive(); ???String?msg=mondmsg.getValue(); ????????????????System.out.println("receive?message=:"+msg); ?????????????} ??}catch(Exception?e){ ?????e.printStackTrace(); ??} ?} }

另外有需要云服務器可以了解下創(chuàng)新互聯(lián)cdcxhl.cn,海內(nèi)外云服務器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務器、裸金屬服務器、高防服務器、香港服務器、美國服務器、虛擬主機、免備案服務器”等云主機租用服務以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務可用性高、性價比高”等特點與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應用場景需求。

網(wǎng)站欄目:PulsarFunction例子-創(chuàng)新互聯(lián)
本文來源:http://muchs.cn/article16/cddggg.html

成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供手機網(wǎng)站建設、做網(wǎng)站網(wǎng)站營銷、服務器托管、品牌網(wǎng)站設計、關鍵詞優(yōu)化

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

網(wǎng)站托管運營