PulsarIO

這篇文章將為大家詳細講解有關Pulsar IO,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。

在濱海新區(qū)等地區(qū),都構建了全面的區(qū)域性戰(zhàn)略布局,加強發(fā)展的系統(tǒng)性、市場前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務理念,為客戶提供成都網(wǎng)站制作、成都網(wǎng)站建設 網(wǎng)站設計制作按需制作網(wǎng)站,公司網(wǎng)站建設,企業(yè)網(wǎng)站建設,成都品牌網(wǎng)站建設,營銷型網(wǎng)站,成都外貿(mào)網(wǎng)站建設公司,濱海新區(qū)網(wǎng)站建設費用合理。

Apache Pulsar 是業(yè)界領先的消息系統(tǒng)。使用消息系統(tǒng)時,一個較為常見的問題就是:將數(shù)據(jù)移入或移出消息平臺的最佳方法是什么?

當然,用戶可以使用 Pulsar 的 consumer 和 producer API 編寫自定義代碼,來傳輸數(shù)據(jù)。但除此之外,是否還有其他方法呢?

以下為用戶提出的一些相關問題:

1. 要將數(shù)據(jù)發(fā)布到 Pulsar 或使用 Pulsar 中的數(shù)據(jù),我應該在哪里運行相應程序?

2. 要將數(shù)據(jù)發(fā)布到 Pulsar 或使用 Pulsar 中的數(shù)據(jù),我應該怎樣運行相應程序? 

用戶之所以會提出這些問題,是因為其他消息/發(fā)布-訂閱系統(tǒng)沒有提供有組織且容錯的方式來幫助用戶從外部系統(tǒng)輸入數(shù)據(jù)或?qū)?shù)據(jù)輸出到外部系統(tǒng),因而用戶需要尋求自定義解決方案并手動運行。

為了解決上述問題并簡化這一過程,我們推出了 Pulsar IO。

Pulsar IO 通過利用現(xiàn)有的 Pulsar Functions 框架來輸入/輸出數(shù)據(jù)。而 Pulsar Functions 框架的所有優(yōu)勢(如:容錯性、并行性、彈性、負載平衡、按需更新等)都可以直接被 Pulsar 輸入/輸出數(shù)據(jù)的應用程序所利用。

而且,我們發(fā)現(xiàn)經(jīng)常會出現(xiàn)這樣的情況,用戶花很大功夫(因為他們不是消息系統(tǒng)方面的專家,可能也不想成為這一領域的專家)去編寫自定義程序,用于從消息傳遞系統(tǒng)訪問數(shù)據(jù)。

自定義編寫這些應用程序不僅會很困難,而且我們發(fā)現(xiàn),許多用戶在嘗試實現(xiàn)執(zhí)行相同功能的應用程序時,做了相同的工作。歸根結底,消息系統(tǒng)只是用于移動數(shù)據(jù)的工具,因此,在設計 Pulsar IO 框架時,我們的主要目標之一就是易用性。

我們希望用戶能夠在不編寫任何代碼,也不用同時成為 Pulsar 和外部系統(tǒng)專家的情況下,可以從外部系統(tǒng)輸入數(shù)據(jù)或?qū)?shù)據(jù)輸出到外部系統(tǒng)。

Pulsar IO 框架是什么樣的?

首先,我們定義兩個應用程序,一個作為 source 將數(shù)據(jù)輸入到 Pulsar ,另一個作為 sink 從 Pulsar 接收數(shù)據(jù)。

Pulsar IO

Source 將數(shù)據(jù)從外部系統(tǒng)導入 Pulsar,而 sink 將數(shù)據(jù)從 Pulsar 導出到外部系統(tǒng)。具體來看,source 從外部系統(tǒng)讀取數(shù)據(jù),并將數(shù)據(jù)寫入 Pulsar topic,而 sink 從一個或多個 Pulsar topic 讀取數(shù)據(jù),并將數(shù)據(jù)寫入外部系統(tǒng)。

Pulsar IO 框架在現(xiàn)有的 Pulsar functions 框架上運行。單個 source 和 sink 可以像 function 一樣與 Pulsar broker 一起運行,如下圖所示。

Pulsar IO

因此,Pulsar Functions 框架的所有優(yōu)勢都適用于 Pulsar IO 框架,即 sink 和 source 應用程序。

正如前面提到的,我們的設計目標包括用戶無需編寫任何自定義應用程序,也無需編寫任何代碼就可以將數(shù)據(jù)移入或移出 Pulsar。

因此,Pulsar IO 框架中有多種內(nèi)置 source 和 sink(Kafka、Twitter Firehose、Cassandra、Aerospike 等,還會支持更多),用戶只需使用一個命令便可運行。用戶因此可以關注于業(yè)務邏輯,而無需擔心實現(xiàn)細節(jié)。



如何使用 Pulsar IO


使用 Pulsar IO 框架很容易。用戶可以在命令行界面使用一行簡單的命令啟動內(nèi)置 source 或 sink。例如,用戶可以用下面的命令來提交 source 到已有的 Pulsar 集群,命令格式如下:

$ ./bin/pulsar-admin source create \      --tenant <tenant> \      --namespace <namespace> \      --name <source-name> \      --destinationTopicName <input-topics> \      --source-type <source-type>

以下示例為運行 twitter firehose source 的命令,用于將 Twitter 中的數(shù)據(jù)導入 Pulsar:

$ ./bin/pulsar-admin source create \--tenant test \  --namespace ns1 \  --name twitter-source \  --destinationTopicName twitter_data \  --sourceConfigFile examples/twitter.yml \  --source-type twitter

經(jīng)過以上步驟,用戶即可向 Pulsar 輸入數(shù)據(jù),而無需編寫或編譯任何代碼。唯一可能需要的是一個配置文件,用于為該 source 或 sink 指定某些配置。用戶可以通過以下格式的命令向現(xiàn)有的 Pulsar 集群中提交待運行的內(nèi)置 sink:

$ ./bin/pulsar-admin sink create \     --tenant <tenant> \     --namespace <namespace> \     --name <sink-name> \     --inputs <input-topics> \     --sink-type <sink-type>

以下為運行 Cassandra sink 的示例命令,用于將數(shù)據(jù)從 Pulsar 導出到 Cassandra:

$ ./bin/pulsar-admin sink create \     --tenant public \     --namespace default \     --name cassandra-test-sink \     --sink-type cassandra \     --sinkConfigFile examples/cassandra-sink.yml \     --inputs test_cassandra

更多關于如何運行 Cassandra source 的信息,參閱快速入門指南:
https://pulsar.apache.org/docs/en/2.1.1-incubating/io-quickstart/

以上命令顯示了如何在“集群”模式下(即作為現(xiàn)有 Pulsar 集群的一部分)運行 source 和 sink。除此之外,還可以在“本地運行”模式下將 source 和 sink 作為獨立進程運行,這一模式會在機器上生成本地進程并且運行 source 或者 sink 的邏輯。

本地運行模式有助于測試和調(diào)試,但是,需要用戶自行監(jiān)控和監(jiān)督。以下為在本地運行模式下運行 source 的命令示例:

$ ./bin/pulsar-admin sink localrun \  --tenant public \     --namespace default \     --name cassandra-test-sink \     --sink-type cassandra \     --sinkConfigFile examples/cassandra-sink.yml \     --inputs test_cassandra

由于 Pulsar IO 框架在 Pulsar Functions 上運行,因此可以通過更新參數(shù)和配置來動態(tài)更新 source 或 sink。例如,當希望利用前面提到的 Twitter firehose source 將數(shù)據(jù)輸入到另一個 Pulsar topic 時,可以執(zhí)行以下命令:

$ ./bin/pulsar-admin source update \--tenant test \  --namespace ns1 \  --name twitter-source \  --destinationTopicName twitter_data_2 \  --sourceConfigFile examples/twitter.yml \  --source-type twitter

?

也可以使用同樣格式的命令更新 sink。大多數(shù) source 和 sink 的更新都可以在運行時進行配置,從而簡化修改、測試、部署等流程。

如果要自定義實現(xiàn)一個小眾的用例,則可以通過實現(xiàn)一個簡單的界面來創(chuàng)建 source 或 sink。但是,Pulsar IO 的目的是幫助用戶直接使用現(xiàn)有的內(nèi)置 source 或 sink,而不必自己手動實現(xiàn) source 或 sink。

???? 實現(xiàn)自定義 source

要創(chuàng)建自定義 source,用戶需要編寫一個實現(xiàn) source 接口的 Java 類:

public interface Source<T> extends AutoCloseable {/** * Open source with configuration * * @param config initialization config * @throws Exception IO type exceptions when opening a connector */    void open(final Map<String, Object> config) throws Exception;    /**     * Reads the next message from source.     * If source does not have any new messages, this call should block.     * @return next message from source.  The return result should never be null     * @throws Exception    */    Record<T> read() throws Exception;}

這是一個 source 實現(xiàn)的簡單示例:

public class TestSource implements Source<Integer> {    private int i = 0;    @Override    public void open(Map<String, Object> config) throws Exception {    }    @Override    public Record<Integer> read() throws Exception {       return () -> i++;    }    @Override    public void close() throws Exception {    }}

在上面的 source 示例中,單調(diào)遞增的整數(shù)被傳入到 Pulsar。

實現(xiàn) “Record” 接口的對象需要通過 “read” 方法返回,因為 “Record” 接口包含可用于實現(xiàn)不同消息傳遞語義或保證的字段,例如 exactly-once/effectively-once。在后續(xù)文章中,我將詳細討論如何執(zhí)行此操作。

???? 實現(xiàn)自定義 sink

要創(chuàng)建自定義 sink,用戶需要編寫一個實現(xiàn) sink 接口的 Java 類:

public interface Sink<T> extends AutoCloseable{    /**    * Open Sink with configuration    *    * @param config initialization config    * @throws Exception IO type exceptions when opening a connector    */   void open(final Map<String, Object> config) throws Exception;   /**    * Write a message to Sink    * @param inputRecordContext Context of value    * @param value value to write to sink    * @throws Exception    */   void write(RecordContext inputRecordContext, T value) throws Exception;}

例如,一個簡單的 sink 實現(xiàn):

public class TestSink implements Sink<String> {private static final String FILENAME = "/tmp/test-out";private BufferedWriter bw = null;private FileWriter fw = null;@Overridepublic void open(Map<String, Object> config) throws Exception {        File file = new File(FILENAME);// if file doesnt exists, then create itif (!file.exists()) {           file.createNewFile();        }        fw = new FileWriter(file.getAbsoluteFile(), true);        bw = new BufferedWriter(fw);    }@Overridepublic void write(RecordContext inputRecordContext, String value) throws Exception {try {            bw.write(value);            bw.flush();        } catch (IOException e) {throw new RuntimeException(e);        }    }@Overridepublic void close() throws Exception {try {if (bw != null)                bw.close();if (fw != null)                fw.close();        } catch (IOException ex) {            ex.printStackTrace();        }    }}

以上示例說明 sink 如何從 Pulsar 讀取數(shù)據(jù)并寫入文件。與 source 接口類似,sink 接口中的 “write” 方法有一個 RecordContext 參數(shù)。此參數(shù)為 sink 提供需要寫入外部系統(tǒng)的值的 context。

RecordContext 參數(shù)可用于實現(xiàn)能夠提供不同級別的消息傳遞語義或保證(如:Exactly-once/Effective-once)的 sink。在后續(xù)文章中,我們將對此進行更深入的討論。

用戶可以通過類似于運行內(nèi)置 source 和 sink 的方式來提交自定義 source 和 sink:

$ ./bin/pulsar-admin source create \  --className  <classname> \  --jar <jar-location> \  --tenant <tenant> \  --namespace <namespace> \  --name <source-name> \  --destinationTopicName <output-topic>

命令示例如下:

$ ./bin/pulsar-admin source create \  --className org.apache.pulsar.io.twitter.TwitterFireHose \  --jar \~/application.jar \  --tenant test \  --namespace ns1 \  --name twitter-source \  --destinationTopicName twitter_data

在現(xiàn)有 Pulsar 集群中提交待運行的自定義 sink 的命令格式如下:

$ ./bin/pulsar-admin sink create \--className  <classname> \--jar <jar-location> \--tenant test \--namespace <namespace> \--name <sink-name> \--inputs <input-topics>

命令示例:

 $ ./bin/pulsar-admin sink create \--className  org.apache.pulsar.io.cassandra \--jar \~/application.jar \--tenant test \--namespace ns1 \--name cassandra-sink \--inputs test_topic```



使用 Pulsar IO 框架的優(yōu)勢


如上所述,Pulsar IO 框架在現(xiàn)有的 Pulsar Functions 框架上運行。Pulsar IO 充分利用了現(xiàn)有的 Pulsar Functions 框架。作為 Pulsar IO 的組成部分,source 和 sink 擁有 Pulsar Functions 的所有優(yōu)勢:

Pulsar IO

關于Pulsar IO就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

分享標題:PulsarIO
URL網(wǎng)址:http://muchs.cn/article8/ihicip.html

成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供搜索引擎優(yōu)化、外貿(mào)建站、手機網(wǎng)站建設、移動網(wǎng)站建設、Google、軟件開發(fā)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

網(wǎng)站優(yōu)化排名