如何解析Kafka消息丟失與消費(fèi)精確一次性

今天就跟大家聊聊有關(guān)如何解析Kafka 消息丟失與消費(fèi)精確一次性,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。

專注于為中小企業(yè)提供成都網(wǎng)站建設(shè)、成都網(wǎng)站制作服務(wù),電腦端+手機(jī)端+微信端的三站合一,更高效的管理,為中小企業(yè)賓川免費(fèi)做網(wǎng)站提供優(yōu)質(zhì)的服務(wù)。我們立足成都,凝聚了一批互聯(lián)網(wǎng)行業(yè)人才,有力地推動(dòng)了數(shù)千家企業(yè)的穩(wěn)健成長(zhǎng),幫助中小企業(yè)通過(guò)網(wǎng)站建設(shè)實(shí)現(xiàn)規(guī)模擴(kuò)充和轉(zhuǎn)變。

消息丟失的場(chǎng)景

如果Kafka Producer使用“發(fā)后即忘”的方式發(fā)送消息,即調(diào)用producer.send(msg)方法來(lái)發(fā)送消息,方法會(huì)立即返回,但此時(shí)并不能說(shuō)明消息已經(jīng)發(fā)送成功。消息發(fā)送方式詳見初次邂逅Kafka生產(chǎn)者。

如果在消息過(guò)程中發(fā)生了網(wǎng)絡(luò)抖動(dòng),那么消息就會(huì)丟失;或發(fā)送的消息本身不符合要求,如大小超過(guò)Broker端的承受能力等(消息太大的情況在生產(chǎn)中實(shí)際遇到過(guò),最后通過(guò)在發(fā)送前將消息分包,再依次發(fā)送,解決了該問(wèn)題)。

解決該問(wèn)題的方法就是:Producer要使用帶回調(diào)通知的方法發(fā)送消息,即producer.send(msg, callback)?;卣{(diào)方法callback可以告訴我們消息是否真的提交成功了,一旦出現(xiàn)消息發(fā)送失敗的情況,可以使用代碼進(jìn)行容錯(cuò)及補(bǔ)救。

例如:網(wǎng)絡(luò)抖動(dòng)導(dǎo)致的消息丟失,可以使Producer重試;消息不合格,則將消息格式進(jìn)行調(diào)整,再發(fā)送。Producer使用帶回調(diào)的消息發(fā)送API,可以及時(shí)發(fā)現(xiàn)消息是否發(fā)送失敗并作相應(yīng)處理。

消費(fèi)者丟失數(shù)據(jù)

Consumer端丟失數(shù)據(jù)主要體現(xiàn)在:拉取了消息,并提交了消費(fèi)位移,但是在消息處理結(jié)束之前突然發(fā)生了宕機(jī)等故障。消費(fèi)者重生后,會(huì)從之前已提交的位移的下一個(gè)位置重新開始消費(fèi),之前未處理完成的消息不會(huì)再次處理,即相當(dāng)于消費(fèi)者丟失了消息。

解決Consumer端丟失消息的方法也很簡(jiǎn)單:將位移提交的時(shí)機(jī)改為消息處理完成后,確認(rèn)消費(fèi)完成了一批消息再提交相應(yīng)的位移。這樣做,即使處理消息的過(guò)程中發(fā)生了異常,由于沒有提交位移,下次消費(fèi)時(shí)還會(huì)從上次的位移處重新拉取消息,不會(huì)發(fā)生消息丟失的情況。

具體的實(shí)現(xiàn)方法為,Consumer在消費(fèi)消息時(shí),關(guān)閉自動(dòng)提交位移,由應(yīng)用程序手動(dòng)提交位移。

Broker端丟失數(shù)據(jù)

Broker端丟失數(shù)據(jù)主要有以下幾種情況:

原來(lái)的Broker宕機(jī)了,卻選舉了一個(gè)落后Leader太多的Broker成為新的Leader,那么落后的這些消息就都丟失了,可以禁止這些“unclean”的Broker競(jìng)選成為L(zhǎng)eader;

Kafka使用頁(yè)緩存機(jī)制,將消息寫入頁(yè)緩存而非直接持久化至磁盤,將刷盤工作交由操作系統(tǒng)來(lái)調(diào)度,以此來(lái)保證高效率和高吞吐量。如果某一部分消息還在內(nèi)存頁(yè)中,未持久化至磁盤,此時(shí)Broker宕機(jī),重啟后則這部分消息丟失,使用多副本機(jī)制可以避免Broker端丟失消息;

避免消息丟失的最佳實(shí)踐

不使用producer.send(msg),而使用帶回調(diào)的producer.send(msg, callback)方法;

設(shè)置acks = all。acks參數(shù)是Producer的一個(gè)參數(shù),代表了對(duì)消息“已提交”的定義。如果設(shè)置成all,則表示所有的Broker副本都要接收到消息,才算消息“已提交”,是最高等級(jí)的“已提交”標(biāo)準(zhǔn);

設(shè)置retries為一個(gè)較大的值,retries表示Producer發(fā)送消息失敗后的重試次數(shù),如果發(fā)生了網(wǎng)絡(luò)抖動(dòng)等瞬時(shí)故障,可以通過(guò)重試機(jī)制重新發(fā)送消息,避免消息丟失;

設(shè)置unclean.leader.election.enable = false。這是一個(gè)Broker端參數(shù),表示哪些Broker有資格競(jìng)選為分區(qū)的Leader。如果一個(gè)落后Leader太多的Follower所在Broker成為了新的Leader,則必然會(huì)導(dǎo)致消息的丟失,故將該參數(shù)設(shè)置為false,即不允許這種情況的發(fā)生;

設(shè)置replication.factor >= 3。Broker端參數(shù),表示每個(gè)分區(qū)的副本數(shù)大于等于3,使用冗余的機(jī)制來(lái)防止消息丟失;

設(shè)置min.insync.replicas > 1。Broker端參數(shù),控制的是消息至少被寫入多少個(gè)副本蔡栓是“已提交”,將該參數(shù)設(shè)置成大于1可以提升消息持久性;

確保replication.factor > min.insync.replicas。若兩者相等,則如果有一個(gè)副本掛了,整個(gè)分區(qū)就無(wú)法正常工作了。推薦設(shè)置為:replication.factor = min.insync.replicas + 1;

確保消息消費(fèi)完再提交位移,將Consumer端參數(shù)enable.auto.commit設(shè)置為fasle,關(guān)閉位移自動(dòng)提交,使用手動(dòng)提交位移的形式。

精確一次消費(fèi)

目前Kafka默認(rèn)提供的消息可靠機(jī)制是“至少一次”,即消息不會(huì)丟失。上一節(jié)中我們知道,Producer如果發(fā)送消息失敗,則可以通過(guò)重試解決,若Broker端的應(yīng)答未成功發(fā)送給Producer(如網(wǎng)絡(luò)抖動(dòng)),Producer此時(shí)也會(huì)進(jìn)行重試,再次發(fā)送原來(lái)的消息。這就是Kafka默認(rèn)提供消息至少一次性的原因,不過(guò)這可能會(huì)導(dǎo)致消息重復(fù)發(fā)送。

如果需要保證消息消費(fèi)的“最多一次”,那么禁止Producer的重試即可。但是寫入失敗的消息如果不重試則會(huì)永遠(yuǎn)丟失。是否有其他方法來(lái)保證消息的發(fā)送既不丟失,也不重復(fù)消費(fèi)?或者說(shuō)即使Producer重復(fù)發(fā)送了某些消息,Broker端也能夠自動(dòng)去重。

Kafka實(shí)際上通過(guò)兩種機(jī)制來(lái)確保消息消費(fèi)的精確一次:

冪等性(Idempotence)

事務(wù)(Transaction)

冪等性

所謂的冪等,簡(jiǎn)單說(shuō)就是對(duì)接口的多次調(diào)用所產(chǎn)生的結(jié)果和調(diào)用一次是一致的。在Kafka中,Producer默認(rèn)不是冪等性的,Kafka于0.11.0.0版本引入該特性。設(shè)置參數(shù)enable.idempotence為true即可指定Producer的冪等性。開啟冪等生產(chǎn)者后,Kafka會(huì)自動(dòng)進(jìn)行消息的去重發(fā)送。為了實(shí)現(xiàn)生產(chǎn)者的冪等性,Kafka引入了producer id(后簡(jiǎn)稱PID)和序列號(hào)(sequence number)兩個(gè)概念。

生產(chǎn)者實(shí)例在被創(chuàng)建的時(shí)候,會(huì)分配一個(gè)PID,這個(gè)PID對(duì)用戶完全透明。對(duì)于每個(gè)PID,消息發(fā)送到的每一個(gè)分區(qū)都有對(duì)應(yīng)的序列號(hào),這些序列號(hào)從0開始單調(diào)遞增。生產(chǎn)者每發(fā)送一條消息就會(huì)將**<PID, 分區(qū)>**對(duì)應(yīng)的序列號(hào)值加1。

Broker端在內(nèi)存中為每一對(duì)<PID, 分區(qū)>維護(hù)一個(gè)序列號(hào)SN_old。針對(duì)生產(chǎn)者發(fā)送來(lái)的每一條消息,對(duì)其序列號(hào)SN_new進(jìn)行判斷,并作相應(yīng)處理。

只有SN_new比SN_old大1時(shí),即SN_new = SN_old + 1時(shí),broker才會(huì)接受這條消息;

SN_new < SN_old + 1,說(shuō)明消息被重復(fù)寫入,broker直接丟棄該條消息;

SN_new > SN_old + 1,說(shuō)明中間有數(shù)據(jù)尚未寫入,出現(xiàn)了消息亂序,可能存在消息丟失的現(xiàn)象,對(duì)應(yīng)的生產(chǎn)者會(huì)拋出OutOfOrderSequenceException。

注意:序列號(hào)針對(duì)<PID, 分區(qū)>,這意味著冪等生產(chǎn)者只能保證單個(gè)主題的單一分區(qū)內(nèi)消息不重復(fù);其次,它只能實(shí)現(xiàn)單會(huì)話上的冪等性,不能實(shí)現(xiàn)跨會(huì)話的冪等性,這里的會(huì)話即可以理解為:Producer進(jìn)程的一次運(yùn)行。當(dāng)重啟了Producer進(jìn)程之后,則冪等性保證就失效了。

事務(wù)

冪等性并不能跨多個(gè)分區(qū)運(yùn)作,而Kafka事務(wù)則可以彌補(bǔ)這個(gè)缺陷。Kafka從0.11版本開始提供了對(duì)事務(wù)的支持,主要在read committed隔離級(jí)別。它能保證多條消息原子性地寫入到目標(biāo)分區(qū),同時(shí)也能寶恒Consumer只能看到事務(wù)成功提交的消息。

Producer端配置

事務(wù)型Producer能保證消息原子性地寫入多個(gè)分區(qū)。批量的消息要么全部寫入成功,要么全部失敗。并且,事務(wù)型Producer在重啟后,Kafka依然保證它們發(fā)送消息的精確一次處理。開啟事務(wù)型Producer的配置如下:

和冪等性Producer一樣,開啟enable.idempotence = true。

設(shè)置Producer端參數(shù)transcational.id。最好為其設(shè)置一個(gè)有意義的名字。

設(shè)置了事務(wù)型的Producer可以調(diào)用一些事務(wù)API,如下:initTransaction、beginTransaction、commitTransaction和abortTransaction,分別對(duì)應(yīng)事務(wù)的初始化、事務(wù)開啟、事務(wù)提交和事務(wù)終止。

producer.initTransactions();
try {
 producer.beginTransaction();
  producer.send(record1);
  producer.send(record2);
  producer.commitTransaction();
  }
 catch (KafkaExecption e) {
 producer.abortTransaction();
 }

上述代碼中,事務(wù)型Producer可以保證record1和record2要么全部提交成功,要么全部寫入失敗。實(shí)際上,即使寫入失敗,Kafka也會(huì)將它們寫入到底層的日志中,也就是說(shuō)Consumer還是會(huì)看到這些消息,具體Consumer端讀取事務(wù)型Producer發(fā)送的消息需要另行配置。

Consumer端配置

讀取事務(wù)型Producer發(fā)送的消息時(shí),Consumer端的isolation.level參數(shù)表征著事務(wù)的隔離級(jí)別,即決定了Consumer以怎樣的級(jí)別去讀取消息。該參數(shù)有以下兩個(gè)取值: read_uncommitted:默認(rèn)值,表面Consumer能夠讀到Kafka寫入的任何消息,不論事務(wù)型Producer是否正常提交了事務(wù)。顯然,如果啟用了事務(wù)型的Producer,則Consumer端參數(shù)就不要使用該值,否則事務(wù)是無(wú)效的。 read_committed:表面Consumer只會(huì)讀取事務(wù)型Producer成功提交的事務(wù)中寫入的消息,同時(shí),非事務(wù)型Producer寫入的所有消息對(duì)Consumer也是可見的。

看完上述內(nèi)容,你們對(duì)如何解析Kafka 消息丟失與消費(fèi)精確一次性有進(jìn)一步的了解嗎?如果還想了解更多知識(shí)或者相關(guān)內(nèi)容,請(qǐng)關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝大家的支持。

網(wǎng)站題目:如何解析Kafka消息丟失與消費(fèi)精確一次性
URL網(wǎng)址:http://muchs.cn/article20/ihceco.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供建站公司、網(wǎng)站導(dǎo)航、定制開發(fā)電子商務(wù)、營(yíng)銷型網(wǎng)站建設(shè)、網(wǎng)頁(yè)設(shè)計(jì)公司

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

成都網(wǎng)頁(yè)設(shè)計(jì)公司