KafkaConsumer開發(fā)-創(chuàng)新互聯(lián)

Kafka Consumer - 消費者

跟生產(chǎn)者一樣,消費者也屬于kafka的客戶端,不過kafka消費者是從kafka讀取數(shù)據(jù)的應(yīng)用,側(cè)重于讀數(shù)據(jù)。一個或多個消費者訂閱kafka集群中的topic,并從broker接收topic消息,從而進行業(yè)務(wù)處理。今天來學(xué)習(xí)下kafka consumer基本使用。

創(chuàng)新互聯(lián)建站-專業(yè)網(wǎng)站定制、快速模板網(wǎng)站建設(shè)、高性價比鎮(zhèn)海網(wǎng)站開發(fā)、企業(yè)建站全套包干低至880元,成熟完善的模板庫,直接使用。一站式鎮(zhèn)海網(wǎng)站制作公司更省心,省錢,快速模板網(wǎng)站建設(shè)找我們,業(yè)務(wù)覆蓋鎮(zhèn)海地區(qū)。費用合理售后完善,十載實體公司更值得信賴。消費者example 組件版本
  • kafka_2.13-3.3.1
  • JDK17
  • apache-maven-3.6.0
Maven依賴
org.apache.kafkakafka-clients3.3.1
消費者代碼
public static void main(String[] args){String topicName = "consumer-topic";

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "app_01");
        props.put("enable.auto.commit", true);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,   StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumerconsumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topicName));
        try {while (true) {ConsumerRecordsrecords = consumer.poll(Duration.ofSeconds(1));
                records.forEach(record ->{System.out.println("Message received " + record.value());
                });
            }
        }finally {consumer.close();
        }
    }
測試驗證
  • 創(chuàng)建topic

    ./bin/kafka-topics.sh --create --topic consumer-topic --bootstrap-server localhost:9092
  • 啟動生產(chǎn)者 - 這里使用kafka自帶的生產(chǎn)者腳本進行測試

    ./bin/kafka-console-producer.sh --topic consumer-topic --bootstrap-server localhost:9092
  • 測試結(jié)果

    在這里插入圖片描述

至此 一個簡單的kafka消費者程序已經(jīng)開發(fā)完成,代碼不多,開發(fā)起來也快。但是關(guān)于kafka 消費者內(nèi)部有很多的原理、細(xì)節(jié)需要去梳理,否則出現(xiàn)問題就會茫然失措,不知所以。

pull VS poll

上面的消費者程序有一個很核心的細(xì)節(jié)需要關(guān)注,即kafka 消費者以什么的方式對數(shù)據(jù)進行消費。對比其他傳統(tǒng)的消息中間件,消息消費的方式主要有兩種:

  • 推送模式 - broker 主動推送消息給消費者
  • 拉取模式- 消費者主動從broker拉取消息

kafka在設(shè)計之處,就考慮這個問題:消費者從broker拉取數(shù)據(jù),還是broker主動推送數(shù)據(jù)給消費者。在這方面kafka采用更為傳統(tǒng)的設(shè)計:消費者主動拉取,其優(yōu)勢如下:

  • 拉取模式 可以根據(jù)消費者自身的消費能力對數(shù)據(jù)處理。如生產(chǎn)者大量數(shù)據(jù),消費者消費能力有限
  • 拉取模式 消費者可以根據(jù)實際情況對數(shù)據(jù)進行批量處理。推送模式很難做到這一點
  • broker被設(shè)計成無狀態(tài)模式,broker不需要對記錄每一個消費者的偏移量,由客戶端自己控制 便于kafka集群擴展
消息傳遞語義

在介紹消息傳遞語義之前,首先要了解下kafka 消費者位置(也叫做偏移量)管理。

位移管理

kafka 消費者端需要為每個讀取的topic 分區(qū)保存消費進度,即當(dāng)前分區(qū)中消費者消費消息的最新位置。該位置也叫做偏移量- offset。消費者需要定期地想kafka提交自己的位置信息,實際上,偏移量通常是下一條待消費消息的位置。如下圖

在這里插入圖片描述

從kafka broker讀取消息,開發(fā)者可以選擇提交偏移量的時間,消費者默認(rèn)自動提交偏移量,這可能會帶來一些風(fēng)險。

最多一次

在這種情況下,在調(diào)用poll()后,一旦收到消息批,就立即提交偏移量。如果后續(xù)處理失?。ㄈ鐦I(yè)務(wù)處理過程中發(fā)生異常,數(shù)據(jù)只是被從Broker讀取出來,并沒有真正的處理),消息將丟失。它不會被再次讀取,因為這些消息的偏移量已經(jīng)提交。

在這里插入圖片描述

  1. 批量拉取數(shù)據(jù)
  2. 消費者自動提交偏移量
  3. 對消息進行業(yè)務(wù)處理,如發(fā)送email,此時系統(tǒng)奔潰
  4. 系統(tǒng)重啟后,從上次已提交的偏移量進行讀取,在業(yè)務(wù)上造成消息丟失
至少一次

在至少一次語義定義中,broker消息的每一個消息都會被傳遞到消費者,但是可能會存在重復(fù)拉取的場景,從而導(dǎo)致消息被重復(fù)處理。跟最多一次提交位置偏移量的時機不同,至少一次在處理消息后提交偏移量。

因此需要確保消息處理的冪等性,如對數(shù)據(jù)進行插入、更新操作;防止重復(fù)消費導(dǎo)致數(shù)據(jù)出現(xiàn)錯亂。

至少一次消息處理的流程大致如下:

  1. 批量拉取數(shù)據(jù)

  2. 此時消費者并不提交偏移量

  3. 對消息進行業(yè)務(wù)處理

    3.1 處理完成 提交偏移量 進行下一次拉取數(shù)據(jù)

    3.2 消息處理失?。ù藭r可能有一部分?jǐn)?shù)據(jù)處理完成,還有一部分?jǐn)?shù)據(jù)尚未處理)

  4. 重啟應(yīng)用 拉取數(shù)據(jù),又會拉取之前的數(shù)據(jù) 導(dǎo)致消息被重復(fù)處理

精確一次

有些場景不僅需要至少一次語義(保證數(shù)據(jù)不丟失),還需要精確一次語義。每條消息只投遞一次,這需要消費者應(yīng)用程序跟kafka相互配合、相互合作就可以實現(xiàn)精確一次語義

  • 使用kafka事務(wù)API實現(xiàn)精確一次語義
  • 對于消費者應(yīng)用程序,要有效地實現(xiàn)一次,必須使用冪等性消費
位移配置
props.put("enable.auto.commit", true);

enable.auto.commit 參數(shù)默認(rèn)值為true,kafka默認(rèn)在后臺線程中周期性的提交消費者偏移量

auto.commit.interval.ms默認(rèn)為5秒,如果enable.auto.commit參數(shù)設(shè)置為true,即消費者5秒提交一次位移。

在至少一次、精確一次語義中 需要將該參數(shù)設(shè)置為false,由應(yīng)用程序手動提交偏移量

//...
props.put("enable.auto.commit", false);
//...
while (true) {ConsumerRecordsrecords = consumer.poll(Duration.ofSeconds(1));
  records.forEach(record ->{System.out.println("Message received " + record.value());
  });
  //提交偏移量
  consumer.commitSync();
}

根據(jù)不用的應(yīng)用場景,kakfa提供了多個API讓開發(fā)者對消費者位移進行手動管理

在這里插入圖片描述

auto.offset.reset

指定消費者從broker拉取數(shù)據(jù)的位置,有以下幾個選項可以配置

  • earliest - 從最開始進行消費
  • latest - 從最后消費的偏移量進行消費 默認(rèn)值
  • none - 如果未找到使用者組的先前偏移量,則向使用者拋出異常
消費者組
props.put(ConsumerConfig.GROUP_ID_CONFIG, "app_01");

在開發(fā)kafka消費者代碼時,必須指定消費者組,否則會報錯,那么該參數(shù)有什么作用呢。在回答這個問題之前,先假設(shè)兩個應(yīng)用場景

  • kafka中消息特別多,需要增加消費者加快消息處理的速度,避免出現(xiàn)消息堆積
  • 某一類消息特別重要,需要被多個應(yīng)用程序同時消費 - 如購買商品的消息,需要被庫存應(yīng)用、積分應(yīng)用同時消費

借用RocketMQ中的概念(個人覺得比較合適),以上兩種應(yīng)用場景叫做集群消費、廣播消費

  • 集群消費 - 多個消費者共同消費某一個主題內(nèi)的消息
  • 廣播消費 - 每一個消息被多個消費者同時消費

kafka 內(nèi)部以消費者組的方式實現(xiàn)以上兩點要求

  • 同一個消費組的不同消費實例 共同消費topiic的消息
  • 同一個消息被不同的消費組同時消費

在開發(fā)代碼時,只需要按需更改一下配置即可

props.put(ConsumerConfig.GROUP_ID_CONFIG, "app_B");
props.put("client.id", "client_02");

你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機房具備T級流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級服務(wù)器適合批量采購,新人活動首月15元起,快前往官網(wǎng)查看詳情吧

分享標(biāo)題:KafkaConsumer開發(fā)-創(chuàng)新互聯(lián)
地址分享:http://muchs.cn/article42/pshec.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供微信公眾號ChatGPT、品牌網(wǎng)站制作、商城網(wǎng)站、網(wǎng)站營銷小程序開發(fā)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

網(wǎng)站托管運營