如何實(shí)現(xiàn)TDMQ中的Pulsar廣播

如何實(shí)現(xiàn)TDMQ中的Pulsar 廣播,很多新手對(duì)此不是很清楚,為了幫助大家解決這個(gè)難題,下面小編將為大家詳細(xì)講解,有這方面需求的人可以來(lái)學(xué)習(xí)下,希望你能有所收獲。

10年積累的網(wǎng)站建設(shè)、成都做網(wǎng)站經(jīng)驗(yàn),可以快速應(yīng)對(duì)客戶對(duì)網(wǎng)站的新想法和需求。提供各種問(wèn)題對(duì)應(yīng)的解決方案。讓選擇我們的客戶得到更好、更有力的網(wǎng)絡(luò)服務(wù)。我雖然不認(rèn)識(shí)你,你也不認(rèn)識(shí)我。但先網(wǎng)站制作后付款的網(wǎng)站建設(shè)流程,更有汪清免費(fèi)網(wǎng)站建設(shè)讓你可以放心的選擇與我們合作。

Pulsar 作為 Apache 社區(qū)的相對(duì)新的成員,在業(yè)界受到非常大量的關(guān)注。新產(chǎn)品的文檔相對(duì)不齊全也是非常能夠理解的。今天客戶問(wèn)過(guò)來(lái)廣播怎么實(shí)現(xiàn)的,我解釋了半天,又找了很多介紹產(chǎn)品的 PPT,最終也沒(méi)有找到“官方”的文檔說(shuō)明這個(gè)事情。于是我就寫了這篇文章,方便大家 copy/paste 。

Pulsar訂閱模型分類

Pulsar 支持的幾種模式如下,依次是 獨(dú)占模式 / 高可用模式 / 分享模式 / 基于鍵值 的分享模式。

如何實(shí)現(xiàn)TDMQ中的Pulsar 廣播  

 

Pulsar 廣播模式

Pulsar 的訂閱模式和很多 MQ 不太一樣。比如 RabbitMQ/Kafka 等,一般消費(fèi)端(Consumer)是直接去對(duì)接 Topic 的,然后 Consumer 自己又有個(gè)組的概念在配置中心去設(shè)置 offset,以此來(lái)決定是一起分享 Topic 的數(shù)據(jù),還是每個(gè)人都接收同樣的數(shù)據(jù)。在 Pulsar 的消費(fèi)訂閱模型里,添加了一個(gè) Subscription 的邏輯,Subscription 的 Type 決定了消費(fèi)是獨(dú)享還是分享。

于是廣播模式可以用不同 Subscription 獨(dú)享的模式來(lái)實(shí)現(xiàn),具體架構(gòu)可以參照下圖:

如何實(shí)現(xiàn)TDMQ中的Pulsar 廣播  

 

代碼實(shí)現(xiàn)

1. Full-mesh 的形創(chuàng)建 Java 項(xiàng)目(比如:Springboot - 這個(gè)應(yīng)該是相對(duì)簡(jiǎn)單的 IDE 集成開發(fā)組件)

畫重點(diǎn)

  • pulsar-client-api 和 tdmq-client 需要2.6.0
  • tdmq-client 需要在騰訊的repo里才能拿到,需要使用介紹鏈接介紹的方式進(jìn)行maven的配置(gradle方法類似)
  • 介紹鏈接:https://cloud.tencent.com/document/product/1179/44914

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">  <modelVersion>4.0.0</modelVersion>  <parent>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-parent</artifactId>    <version>2.4.3</version>    <relativePath /> <!-- lookup parent from repository -->  </parent>  <groupId>com.examble.demo</groupId>  <artifactId>tdmq-demo</artifactId>  <version>0.0.1-SNAPSHOT</version>  <name>tdmq-demo</name>  <description>demo project to test tdmq</description>  <properties>    <java.version>1.8</java.version>  </properties>  <dependencies>    <dependency>      <groupId>org.springframework.boot</groupId>      <artifactId>spring-boot-starter-web</artifactId>    </dependency>    <dependency>      <groupId>com.tencent.tdmq</groupId>      <artifactId>tdmq-client</artifactId>      <version>2.6.0</version>    </dependency>    <!-- https://mvnrepository.com/artifact/org.apache.pulsar/pulsar-client-api -->    <dependency>      <groupId>org.apache.pulsar</groupId>      <artifactId>pulsar-client-api</artifactId>      <version>2.6.0</version>    </dependency>    <dependency>      <groupId>org.springframework.boot</groupId>      <artifactId>spring-boot-starter-test</artifactId>      <scope>test</scope>    </dependency>  </dependencies>
 <build>    <plugins>      <plugin>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-maven-plugin</artifactId>      </plugin>    </plugins>  </build>
</project>
 

2. 創(chuàng)建一個(gè) Component 用來(lái)全局使用 Producer 和 Consumers

這里創(chuàng)建了1個(gè) Producer 和3個(gè)擁有 exclusive subscription 的 consumers(廣播模式 - 我們期待他們3個(gè)每次都收到一樣的信息)

package com.example.demo.tdmq.instance;
import javax.annotation.PostConstruct;
import org.apache.pulsar.client.api.AuthenticationFactory;import org.apache.pulsar.client.api.Consumer;import org.apache.pulsar.client.api.Message;import org.apache.pulsar.client.api.MessageListener;import org.apache.pulsar.client.api.Producer;import org.apache.pulsar.client.api.PulsarClient;import org.apache.pulsar.client.api.PulsarClientException;import org.apache.pulsar.client.api.SubscriptionType;import org.springframework.beans.factory.config.ConfigurableBeanFactory;import org.springframework.context.annotation.Scope;import org.springframework.stereotype.Component;
@Component@Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)public class Global {  PulsarClient client;  public Producer<byte[]> producer;  public Consumer<byte[]> consumer01;  public Consumer<byte[]> consumer02;  public Consumer<byte[]> consumer03;
 public Global() {
 }
 @PostConstruct  public void init() {    try {      client = PulsarClient.builder().serviceUrl("pulsar://<Your TDMQ Pulsar Service URL>:6000/")          .listenerName("custom:<TDMQ Pulsar Instance ID>/<TDMQ VPC ID>/<TDMQ Subnet ID>")          .authentication(AuthenticationFactory.token(              "<Your Credential Token from TDMQ>"))          .build();      producer = client.newProducer().topic("persistent://<TDMQ Pulsar Instance ID>/<your name space>/<your topic>").create();      consumer01 = client.newConsumer().subscriptionType(SubscriptionType.Exclusive)          .topic("persistent://<TDMQ Pulsar Instance ID>/<your name space>/<your topic>")          .messageListener(new MessageListener<byte[]>() {
           /**             *             */            private static final long serialVersionUID = 1L;
           @Override            public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {              System.out.println("Consumer01" + " - " + System.currentTimeMillis() + " - "                  + new String(msg.getData()));              try {                consumer.acknowledge(msg);              } catch (PulsarClientException e) {                // TODO Auto-generated catch block                e.printStackTrace();              }
           }          }).subscriptionName("my-subscription01").subscribe();      consumer02 = client.newConsumer().subscriptionType(SubscriptionType.Exclusive)          .topic("persistent://<TDMQ Pulsar Instance ID>/<your name space>/<your topic>")          .messageListener(new MessageListener<byte[]>() {
           /**             *             */            private static final long serialVersionUID = 1L;
           @Override            public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {              System.out.println("Consumer02" + " - " + System.currentTimeMillis() + " - "                  + new String(msg.getData()));              try {                consumer.acknowledge(msg);              } catch (PulsarClientException e) {                // TODO Auto-generated catch block                e.printStackTrace();              }
           }          }).subscriptionName("my-subscription02").subscribe();      consumer03 = client.newConsumer().subscriptionType(SubscriptionType.Exclusive)          .topic("persistent://<TDMQ Pulsar Instance ID>/<your name space>/<your topic>")          .messageListener(new MessageListener<byte[]>() {
           /**             *             */            private static final long serialVersionUID = 1L;
           @Override            public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {              System.out.println("Consumer03" + " - " + System.currentTimeMillis() + " - "                  + new String(msg.getData()));              try {                consumer.acknowledge(msg);              } catch (PulsarClientException e) {                // TODO Auto-generated catch block                e.printStackTrace();              }
           }          }).subscriptionName("my-subscription03").subscribe();
   } catch (PulsarClientException e) {      // TODO Auto-generated catch block      e.printStackTrace();    }  }
}
 

3. 最外層的測(cè)試代碼和簡(jiǎn)單的 Message 模型

public class MessageModel {
 private String messageText = null;
 public String getMessageText() {    return messageText;  }
 public void setMessageText(String messageText) {    this.messageText = messageText;  }}
 

跑起來(lái)測(cè)試一下,果然3個(gè)一起接收一樣的消息

如何實(shí)現(xiàn)TDMQ中的Pulsar 廣播    

看完上述內(nèi)容是否對(duì)您有幫助呢?如果還想對(duì)相關(guān)知識(shí)有進(jìn)一步的了解或閱讀更多相關(guān)文章,請(qǐng)關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝您對(duì)創(chuàng)新互聯(lián)的支持。

新聞標(biāo)題:如何實(shí)現(xiàn)TDMQ中的Pulsar廣播
URL鏈接:http://muchs.cn/article26/pdgejg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站導(dǎo)航、面包屑導(dǎo)航、企業(yè)網(wǎng)站制作、品牌網(wǎng)站設(shè)計(jì)、、手機(jī)網(wǎng)站建設(shè)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

成都做網(wǎng)站