在單機環(huán)境下實現(xiàn)字符串追加函數(shù)(Pulsar 2.4.2版本)
創(chuàng)新互聯(lián)公司主營云南網(wǎng)站建設的網(wǎng)絡公司,主營網(wǎng)站建設方案,成都app開發(fā),云南h5微信小程序開發(fā)搭建,云南網(wǎng)站營銷推廣歡迎云南等地區(qū)企業(yè)咨詢1 啟動單機Pulsar
$ bin/pulsar-daemon start standalone
2 創(chuàng)建函數(shù)
1) 準備環(huán)境
項目引用 compile 'org.apache.pulsar:pulsar-functions-api:2.4.2'
2) 創(chuàng)建JAVA函數(shù)(此函數(shù)用于數(shù)據(jù)源來的topic schema是string,輸出的tiopic schema是string)
導出jar包,放到pulsar服務器目錄下,本例子放在 /data/jar/下
3)使用命令行工具加載函數(shù)到Pulsar,? ? ? ? ? ? ? ? ? ? ?
bin/pulsar-admin functions create \
--classname test.AppStrFunction \
--jar /data/jar/pf.jar \
--inputs persistent://public/default/tlstest \
--output persistent://public/default/teststr \
--tenant public \
--namespace default \
--name appStrFunction
參數(shù)說明:
參數(shù) | 說明 |
functions | 通知 pulsar broker,函數(shù)操作 |
create | 創(chuàng)建函數(shù),默認創(chuàng)建成功后啟動 |
classname | 函數(shù)類名稱,需要加上包名 |
jar | 指定 jar 包的運行路徑 |
inputs | 指定 函數(shù) 數(shù)據(jù)的來源在哪里,支持多個 topics 作為輸入 |
output | 如果該 函數(shù) 有輸出(有些情況下,function 沒有輸出),指定 function 輸出的 topic,只能有一個輸出 |
tenant | 指定該 函數(shù) 運行的租戶名 |
namespace | 指定該 函數(shù) 運行的命名空間 |
name | 指定該 函數(shù) 運行的名稱 |
停止函數(shù)
bin/pulsar-admin functions stop \
--tenant public \
--namespace default \
--name appStrFunction
啟動函數(shù)
bin/pulsar-admin functions start \
--tenant public \
--namespace default \
--name appStrFunction
刪除函數(shù)
bin/pulsar-admin functions delete \
--tenant public \
--namespace default \
--name appStrFunction
函數(shù)的日志在 pulsar安裝目錄 /logs/functions下
3 測試函數(shù)
根據(jù)前邊函數(shù)已成功加載啟動
1)向tlstest主題發(fā)送消息? ?
import?java.util.concurrent.TimeUnit; import?org.apache.pulsar.client.api.Producer; import?org.apache.pulsar.client.api.PulsarClient; import?org.apache.pulsar.client.api.Schema; public?class?SendMsgTest{ ??public?static?void?main(String[]?args){ ??????String?url="pulsar://192.168.1.48:6650"; ??try{ ?????PulsarClient?client?=PulsarClient.builder() ???????????.serviceUrl(url) ???????????.connectionTimeout(10,TimeUnit.SECONDS) ???????????.build(); ?????Producer<String>?producer=client.newProducer(Schema.STRING) ???????????.topic("tlstest") ???????????.sendTimeout(10,TimeUnit.SECONDS) ???????????.producerName("senduser") ???????????.create(); ???????????producer.send("this?is?a?book"); ???????????System.out.print("send?ok"); ???????????client.close(); ??????}catch(Exception?e){ ????????e.printStackTrace(); ??????} ??} }2)讀取teststr主題消息
import?org.apache.pulsar.client.api.Consumer; import?org.apache.pulsar.client.api.Message; import?org.apache.pulsar.client.api.PulsarClient; import?org.apache.pulsar.client.api.Schema; import?org.apache.pulsar.client.api.SubscriptionInitialPosition; import?org.apache.pulsar.client.api.SubscriptionType; import?org.apache.pulsar.client.impl.schema.JSONSchema; import?schema.OrderModel; import?com.alibaba.fastjson.JSON; public?class?RecFunTest?{ public?static?void?main(String[]?args)?{ String?url?=?"http://192.168.1.48:8080"; try{ ??PulsarClient?client?=PulsarClient.builder() ????.serviceUrl(url) ????.build(); ?Consumer<String>?consumer=client.newConsumer(Schema.STRING) ????.topic("teststr") ????.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) ????.subscriptionType(SubscriptionType.Exclusive)//訂閱模式??Exclusive(獨占,默認模式)?Failover(災備)Shared(共享) ????.subscriptionName("wbq")//訂閱者名稱 ????.subscribe(); ?while?(true)?{ ???Message<String>?mondmsg?=?consumer.receive(); ???String?msg=mondmsg.getValue(); ????????????????System.out.println("receive?message=:"+msg); ?????????????} ??}catch(Exception?e){ ?????e.printStackTrace(); ??} ?} }另外有需要云服務器可以了解下創(chuàng)新互聯(lián)cdcxhl.cn,海內(nèi)外云服務器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務器、裸金屬服務器、高防服務器、香港服務器、美國服務器、虛擬主機、免備案服務器”等云主機租用服務以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務可用性高、性價比高”等特點與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應用場景需求。
網(wǎng)站欄目:PulsarFunction例子-創(chuàng)新互聯(lián)
本文來源:http://muchs.cn/article16/cddggg.html
成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供手機網(wǎng)站建設、做網(wǎng)站、網(wǎng)站營銷、服務器托管、品牌網(wǎng)站設計、關鍵詞優(yōu)化
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容