怎么在Spring Boot中使用KafkaAdminClient集群管理工具?相信很多沒有經(jīng)驗的人對此束手無策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個問題。
創(chuàng)新互聯(lián)是少有的網(wǎng)站設(shè)計制作、成都網(wǎng)站設(shè)計、營銷型企業(yè)網(wǎng)站、小程序開發(fā)、手機APP,開發(fā)、制作、設(shè)計、賣友情鏈接、推廣優(yōu)化一站式服務(wù)網(wǎng)絡(luò)公司,自2013年起,堅持透明化,價格低,無套路經(jīng)營理念。讓網(wǎng)頁驚喜每一位訪客多年來深受用戶好評
原理介紹
在Kafka官網(wǎng)中這么描述AdminClient:The AdminClient API supports managing and inspecting topics, brokers, acls, and other Kafka objects. 具體的KafkaAdminClient包含了一下幾種功能(以Kafka1.0.0版本為準):
創(chuàng)建Topic:createTopics(Collection<NewTopic> newTopics)
刪除Topic:deleteTopics(Collection<String> topics)
羅列所有Topic:listTopics()
查詢Topic:describeTopics(Collection<String> topicNames)
查詢集群信息:describeCluster()
查詢ACL信息:describeAcls(AclBindingFilter filter)
創(chuàng)建ACL信息:createAcls(Collection<AclBinding> acls)
刪除ACL信息:deleteAcls(Collection<AclBindingFilter> filters)
查詢配置信息:describeConfigs(Collection<ConfigResource> resources)
修改配置信息:alterConfigs(Map<ConfigResource, Config> configs)
修改副本的日志目錄:alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment)
查詢節(jié)點的日志目錄信息:describeLogDirs(Collection<Integer> brokers)
查詢副本的日志目錄信息:describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas)
增加分區(qū):createPartitions(Map<String, NewPartitions> newPartitions)
其內(nèi)部原理是使用Kafka自定義的一套二進制協(xié)議來實現(xiàn),詳細可以參見Kafka協(xié)議。主要實現(xiàn)步驟:
客戶端根據(jù)方法的調(diào)用創(chuàng)建相應(yīng)的協(xié)議請求,比如創(chuàng)建Topic的createTopics方法,其內(nèi)部就是發(fā)送CreateTopicRequest請求。
客戶端發(fā)送請求至Kafka Broker。
Kafka Broker處理相應(yīng)的請求并回執(zhí),比如與CreateTopicRequest對應(yīng)的是CreateTopicResponse。
客戶端接收相應(yīng)的回執(zhí)并進行解析處理。
和協(xié)議有關(guān)的請求和回執(zhí)的類基本都在org.apache.kafka.common.requests包中,AbstractRequest和AbstractResponse是這些請求和回執(zhí)類的兩個基本父類。
代碼如下
@Component public class KafkaConfig{ // 配置Kafka public Properties getProps(){ Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); /* props.put("retries", 2); // 重試次數(shù) props.put("batch.size", 16384); // 批量發(fā)送大小 props.put("buffer.memory", 33554432); // 緩存大小,根據(jù)本機內(nèi)存大小配置 props.put("linger.ms", 1000); // 發(fā)送頻率,滿足任務(wù)一個條件發(fā)送*/ props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); return props; } }
@RestController public class KafkaTopicManager { @Autowired private KafkaConfig kafkaConfig; @GetMapping("createTopic") public void createTopic(){ AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps()); NewTopic newTopic = new NewTopic("test1",4, (short) 1); Collection<NewTopic> newTopicList = new ArrayList<>(); newTopicList.add(newTopic); adminClient.createTopics(newTopicList); adminClient.close(); } @GetMapping("deleteTopic") public void deleteTopic(){ AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps()); adminClient.deleteTopics(Arrays.asList("test1")); adminClient.close(); } @GetMapping("listAllTopic") public void listAllTopic(){ AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps()); ListTopicsResult result = adminClient.listTopics(); KafkaFuture<Set<String>> names = result.names(); try { names.get().forEach((k)->{ System.out.println(k); }); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } adminClient.close(); } @GetMapping("getTopic") public void getTopic(){ AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps()); DescribeTopicsResult describeTopics = adminClient.describeTopics(Arrays.asList("syn-test")); Collection<KafkaFuture<TopicDescription>> values = describeTopics.values().values(); if(values.isEmpty()){ System.out.println("找不到描述信息"); }else{ for (KafkaFuture<TopicDescription> value : values) { System.out.println(value); } } adminClient.close(); } }
看完上述內(nèi)容,你們掌握怎么在Spring Boot中使用KafkaAdminClient集群管理工具的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝各位的閱讀!
文章名稱:怎么在SpringBoot中使用KafkaAdminClient集群管理工具
文章源于:http://muchs.cn/article8/pihsop.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站營銷、動態(tài)網(wǎng)站、虛擬主機、響應(yīng)式網(wǎng)站、外貿(mào)建站、定制網(wǎng)站
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)