一次KAFKA消費(fèi)者異常引起的思考

問(wèn)題描述:

線(xiàn)上出現(xiàn)一臺(tái)服務(wù)器特別慢,于是關(guān)閉了服務(wù)器上的kafka broker. 關(guān)閉后發(fā)現(xiàn)一些kafka consumer無(wú)法正常消費(fèi)數(shù)據(jù)了, 日志錯(cuò)誤:
o.a.kakfa.clients.consumer.internals.AbstractCordinator Marking the coordinator (39.0.2.100) as dead.

網(wǎng)站建設(shè)哪家好,找創(chuàng)新互聯(lián)!專(zhuān)注于網(wǎng)頁(yè)設(shè)計(jì)、網(wǎng)站建設(shè)、微信開(kāi)發(fā)、小程序定制開(kāi)發(fā)、集團(tuán)企業(yè)網(wǎng)站建設(shè)等服務(wù)項(xiàng)目。為回饋新老客戶(hù)創(chuàng)新互聯(lián)還提供了眉縣免費(fèi)建站歡迎大家使用!

原因:

經(jīng)過(guò)一番排查,發(fā)現(xiàn)consumer group信息:
(kafka.coordinator.GroupMetadataMessageFormatter類(lèi)型):
groupId::[groupId,Some(consumer),groupState,Map(memberId -> [memberId,clientId,clientHost,sessionTimeoutMs], ...->[]...)],
存到了KAFKA內(nèi)部topic: __consumer_offsets里, , 它的key是 groupId.
同時(shí)發(fā)現(xiàn)broker 參數(shù) offsets.topic.replication.factor 錯(cuò)誤地被設(shè)置為1. 這個(gè)參數(shù)表示TOPIC: __Consumer_offsets 的副本數(shù). 這樣一旦某個(gè)broker被關(guān)閉, 如果被關(guān)閉的Broker 是__Consumer_offsets的某些partition的Leader. 則導(dǎo)致某些consumer group 不可用. 如果一旦broker已經(jīng)啟動(dòng), 需要手工通過(guò)命令行來(lái)擴(kuò)展副本數(shù).

reassignment.json:
{"version":1,
 "partitions": [{"topic": "xxx", "partition": 0, "replicas": {brokerId1, brokerId2}}]
}
kafka-reassign-partitions  --zookeeper localhost:2818 --reassignment-json-file  reassignment.json --execute

客戶(hù)端尋找Consumer Coordinator的過(guò)程:
客戶(hù)端 org.apache.kafka.clients.consumer.internals.AbstractCoordinator
如果Coordinator 未知 (AbstractCoordinator.coordinatorUnknown()), 發(fā)起請(qǐng)求 lookupCoordinator,向負(fù)載最低的節(jié)點(diǎn)發(fā)送FindCoordinatorRequest

服務(wù)端 KafkaApis.handleFindCoordinatorRequest 接收請(qǐng)求:
首先調(diào)用 GroupMetaManager.partitionFor(consumerGroupId) consunerGroupId 的 hashCode 對(duì) __consumer_offsets 總的分片數(shù)取模獲取partition id 再?gòu)?__consumer_offset 這個(gè)Topic 中找到partition對(duì)應(yīng)的 Partition Metadata, 并且獲取對(duì)應(yīng)的Partition leader 返回給客戶(hù)端

引伸思考

KAFKA 的failover機(jī)制究竟是怎么樣的?假使 __consumer_offset 設(shè)置了正確的副本數(shù),重選舉的過(guò)程是怎樣的. 如果broker宕機(jī)后導(dǎo)致某些副本不可用, 副本會(huì)自動(dòng)遷移到其他節(jié)點(diǎn)嗎?帶著這些問(wèn)題稍微閱讀了一下KAFKA的相關(guān)代碼:

當(dāng)一個(gè)Broker 被關(guān)掉時(shí), 會(huì)有兩步操作:
KafkaController.onBrokerFailure ->KafkaController.onReplicasBecomeOffline
主要是通過(guò) PartitionStateMachine.handleStateChanges 方法通知Partition狀態(tài)機(jī)將狀態(tài)置為offline. ReplicaStateMachine.handleStateChanges方法會(huì)將Replica 狀態(tài)修改為OfflineReplica, 同時(shí)修改partition ISR. 如果被關(guān)閉broker 是partition leader 那么需要重新觸發(fā)partition leader 選舉,最后發(fā)送LeaderAndIsrRequest獲取最新的Leader ISR 信息.
KafkaController.unregisterBrokerModificationsHandler 取消注冊(cè)的BrokerModificationsHandler 并取消zookeeper 中broker 事件的監(jiān)聽(tīng).

當(dāng)ISR請(qǐng)求被發(fā)出,KafkaApis.handleLeaderAndIsrRequest() 會(huì)被調(diào)用. 這里如果需要變更leader的partition是屬于__consumer_offset這個(gè)特殊的topic,取決于當(dāng)前的broker節(jié)點(diǎn)是不是partition leader. 會(huì)分別調(diào)用GroupCoordinator.handleGroupImmigrationGroupCoordinator.handleGroupEmmigration. 如果是partition leader, GroupCoordinator.handleGroupImmigration -> GroupMetadataManager.loadGroupsForPartition 會(huì)重新從 __consumer_offset 讀取group數(shù)據(jù)到本地metadata cache, 如果是partition follower, GroupCoordniator.handleGroupImigration -> GroupMetadataManager.removeGroupsForPartition 會(huì)從metadata cache中移除group信息. 并在onGroupUnloaded回調(diào)函數(shù)中將group的狀態(tài)變更為dead. 同時(shí)通知所有等待join或者sync的組成員.

KAFKA在Broker關(guān)閉時(shí)不會(huì)自動(dòng)做partition 副本的遷移. 這時(shí)被關(guān)閉的Broker上的副本變?yōu)閡nder replicated 狀態(tài). 這種狀態(tài)將持續(xù)直到Broker被重新拉起并且追上新的數(shù)據(jù), 或者用戶(hù)通過(guò)命令行 手動(dòng)復(fù)制副本到其他節(jié)點(diǎn).

官方建議設(shè)置兩個(gè)參數(shù)來(lái)保證graceful shutdown. controlled.shutdown.enable=true auto.leader.rebalance.enable=true前者保證關(guān)機(jī)之前將日志數(shù)據(jù)同步到磁盤(pán),并進(jìn)行重選舉. 后者保證在broker重新恢復(fù)后再次獲得宕機(jī)前l(fā)eader狀態(tài). 避免leader分配不均勻?qū)е伦x寫(xiě)熱點(diǎn).

Reference

https://blog.csdn.net/zhanglh046/article/details/72833129
https://blog.csdn.net/huochen1994/article/details/80511038
https://www.jianshu.com/p/1aba6e226763

名稱(chēng)欄目:一次KAFKA消費(fèi)者異常引起的思考
分享網(wǎng)址:http://muchs.cn/article32/pphgsc.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供企業(yè)網(wǎng)站制作云服務(wù)器、網(wǎng)站維護(hù)軟件開(kāi)發(fā)、網(wǎng)站內(nèi)鏈、網(wǎng)站建設(shè)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶(hù)投稿、用戶(hù)轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話(huà):028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

搜索引擎優(yōu)化