Kafka怎么用

這篇文章主要介紹“Kafka怎么用”,在日常操作中,相信很多人在Kafka怎么用問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Kafka怎么用”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!

創(chuàng)新互聯(lián)公司致力于互聯(lián)網(wǎng)品牌建設(shè)與網(wǎng)絡(luò)營銷,包括成都網(wǎng)站建設(shè)、網(wǎng)站制作、SEO優(yōu)化、網(wǎng)絡(luò)推廣、整站優(yōu)化營銷策劃推廣、電子商務(wù)、移動互聯(lián)網(wǎng)營銷等。創(chuàng)新互聯(lián)公司為不同類型的客戶提供良好的互聯(lián)網(wǎng)應(yīng)用定制及解決方案,創(chuàng)新互聯(lián)公司核心團隊十余年專注互聯(lián)網(wǎng)開發(fā),積累了豐富的網(wǎng)站經(jīng)驗,為廣大企業(yè)客戶提供一站式企業(yè)網(wǎng)站建設(shè)服務(wù),在網(wǎng)站建設(shè)行業(yè)內(nèi)樹立了良好口碑。

一、Kafka應(yīng)用

當(dāng)Kafka集群流量達到 萬億級記錄/天或者十萬億級記錄/天  甚至更高后,我們需要具備哪些能力才能保障集群高可用、高可靠、高性能、高吞吐、安全的運行。

這里總結(jié)內(nèi)容主要針對Kafka2.1.1版本,包括集群版本升級、數(shù)據(jù)遷移、流量限制、監(jiān)控告警、負載均衡、集群擴/縮容、資源隔離、集群容災(zāi)、集群安全、性能優(yōu)化、平臺化、開源版本缺陷、社區(qū)動態(tài)等方面。本文主要是介紹核心脈絡(luò),不做過多細節(jié)講解。下面我們先來看看Kafka作為數(shù)據(jù)中樞的一些核心應(yīng)用場景。

Kafka怎么用

下圖展示了一些主流的數(shù)據(jù)處理流程,Kafka起到一個數(shù)據(jù)中樞的作用。

Kafka怎么用

接下來看看我們Kafka平臺整體架構(gòu);

Kafka怎么用

1.1 版本升級

1.1.1  開源版本如何進行版本滾動升級與回退

官網(wǎng)地址:http://kafka.apache.org

1.1.1.2 源碼改造如何升級與回退

由于在升級過程中,必然出現(xiàn)新舊代碼邏輯交替的情況。集群內(nèi)部部分節(jié)點是開源版本,另外一部分節(jié)點是改造后的版本。所以,需要考慮在升級過程中,新舊代碼混合的情況,如何兼容以及出現(xiàn)故障時如何回退。

1.2 數(shù)據(jù)遷移

由于Kafka集群的架構(gòu)特點,這必然導(dǎo)致集群內(nèi)流量負載不均衡的情況,所以我們需要做一些數(shù)據(jù)遷移來實現(xiàn)集群不同節(jié)點間的流量均衡。Kafka開源版本為數(shù)據(jù)遷移提供了一個腳本工具“bin/kafka-reassign-partitions.sh”,如果自己沒有實現(xiàn)自動負載均衡,可以使用此腳本。

開源版本提供的這個腳本生成遷移計劃完全是人工干預(yù)的,當(dāng)集群規(guī)模非常大時,遷移效率變得非常低下,一般以天為單位進行計算。當(dāng)然,我們可以實現(xiàn)一套自動化的均衡程序,當(dāng)負載均衡實現(xiàn)自動化以后,基本使用調(diào)用內(nèi)部提供的API,由程序去幫我們生成遷移計劃及執(zhí)行遷移任務(wù)。需要注意的是,遷移計劃有指定數(shù)據(jù)目錄和不指定數(shù)據(jù)目錄兩種,指定數(shù)據(jù)目錄的需要配置ACL安全認證。

官網(wǎng)地址:http://kafka.apache.org

1.2.1 broker間數(shù)據(jù)遷移

不指定數(shù)據(jù)目錄

//未指定遷移目錄的遷移計劃
{
    "version":1,
    "partitions":[
        {"topic":"yyj4","partition":0,"replicas":[1000003,1000004]},
        {"topic":"yyj4","partition":1,"replicas":[1000003,1000004]},
        {"topic":"yyj4","partition":2,"replicas":[1000003,1000004]}
    ]
}

指定數(shù)據(jù)目錄

//指定遷移目錄的遷移計劃
{
    "version":1,
    "partitions":[
        {"topic":"yyj1","partition":0,"replicas":[1000006,1000005],"log_dirs":["/data1/bigdata/mydata1","/data1/bigdata/mydata3"]},
        {"topic":"yyj1","partition":1,"replicas":[1000006,1000005],"log_dirs":["/data1/bigdata/mydata1","/data1/bigdata/mydata3"]},
        {"topic":"yyj1","partition":2,"replicas":[1000006,1000005],"log_dirs":["/data1/bigdata/mydata1","/data1/bigdata/mydata3"]}
    ]
}
1.2.2 broker內(nèi)部磁盤間數(shù)據(jù)遷移

生產(chǎn)環(huán)境的服務(wù)器一般都是掛載多塊硬盤,比如4塊/12塊等;那么可能出現(xiàn)在Kafka集群內(nèi)部,各broker間流量比較均衡,但是在broker內(nèi)部,各磁盤間流量不均衡,導(dǎo)致部分磁盤過載,從而影響集群性能和穩(wěn)定,也沒有較好的利用硬件資源。在這種情況下,我們就需要對broker內(nèi)部多塊磁盤的流量做負載均衡,讓流量更均勻的分布到各磁盤上。

1.2.3 并發(fā)數(shù)據(jù)遷移

當(dāng)前Kafka開源版本(2.1.1版本)提供的副本遷移工具“bin/kafka-reassign-partitions.sh”在同一個集群內(nèi)只能實現(xiàn)遷移任務(wù)的串行。對于集群內(nèi)已經(jīng)實現(xiàn)多個資源組物理隔離的情況,由于各資源組不會相互影響,但是卻不能友好的進行并行的提交遷移任務(wù),遷移效率有點低下,這種不足直到2.6.0版本才得以解決。如果需要實現(xiàn)并發(fā)數(shù)據(jù)遷移,可以選擇升級Kafka版本或者修改Kafka源碼。

1.2.4 終止數(shù)據(jù)遷移

當(dāng)前Kafka開源版本(2.1.1版本)提供的副本遷移工具“bin/kafka-reassign-partitions.sh”在啟動遷移任務(wù)后,無法終止遷移。當(dāng)遷移任務(wù)對集群的穩(wěn)定性或者性能有影響時,將變得束手無策,只能等待遷移任務(wù)執(zhí)行完畢(成功或者失敗),這種不足直到2.6.0版本才得以解決。如果需要實現(xiàn)終止數(shù)據(jù)遷移,可以選擇升級Kafka版本或者修改Kafka源碼。

1.3 流量限制

1.3.1 生產(chǎn)消費流量限制

經(jīng)常會出現(xiàn)一些突發(fā)的,不可預(yù)測的異常生產(chǎn)或者消費流量會對集群的IO等資源產(chǎn)生巨大壓力,最終影響整個集群的穩(wěn)定與性能。那么我們可以對用戶的生產(chǎn)、消費、副本間數(shù)據(jù)同步進行流量限制,這個限流機制并不是為了限制用戶,而是避免突發(fā)的流量影響集群的穩(wěn)定和性能,給用戶可以更好的服務(wù)。

如下圖所示,節(jié)點入流量由140MB/s左右突增到250MB/s,而出流量則從400MB/s左右突增至800MB/s。如果沒有限流機制,那么集群的多個節(jié)點將有被這些異常流量打掛的風(fēng)險,甚至造成集群雪崩。

Kafka怎么用

Kafka怎么用

圖片生產(chǎn)/消費流量限制官網(wǎng)地址:點擊鏈接

對于生產(chǎn)者和消費者的流量限制,官網(wǎng)提供了以下幾種維度組合進行限制(當(dāng)然,下面限流機制存在一定缺陷,后面在“Kafka開源版本功能缺陷”我們將提到):

/config/users/<user>/clients/<client-id> //根據(jù)用戶和客戶端ID組合限流
/config/users/<user>/clients/<default>
/config/users/<user>//根據(jù)用戶限流 這種限流方式是我們最常用的方式
/config/users/<default>/clients/<client-id>
/config/users/<default>/clients/<default>
/config/users/<default>
/config/clients/<client-id>
/config/clients/<default>

在啟動Kafka的broker服務(wù)時需要開啟JMX參數(shù)配置,方便通過其他應(yīng)用程序采集Kafka的各項JMX指標(biāo)進行服務(wù)監(jiān)控。當(dāng)用戶需要調(diào)整限流閾值時,根據(jù)單個broker所能承受的流量進行智能評估,無需人工干預(yù)判斷是否可以調(diào)整;對于用戶流量限制,主要需要參考的指標(biāo)包括以下兩個:

(1)消費流量指標(biāo):ObjectName:kafka.server:type=Fetch,user=acl認證用戶名稱 屬性:byte-rate(用戶在當(dāng)前broker的出流量)、throttle-time(用戶在當(dāng)前broker的出流量被限制時間)
(2)生產(chǎn)流量指標(biāo):ObjectName:kafka.server:type=Produce,user=acl認證用戶名稱 屬性:byte-rate(用戶在當(dāng)前broker的入流量)、throttle-time(用戶在當(dāng)前broker的入流量被限制時間)

Kafka怎么用

Kafka怎么用

1.3.2 follower同步leader/數(shù)據(jù)遷移流量限制

副本遷移/數(shù)據(jù)同步流量限制官網(wǎng)地址:鏈接

涉及參數(shù)如下:

//副本同步限流配置共涉及以下4個參數(shù)
leader.replication.throttled.rate
follower.replication.throttled.rate
leader.replication.throttled.replicas
follower.replication.throttled.replicas

輔助指標(biāo)如下:

(1)副本同步出流量指標(biāo):ObjectName:kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesOutPerSec
(2)副本同步入流量指標(biāo):ObjectName:kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesInPerSec

Kafka怎么用

Kafka怎么用

1.4 監(jiān)控告警

關(guān)于Kafka的監(jiān)控有一些開源的工具可用使用,比如下面這幾種:

Kafka Manager;

Kafka Eagle;

Kafka Monitor;

KafkaOffsetMonitor;

我們已經(jīng)把Kafka Manager作為我們查看一些基本指標(biāo)的工具嵌入平臺,然而這些開源工具不能很好的融入到我們自己的業(yè)務(wù)系統(tǒng)或者平臺上。所以,我們需要自己去實現(xiàn)一套粒度更細、監(jiān)控更智能、告警更精準(zhǔn)的系統(tǒng)。其監(jiān)控覆蓋范圍應(yīng)該包括基礎(chǔ)硬件、操作系統(tǒng)(操作系統(tǒng)偶爾出現(xiàn)系統(tǒng)進程hang住情況,導(dǎo)致broker假死,無法正常提供服務(wù))、Kafka的broker服務(wù)、Kafka客戶端應(yīng)用程序、zookeeper集群、上下游全鏈路監(jiān)控。

1.4.1 硬件監(jiān)控

網(wǎng)絡(luò)監(jiān)控:

核心指標(biāo)包括網(wǎng)絡(luò)入流量、網(wǎng)絡(luò)出流量、網(wǎng)絡(luò)丟包、網(wǎng)絡(luò)重傳、處于TIME.WAIT的TCP連接數(shù)、交換機、機房帶寬、DNS服務(wù)器監(jiān)控(如果DNS服務(wù)器異常,可能出現(xiàn)流量黑洞,引起大面積業(yè)務(wù)故障)等。

Kafka怎么用

Kafka怎么用

Kafka怎么用

Kafka怎么用

Kafka怎么用

磁盤監(jiān)控:

核心指標(biāo)包括監(jiān)控磁盤write、磁盤read(如果消費時沒有延時,或者只有少量延時,一般都沒有磁盤read操作)、磁盤ioutil、磁盤iowait(這個指標(biāo)如果過高說明磁盤負載較大)、磁盤存儲空間、磁盤壞盤、磁盤壞塊/壞道(壞道或者壞塊將導(dǎo)致broker處于半死不活狀態(tài),由于有crc校驗,消費者將被卡?。┑?。

Kafka怎么用

Kafka怎么用

Kafka怎么用

Kafka怎么用

CPU監(jiān)控:

監(jiān)控CPU空閑率/負載,主板故障等,通常CPU使用率比較低不是Kafka的瓶頸。

內(nèi)存/交換區(qū)監(jiān)控:

內(nèi)存使用率,內(nèi)存故障。一般情況下,服務(wù)器上除了啟動Kafka的broker時分配的堆內(nèi)存以外,其他內(nèi)存基本全部被用來做PageCache。

緩存命中率監(jiān)控:

由于是否讀磁盤對Kafka的性能影響很大,所以我們需要監(jiān)控Linux的PageCache緩存命中率,如果緩存命中率高,則說明消費者基本命中緩存。

詳細內(nèi)容請閱讀文章:《Linux Page Cache調(diào)優(yōu)在Kafka中的應(yīng)用》。

系統(tǒng)日志:

我們需要對操作系統(tǒng)的錯誤日志進行監(jiān)控告警,及時發(fā)現(xiàn)一些硬件故障。

1.4.2 broker服務(wù)監(jiān)控

broker服務(wù)的監(jiān)控,主要是通過在broker服務(wù)啟動時指定JMX端口,然后通過實現(xiàn)一套指標(biāo)采集程序去采集JMX指標(biāo)。(服務(wù)端指標(biāo)官網(wǎng)地址)

**broker級監(jiān)控:**broker進程、broker入流量字節(jié)大小/記錄數(shù)、broker出流量字節(jié)大小/記錄數(shù)、副本同步入流量、副本同步出流量、broker間流量偏差、broker連接數(shù)、broker請求隊列數(shù)、broker網(wǎng)絡(luò)空閑率、broker生產(chǎn)延時、broker消費延時、broker生產(chǎn)請求數(shù)、broker消費請求數(shù)、broker上分布leader個數(shù)、broker上分布副本個數(shù)、broker上各磁盤流量、broker GC等。

**topic級監(jiān)控:**topic入流量字節(jié)大小/記錄數(shù)、topic出流量字節(jié)大小/記錄數(shù)、無流量topic、topic流量突變(突增/突降)、topic消費延時。

**partition級監(jiān)控:**分區(qū)入流量字節(jié)大小/記錄數(shù)、分區(qū)出流量字節(jié)大小/記錄數(shù)、topic分區(qū)副本缺失、分區(qū)消費延遲記錄、分區(qū)leader切換、分區(qū)數(shù)據(jù)傾斜(生產(chǎn)消息時,如果指定了消息的key容易造成數(shù)據(jù)傾斜,這嚴重影響Kafka的服務(wù)性能)、分區(qū)存儲大?。梢灾卫韱畏謪^(qū)過大的topic)。

**用戶級監(jiān)控:**用戶出/入流量字節(jié)大小、用戶出/入流量被限制時間、用戶流量突變(突增/突降)。

**broker服務(wù)日志監(jiān)控:**對server端打印的錯誤日志進行監(jiān)控告警,及時發(fā)現(xiàn)服務(wù)異常。

1.4.3.客戶端監(jiān)控

客戶端監(jiān)控主要是自己實現(xiàn)一套指標(biāo)上報程序,這個程序需要實現(xiàn) 

org.apache.kafka.common.metrics.MetricsReporter 接口。然后在生產(chǎn)者或者消費者的配置中加入配置項 metric.reporters,如下所示:

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
 
//ClientMetricsReporter類實現(xiàn)org.apache.kafka.common.metrics.MetricsReporter接口
props.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, ClientMetricsReporter.class.getName());
...

客戶端指標(biāo)官網(wǎng)地址:

http://kafka.apache.org/21/documentation.html#selector_monitoring

http://kafka.apache.org/21/documentation.html#common_node_monitoring

http://kafka.apache.org/21/documentation.html#producer_monitoring

http://kafka.apache.org/21/documentation.html#producer_sender_monitoring

http://kafka.apache.org/21/documentation.html#consumer_monitoring

http://kafka.apache.org/21/documentation.html#consumer_fetch_monitoring

客戶端監(jiān)控流程架構(gòu)如下圖所示:

Kafka怎么用

1.4.3.1 生產(chǎn)者客戶端監(jiān)控

**維度:**用戶名稱、客戶端ID、客戶端IP、topic名稱、集群名稱、brokerIP;

**指標(biāo):**連接數(shù)、IO等待時間、生產(chǎn)流量大小、生產(chǎn)記錄數(shù)、請求次數(shù)、請求延時、發(fā)送錯誤/重試次數(shù)等。

1.4.3.2 消費者客戶端監(jiān)控

**維度:**用戶名稱、客戶端ID、客戶端IP、topic名稱、集群名稱、消費組、brokerIP、topic分區(qū);

**指標(biāo):**連接數(shù)、io等待時間、消費流量大小、消費記錄數(shù)、消費延時、topic分區(qū)消費延遲記錄等。

1.4.4 Zookeeper監(jiān)控
  1. Zookeeper進程監(jiān)控;

  2. Zookeeper的leader切換監(jiān)控;

  3. Zookeeper服務(wù)的錯誤日志監(jiān)控;

1.4.5 全鏈路監(jiān)控

當(dāng)數(shù)據(jù)鏈路非常長的時候(比如:業(yè)務(wù)應(yīng)用->埋點SDk->數(shù)據(jù)采集->Kafka->實時計算->業(yè)務(wù)應(yīng)用),我們定位問題通常需要經(jīng)過多個團隊反復(fù)溝通與排查才能發(fā)現(xiàn)問題到底出現(xiàn)在哪個環(huán)節(jié),這樣排查問題效率比較低下。在這種情況下,我們就需要與上下游一起梳理整個鏈路的監(jiān)控。出現(xiàn)問題時,第一時間定位問題出現(xiàn)在哪個環(huán)節(jié),縮短問題定位與故障恢復(fù)時間。

1.5 資源隔離

1.5.1 相同集群不同業(yè)務(wù)資源物理隔離

我們對所有集群中不同對業(yè)務(wù)進行資源組物理隔離,避免各業(yè)務(wù)之間相互影響。在這里,我們假設(shè)集群有4個broker節(jié)點(Broker1/Broker2/Broker3/Broker4),2個業(yè)務(wù)(業(yè)務(wù)A/業(yè)務(wù)B),他們分別擁有topic分區(qū)分布如下圖所示,兩個業(yè)務(wù)topic都分散在集群的各個broker上,并且在磁盤層面也存在交叉。

試想一下,如果我們其中一個業(yè)務(wù)異常,比如流量突增,導(dǎo)致broker節(jié)點異?;蛘弑淮驋?。那么這時候另外一個業(yè)務(wù)也將受到影響,這樣將大大的影響了我們服務(wù)的可用性,造成故障,擴大了故障影響范圍。

Kafka怎么用

針對這些痛點,我們可以對集群中的業(yè)務(wù)進行物理資源隔離,各業(yè)務(wù)獨享資源,進行資源組劃分(這里把4各broker劃分為Group1和Group2兩個資源組)如下圖所示,不同業(yè)務(wù)的topic分布在自己的資源組內(nèi),當(dāng)其中一個業(yè)務(wù)異常時,不會波及另外一個業(yè)務(wù),這樣就可以有效的縮小我們的故障范圍,提高服務(wù)可用性。

Kafka怎么用

1.6 集群歸類

我們把集群根據(jù)業(yè)務(wù)特點進行拆分為日志集群、監(jiān)控集群、計費集群、搜索集群、離線集群、在線集群等,不同場景業(yè)務(wù)放在不同集群,避免不同業(yè)務(wù)相互影響。

1.7 擴容/縮容

1.7.1 topic擴容分區(qū)

隨著topic數(shù)據(jù)量增長,我們最初創(chuàng)建的topic指定的分區(qū)個數(shù)可能已經(jīng)無法滿足數(shù)量流量要求,所以我們需要對topic的分區(qū)進行擴展。擴容分區(qū)時需要考慮一下幾點:

必須保證topic分區(qū)leader與follower輪詢的分布在資源組內(nèi)所有broker上,讓流量分布更加均衡,同時需要考慮相同分區(qū)不同副本跨機架分布以提高容災(zāi)能力;

當(dāng)topic分區(qū)leader個數(shù)除以資源組節(jié)點個數(shù)有余數(shù)時,需要把余數(shù)分區(qū)leader優(yōu)先考慮放入流量較低的broker。

1.7.2 broker上線

隨著業(yè)務(wù)量增多,數(shù)據(jù)量不斷增大,我們的集群也需要進行broker節(jié)點擴容。關(guān)于擴容,我們需要實現(xiàn)以下幾點:

擴容智能評估:根據(jù)集群負載,把是否需要擴容評估程序化、智能化;

智能擴容:當(dāng)評估需要擴容后,把擴容流程以及流量均衡平臺化。

1.7.3 broker下線

某些場景下,我們需要下線我們的broker,主要包括以下幾個場景:

一些老化的服務(wù)器需要下線,實現(xiàn)節(jié)點下線平臺化;

服務(wù)器故障,broker故障無法恢復(fù),我們需要下線故障服務(wù)器,實現(xiàn)節(jié)點下線平臺化;

有更優(yōu)配置的服務(wù)器替換已有broker節(jié)點,實現(xiàn)下線節(jié)點平臺化。

1.8 負載均衡

我們?yōu)槭裁葱枰撦d均衡呢?首先,我們來看第一張圖,下圖是我們集群某個資源組剛擴容后的流量分布情況,流量無法自動的分攤到我們新擴容后的節(jié)點上。那么這個時候需要我們手動去觸發(fā)數(shù)據(jù)遷移,把部分副本遷移至新節(jié)點上才能實現(xiàn)流量均衡。

Kafka怎么用

下面,我們來看一下第二張圖。這張圖我們可以看出流量分布非常不均衡,最低和最高流量偏差數(shù)倍以上。這和Kafka的架構(gòu)特點有關(guān),當(dāng)集群規(guī)模與數(shù)據(jù)量達到一定量后,必然出現(xiàn)當(dāng)問題。這種情況下,我們也需要進行負載均衡。

Kafka怎么用

我們再來看看第三張圖。這里我們可以看出出流量只有部分節(jié)點突增,這就是topic分區(qū)在集群內(nèi)部不夠分散,集中分布到了某幾個broker導(dǎo)致,這種情況我們也需要進行擴容分區(qū)和均衡。

Kafka怎么用

我們比較理想的流量分布應(yīng)該如下圖所示,各節(jié)點間流量偏差非常小,這種情況下,既可以增強集群扛住流量異常突增的能力又可以提升集群整體資源利用率和服務(wù)穩(wěn)定性,降低成本。

Kafka怎么用

Kafka怎么用

負載均衡我們需要實現(xiàn)以下效果:

1)生成副本遷移計劃以及執(zhí)行遷移任務(wù)平臺化、自動化、智能化;

2)執(zhí)行均衡后broker間流量比較均勻,且單個topic分區(qū)均勻分布在所有broker節(jié)點上;

3)執(zhí)行均衡后broker內(nèi)部多塊磁盤間流量比較均衡;

要實現(xiàn)這個效果,我們需要開發(fā)一套自己的負載均衡工具,如對開源的 cruise control進行二次開發(fā);此工具的核心主要在生成遷移計劃的策略,遷移計劃的生成方案直接影響到最后集群負載均衡的效果。參考內(nèi)容:

1. linkedIn/cruise-control

2. Introduction to Kafka Cruise Control

3. Cloudera Cruise Control REST API Reference

cruise control架構(gòu)圖如下:

Kafka怎么用

在生成遷移計劃時,我們需要考慮以下幾點:

1)選擇核心指標(biāo)作為生成遷移計劃的依據(jù),比如出流量、入流量、機架、單topic分區(qū)分散性等;

2)優(yōu)化用來生成遷移計劃的指標(biāo)樣本,比如過濾流量突增/突降/掉零等異常樣本;

3)各資源組的遷移計劃需要使用的樣本全部為資源組內(nèi)部樣本,不涉及其他資源組,無交叉;

4)治理單分區(qū)過大topic,讓topic分區(qū)分布更分散,流量不集中在部分broker,讓topic單分區(qū)數(shù)據(jù)量更小,這樣可以減少遷移的數(shù)據(jù)量,提升遷移速度;

5)已經(jīng)均勻分散在資源組內(nèi)的topic,加入遷移黑名單,不做遷移,這樣可以減少遷移的數(shù)據(jù)量,提升遷移速度;

6)做topic治理,排除長期無流量topic對均衡的干擾;

7)新建topic或者topic分區(qū)擴容時,應(yīng)讓所有分區(qū)輪詢分布在所有broker節(jié)點,輪詢后余數(shù)分區(qū)優(yōu)先分布流量較低的broker;

8)擴容broker節(jié)點后開啟負載均衡時,優(yōu)先把同一broker分配了同一大流量(流量大而不是存儲空間大,這里可以認為是每秒的吞吐量)topic多個分區(qū)leader的,遷移一部分到新broker節(jié)點;

9)提交遷移任務(wù)時,同一批遷移計劃中的分區(qū)數(shù)據(jù)大小偏差應(yīng)該盡可能小,這樣可以避免遷移任務(wù)中小分區(qū)遷移完成后長時間等待大分區(qū)的遷移,造成任務(wù)傾斜;

1.9 安全認證

是不是我們的集群所有人都可以隨意訪問呢?當(dāng)然不是,為了集群的安全,我們需要進行權(quán)限認證,屏蔽非法操作。主要包括以下幾個方面需要做安全認證:

(1)生產(chǎn)者權(quán)限認證;

(2)消費者權(quán)限認證;

(3)指定數(shù)據(jù)目錄遷移安全認證;

官網(wǎng)地址:http://kafka.apache.org

1.10 集群容災(zāi)

跨機架容災(zāi):

官網(wǎng)地址:http://kafka.apache.org

**跨集群/機房容災(zāi):**如果有異地雙活等業(yè)務(wù)場景時,可以參考Kafka2.7版本的MirrorMaker 2.0。

GitHub地址:https://github.com

精確KIP地址 :https://cwiki.apache.org

**ZooKeeper集群上Kafka元數(shù)據(jù)恢復(fù):**我們會定期對ZooKeeper上的權(quán)限信息數(shù)據(jù)做備份處理,當(dāng)集群元數(shù)據(jù)異常時用于恢復(fù)。

1.11 參數(shù)/配置優(yōu)化

**broker服務(wù)參數(shù)優(yōu)化:**這里我只列舉部分影響性能的核心參數(shù)。

num.network.threads
#創(chuàng)建Processor處理網(wǎng)絡(luò)請求線程個數(shù),建議設(shè)置為broker當(dāng)CPU核心數(shù)*2,這個值太低經(jīng)常出現(xiàn)網(wǎng)絡(luò)空閑太低而缺失副本。
 
num.io.threads
#創(chuàng)建KafkaRequestHandler處理具體請求線程個數(shù),建議設(shè)置為broker磁盤個數(shù)*2
 
num.replica.fetchers
#建議設(shè)置為CPU核心數(shù)/4,適當(dāng)提高可以提升CPU利用率及follower同步leader數(shù)據(jù)當(dāng)并行度。
 
compression.type
#建議采用lz4壓縮類型,壓縮可以提升CPU利用率同時可以減少網(wǎng)絡(luò)傳輸數(shù)據(jù)量。
 
queued.max.requests
#如果是生產(chǎn)環(huán)境,建議配置最少500以上,默認為500。
 
log.flush.scheduler.interval.ms
log.flush.interval.ms
log.flush.interval.messages
#這幾個參數(shù)表示日志數(shù)據(jù)刷新到磁盤的策略,應(yīng)該保持默認配置,刷盤策略讓操作系統(tǒng)去完成,由操作系統(tǒng)來決定什么時候把數(shù)據(jù)刷盤;
#如果設(shè)置來這個參數(shù),可能對吞吐量影響非常大;
 
auto.leader.rebalance.enable
#表示是否開啟leader自動負載均衡,默認true;我們應(yīng)該把這個參數(shù)設(shè)置為false,因為自動負載均衡不可控,可能影響集群性能和穩(wěn)定;

**生產(chǎn)優(yōu)化:**這里我只列舉部分影響性能的核心參數(shù)。

linger.ms
#客戶端生產(chǎn)消息等待多久時間才發(fā)送到服務(wù)端,單位:毫秒。和batch.size參數(shù)配合使用;適當(dāng)調(diào)大可以提升吞吐量,但是如果客戶端如果down機有丟失數(shù)據(jù)風(fēng)險;
 
batch.size
#客戶端發(fā)送到服務(wù)端消息批次大小,和linger.ms參數(shù)配合使用;適當(dāng)調(diào)大可以提升吞吐量,但是如果客戶端如果down機有丟失數(shù)據(jù)風(fēng)險;
 
compression.type
#建議采用lz4壓縮類型,具備較高的壓縮比及吞吐量;由于Kafka對CPU的要求并不高,所以,可以通過壓縮,充分利用CPU資源以提升網(wǎng)絡(luò)吞吐量;
 
buffer.memory
#客戶端緩沖區(qū)大小,如果topic比較大,且內(nèi)存比較充足,可以適當(dāng)調(diào)高這個參數(shù),默認只為33554432(32MB)
 
retries
#生產(chǎn)失敗后的重試次數(shù),默認0,可以適當(dāng)增加。當(dāng)重試超過一定次數(shù)后,如果業(yè)務(wù)要求數(shù)據(jù)準(zhǔn)確性較高,建議做容錯處理。
 
retry.backoff.ms
#生產(chǎn)失敗后,重試時間間隔,默認100ms,建議不要設(shè)置太大或者太小。

除了一些核心參數(shù)優(yōu)化外,我們還需要考慮比如topic的分區(qū)個數(shù)和topic保留時間;如果分區(qū)個數(shù)太少,保留時間太長,但是寫入數(shù)據(jù)量非常大的話,可能造成以下問題:

1)topic分區(qū)集中落在某幾個broker節(jié)點上,導(dǎo)致流量副本失衡;

2)導(dǎo)致broker節(jié)點內(nèi)部某幾塊磁盤讀寫超負載,存儲被寫爆;

1.11.1 消費優(yōu)化

消費最大的問題,并且經(jīng)常出現(xiàn)的問題就是消費延時,拉歷史數(shù)據(jù)。當(dāng)大量拉取歷史數(shù)據(jù)時將出現(xiàn)大量讀盤操作,污染pagecache,這個將加重磁盤的負載,影響集群性能和穩(wěn)定;

可以怎樣減少或者避免大量消費延時呢?

1)當(dāng)topic數(shù)據(jù)量非常大時,建議一個分區(qū)開啟一個線程去消費;

2)對topic消費延時添加監(jiān)控告警,及時發(fā)現(xiàn)處理;

3)當(dāng)topic數(shù)據(jù)可以丟棄時,遇到超大延時,比如單個分區(qū)延遲記錄超過千萬甚至數(shù)億,那么可以重置topic的消費點位進行緊急處理;【此方案一般在極端場景才使用】

4)避免重置topic的分區(qū)offset到很早的位置,這可能造成拉取大量歷史數(shù)據(jù);

1.11.2 Linux服務(wù)器參數(shù)優(yōu)化

我們需要對Linux的文件句柄、pagecache等參數(shù)進行優(yōu)化。可參考《Linux Page Cache調(diào)優(yōu)在Kafka中的應(yīng)用》。

1.12.硬件優(yōu)化

磁盤優(yōu)化

在條件允許的情況下,可以采用SSD固態(tài)硬盤替換HDD機械硬盤,解決機械盤IO性能較低的問題;如果沒有SSD固態(tài)硬盤,則可以對服務(wù)器上的多塊硬盤做硬RAID(一般采用RAID10),讓broker節(jié)點的IO負載更加均衡。如果是HDD機械硬盤,一個broker可以掛載多塊硬盤,比如 12塊*4TB。

內(nèi)存

由于Kafka屬于高頻讀寫型服務(wù),而Linux的讀寫請求基本走的都是Page Cache,所以單節(jié)點內(nèi)存大一些對性能會有比較明顯的提升。一般選擇256GB或者更高。

網(wǎng)絡(luò)

提升網(wǎng)絡(luò)帶寬:在條件允許的情況下,網(wǎng)絡(luò)帶寬越大越好。因為這樣網(wǎng)絡(luò)帶寬才不會成為性能瓶頸,最少也要達到萬兆網(wǎng)絡(luò)( 10Gb,網(wǎng)卡為全雙工)才能具備相對較高的吞吐量。如果是單通道,網(wǎng)絡(luò)出流量與入流量之和的上限理論值是1.25GB/s;如果是雙工雙通道,網(wǎng)絡(luò)出入流量理論值都可以達到1.25GB/s。

網(wǎng)絡(luò)隔離打標(biāo):由于一個機房可能既部署有離線集群(比如HBase、Spark、Hadoop等)又部署有實時集群(如Kafka)。那么實時集群和離線集群掛載到同一個交換機下的服務(wù)器將出現(xiàn)競爭網(wǎng)絡(luò)帶寬的問題,離線集群可能對實時集群造成影響。所以我們需要進行交換機層面的隔離,讓離線機器和實時集群不要掛載到相同的交換機下。即使有掛載到相同交換機下面的,我們也將進行網(wǎng)絡(luò)通行優(yōu)先級(金、銀、銅、鐵)標(biāo)記,當(dāng)網(wǎng)絡(luò)帶寬緊張的時候,讓實時業(yè)務(wù)優(yōu)先通行。

CPU

Kafka的瓶頸不在CPU,單節(jié)點一般有32核的CPU都足夠使用。

1.13.平臺化

現(xiàn)在問題來了,前面我們提到很多監(jiān)控、優(yōu)化等手段;難道我們管理員或者業(yè)務(wù)用戶對集群所有的操作都需要登錄集群服務(wù)器嗎?答案當(dāng)然是否定的,我們需要豐富的平臺化功能來支持。一方面是為了提升我們操作的效率,另外一方面也是為了提升集群的穩(wěn)定和降低出錯的可能。

配置管理

黑屏操作,每次修改broker的server.properties配置文件都沒有變更記錄可追溯,有時可能因為有人修改了集群配置導(dǎo)致一些故障,卻找不到相關(guān)記錄。如果我們把配置管理做到平臺上,每次變更都有跡可循,同時降低了變更出錯的風(fēng)險。

滾動重啟

當(dāng)我們需要做線上變更時,有時候需要對集群對多個節(jié)點做滾動重啟,如果到命令行去操作,那效率將變得很低,而且需要人工去干預(yù),浪費人力。這個時候我們就需要把這種重復(fù)性的工作進行平臺化,提升我們的操作效率。

集群管理

集群管理主要是把原來在命令行的一系列操作做到平臺上,用戶和管理員不再需要黑屏操作Kafka集群;這樣做主要有以下優(yōu)點:

提升操作效率;

操作出錯概率更小,集群更安全;

所有操作有跡可循,可以追溯;

集群管理主要包括:broker管理、topic管理、生產(chǎn)/消費權(quán)限管理、用戶管理等

1.13.1 mock功能

在平臺上為用戶的topic提供生產(chǎn)樣例數(shù)據(jù)與消費抽樣的功能,用戶可以不用自己寫代碼也可以測試topic是否可以使用,權(quán)限是否正常;

在平臺上為用戶的topic提供生產(chǎn)/消費權(quán)限驗證功能,讓用戶可以明確自己的賬號對某個topic有沒有讀寫權(quán)限;

1.13.2 權(quán)限管理

把用戶讀/寫權(quán)限管理等相關(guān)操作進行平臺化。

1.13.3 擴容/縮容

把broker節(jié)點上下線做到平臺上,所有的上線和下線節(jié)點不再需要操作命令行。

1.13.4 集群治理

1)無流量topic的治理,對集群中無流量topic進行清理,減少過多無用元數(shù)據(jù)對集群造成的壓力;

2)topic分區(qū)數(shù)據(jù)大小治理,把topic分區(qū)數(shù)據(jù)量過大的topic(如單分區(qū)數(shù)據(jù)量超過100GB/天)進行梳理,看看是否需要擴容,避免數(shù)據(jù)集中在集群部分節(jié)點上;

3)topic分區(qū)數(shù)據(jù)傾斜治理,避免客戶端在生產(chǎn)消息的時候,指定消息的key,但是key過于集中,消息只集中分布在部分分區(qū),導(dǎo)致數(shù)據(jù)傾斜;

4)topic分區(qū)分散性治理,讓topic分區(qū)分布在集群盡可能多的broker上,這樣可以避免因topic流量突增,流量只集中到少數(shù)節(jié)點上的風(fēng)險,也可以避免某個broker異常對topic影響非常大;

5)topic分區(qū)消費延時治理;一般有延時消費較多的時候有兩種情況,一種是集群性能下降,另外一種是業(yè)務(wù)方的消費并發(fā)度不夠,如果是消費者并發(fā)不夠的化應(yīng)該與業(yè)務(wù)聯(lián)系增加消費并發(fā)。

1.13.5 監(jiān)控告警

1)把所有指標(biāo)采集做成平臺可配置,提供統(tǒng)一的指標(biāo)采集和指標(biāo)展示及告警平臺,實現(xiàn)一體化監(jiān)控;

2)把上下游業(yè)務(wù)進行關(guān)聯(lián),做成全鏈路監(jiān)控;

3)用戶可以配置topic或者分區(qū)流量延時、突變等監(jiān)控告警;

1.13.6 業(yè)務(wù)大屏

業(yè)務(wù)大屏主要指標(biāo):集群個數(shù)、節(jié)點個數(shù)、日入流量大小、日入流量記錄、日出流量大小、日出流量記錄、每秒入流量大小、每秒入流量記錄、每秒出流量大小、每秒出流量記錄、用戶個數(shù)、生產(chǎn)延時、消費延時、數(shù)據(jù)可靠性、服務(wù)可用性、數(shù)據(jù)存儲大小、資源組個數(shù)、topic個數(shù)、分區(qū)個數(shù)、副本個數(shù)、消費組個數(shù)等指標(biāo)。

1.13.7 流量限制

把用戶流量現(xiàn)在做到平臺,在平臺進行智能限流處理。

1.13.8 負載均衡

把自動負載均衡功能做到平臺,通過平臺進行調(diào)度和管理。

1.13.9 資源預(yù)算

當(dāng)集群達到一定規(guī)模,流量不斷增長,那么集群擴容機器從哪里來呢?業(yè)務(wù)的資源預(yù)算,讓集群里面的多個業(yè)務(wù)根據(jù)自己在集群中當(dāng)流量去分攤整個集群的硬件成本;當(dāng)然,獨立集群與獨立隔離的資源組,預(yù)算方式可以單獨計算。

1.14.性能評估

1.14.1 單broker性能評估

我們做單broker性能評估的目的包括以下幾方面:

1)為我們進行資源申請評估提供依據(jù);

2)讓我們更了解集群的讀寫能力及瓶頸在哪里,針對瓶頸進行優(yōu)化;

3)為我們限流閾值設(shè)置提供依據(jù);

4)為我們評估什么時候應(yīng)該擴容提供依據(jù);

1.14.2 topic分區(qū)性能評估

1)為我們創(chuàng)建topic時,評估應(yīng)該指定多少分區(qū)合理提供依據(jù);

2)為我們topic的分區(qū)擴容評估提供依據(jù);

1.14.3 單磁盤性能評估

1)為我們了解磁盤的真正讀寫能力,為我們選擇更合適Kafka的磁盤類型提供依據(jù);

2)為我們做磁盤流量告警閾值設(shè)置提供依據(jù);

1.14.4 集群規(guī)模限制摸底

1)我們需要了解單個集群規(guī)模的上限或者是元數(shù)據(jù)規(guī)模的上限,探索相關(guān)信息對集群性能和穩(wěn)定性的影響;

2)根據(jù)摸底情況,評估集群節(jié)點規(guī)模的合理范圍,及時預(yù)測風(fēng)險,進行超大集群的拆分等工作;

1.15 DNS+LVS的網(wǎng)絡(luò)架構(gòu)

當(dāng)我們的集群節(jié)點達到一定規(guī)模,比如單集群數(shù)百個broker節(jié)點,那么此時我們生產(chǎn)消費客戶端指定bootstrap.servers配置時,如果指定呢?是隨便選擇其中幾個broker配置還是全部都配上呢?

其實以上做法都不合適,如果只配置幾個IP,當(dāng)我們配置當(dāng)幾個broker節(jié)點下線后,我們當(dāng)應(yīng)用將無法連接到Kafka集群;如果配置所有IP,那更不現(xiàn)實啦,幾百個IP,那么我們應(yīng)該怎么做呢?

**方案:**采用DNS+LVS網(wǎng)絡(luò)架構(gòu),最終生產(chǎn)者和消費者客戶端只需要配置域名就可以啦。需要注意的是,有新節(jié)點加入集群時,需要添加映射;有節(jié)點下線時,需要從映射中踢掉,否則這批機器如果拿到其他的地方去使用,如果端口和Kafka的一樣的話,原來集群部分請求將發(fā)送到這個已經(jīng)下線的服務(wù)器上來,造成生產(chǎn)環(huán)境重點故障。

二、開源版本功能缺陷

RTMP協(xié)議主要的特點有:多路復(fù)用,分包和應(yīng)用層協(xié)議。以下將對這些特點進行詳細的描述。

2.1 副本遷移

無法實現(xiàn)增量遷移;【我們已經(jīng)基于2.1.1版本源碼改造,實現(xiàn)了增量遷移】

無法實現(xiàn)并發(fā)遷移;【開源版本直到2.6.0才實現(xiàn)了并發(fā)遷移】

無法實現(xiàn)終止遷移;【我們已經(jīng)基于2.1.1版本源碼改造,實現(xiàn)了終止副本遷移】【開源版本直到2.6.0才實現(xiàn)了暫停遷移,和終止遷移有些不一樣,不會回滾元數(shù)據(jù)】

當(dāng)指定遷移數(shù)據(jù)目錄時,遷移過程中,如果把topic保留時間改短,topic保留時間針對正在遷移topic分區(qū)不生效,topic分區(qū)過期數(shù)據(jù)無法刪除;【開源版本bug,目前還沒有修復(fù)】

當(dāng)指定遷移數(shù)據(jù)目錄時,當(dāng)遷移計劃為以下場景時,整個遷移任務(wù)無法完成遷移,一直處于卡死狀態(tài);【開源版本bug,目前還沒有修復(fù)】

遷移過程中,如果有重啟broker節(jié)點,那個broker節(jié)點上的所有l(wèi)eader分區(qū)無法切換回來,導(dǎo)致節(jié)點流量全部轉(zhuǎn)移到其他節(jié)點,直到所有副本被遷移完畢后leader才會切換回來;【開源版本bug,目前還沒有修復(fù)】。

在原生的Kafka版本中存在以下指定數(shù)據(jù)目錄場景無法遷移完畢的情況,此版本我們也不決定修復(fù)次bug:
 
1.針對同一個topic分區(qū),如果部分目標(biāo)副本相比原副本是所屬broker發(fā)生變化,部分目標(biāo)副本相比原副本是broker內(nèi)部所屬數(shù)據(jù)目錄發(fā)生變化;
那么副本所屬broker發(fā)生變化的那個目標(biāo)副本可以正常遷移完畢,目標(biāo)副本是在broker內(nèi)部數(shù)據(jù)目錄發(fā)生變化的無法正常完成遷移;
但是舊副本依然可以正常提供生產(chǎn)、消費服務(wù),并且不影響下一次遷移任務(wù)的提交,下一次遷移任務(wù)只需要把此topic分區(qū)的副本列表所屬broker列表變更后提交依然可以正常完成遷移,并且可以清理掉之前未完成的目標(biāo)副本;
 
這里假設(shè)topic yyj1的初始化副本分布情況如下:
 
{
"version":1,
"partitions":[
{"topic":"yyj","partition":0,"replicas":[1000003,1000001],"log_dirs":["/kfk211data/data31","/kfk211data/data13"]}
]
}
//遷移場景1:
{
"version":1,
"partitions":[
{"topic":"yyj","partition":0,"replicas":[1000003,1000002],"log_dirs":["/kfk211data/data32","/kfk211data/data23"]}
]
}
 
//遷移場景2:
{
"version":1,
"partitions":[
{"topic":"yyj","partition":0,"replicas":[1000002,1000001],"log_dirs":["/kfk211data/data22","/kfk211data/data13"]}
]
}
針對上述的topic yyj1的分布分布情況,此時如果我們的遷移計劃為“遷移場景1”或遷移場景2“,那么都將出現(xiàn)有副本無法遷移完畢的情況。
但是這并不影響舊副本處理生產(chǎn)、消費請求,并且我們可以正常提交其他的遷移任務(wù)。
為了清理舊的未遷移完成的副本,我們只需要修改一次遷移計劃【新的目標(biāo)副本列表和當(dāng)前分區(qū)已分配副本列表完全不同即可】,再次提交遷移即可。
 
這里,我們依然以上述的例子做遷移計劃修改如下:
{
"version":1,
"partitions":[
{"topic":"yyj","partition":0,"replicas":[1000004,1000005],"log_dirs":["/kfk211data/data42","/kfk211data/data53"]}
]
}
這樣我們就可以正常完成遷移。

2.2 流量協(xié)議

限流粒度較粗,不夠靈活精準(zhǔn),不夠智能。

當(dāng)前限流維度組合

/config/users/<user>/clients/<client-id>
/config/users/<user>/clients/<default>
/config/users/<user>
/config/users/<default>/clients/<client-id>
/config/users/<default>/clients/<default>
/config/users/<default>
/config/clients/<client-id>
/config/clients/<default>

存在問題

當(dāng)同一個broker上有多個用戶同時進行大量的生產(chǎn)和消費時,想要讓broker可以正常運行,那必須在做限流時讓所有的用戶流量閾值之和不超過broker的吞吐上限;如果超過broker上限,那么broker就存在被打掛的風(fēng)險;然而,即使用戶流量沒有達到broker的流量上限,但是,如果所有用戶流量集中到了某幾塊盤上,超過了磁盤的讀寫負載,也會導(dǎo)致所有生產(chǎn)、消費請求將被阻塞,broker可能處于假死狀態(tài)。

解決方案

(1)改造源碼,實現(xiàn)單個broker流量上限限制,只要流量達到broker上限立即進行限流處理,所有往這個broker寫的用戶都可以被限制?。换蛘邔τ脩暨M行優(yōu)先級處理,放過高優(yōu)先級的,限制低優(yōu)先級的;

(2)改造源碼,實現(xiàn)broker上單塊磁盤流量上限限制(很多時候都是流量集中到某幾塊磁盤上,導(dǎo)致沒有達到broker流量上限卻超過了單磁盤讀寫能力上限),只要磁盤流量達到上限,立即進行限流處理,所有往這個磁盤寫的用戶都可以被限制??;或者對用戶進行優(yōu)先級處理,放過高優(yōu)先級的,限制低優(yōu)先級的;

(3)改造源碼,實現(xiàn)topic維度限流以及對topic分區(qū)的禁寫功能;

(4)改造源碼,實現(xiàn)用戶、broker、磁盤、topic等維度組合精準(zhǔn)限流;

到此,關(guān)于“Kafka怎么用”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>

新聞標(biāo)題:Kafka怎么用
分享路徑:http://muchs.cn/article42/ghcjec.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供自適應(yīng)網(wǎng)站、品牌網(wǎng)站建設(shè)網(wǎng)頁設(shè)計公司、手機網(wǎng)站建設(shè)、App設(shè)計、軟件開發(fā)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

成都定制網(wǎng)站建設(shè)