當(dāng)一個(gè)程序向另一個(gè)程序發(fā)送消息的時(shí)候,可以一對一直接發(fā)送
當(dāng)出現(xiàn)越來越多的程序和通訊鏈路,可以發(fā)現(xiàn)這種模式的缺點(diǎn)
1-團(tuán)隊(duì)之間可能進(jìn)行著重復(fù)的工作,會(huì)造成資源的浪費(fèi)
2-當(dāng)信息過多無法實(shí)現(xiàn)及時(shí)的同步時(shí),會(huì)造成信息的丟失
3-各個(gè)程序之間直接通訊,耦合度太高了,可能會(huì)牽一發(fā)而動(dòng)全身
kafka可以接收不同生產(chǎn)者的消息,然后由不同的消費(fèi)者來訂閱這些消息供自己使用
那么消費(fèi)者怎么能保證自己拿到想要的消息呢?這里就要用到Topic主題
分區(qū)概念,一個(gè)主題里可能包含多個(gè)分區(qū),分區(qū)可以分布在不同的服務(wù)器上,這樣一個(gè)主題也就可以分布在多個(gè)服務(wù)器上了,
生產(chǎn)者會(huì)把消息放入對應(yīng)主題的對應(yīng)分區(qū)中,生產(chǎn)者怎么知道哪個(gè)消息該放入哪個(gè)分區(qū)呢?分兩種情況:
(1)生產(chǎn)者指定了分區(qū)
(2)生產(chǎn)者沒有指定分區(qū)
分區(qū)器會(huì)根據(jù)鍵key來決定消息的去處
鍵可以看做是一個(gè)標(biāo)記,每個(gè)值都會(huì)對應(yīng)一個(gè)標(biāo)記
分區(qū)器可以看做是一個(gè)算法,輸入值是鍵,輸出值是該去哪個(gè)分區(qū)
那么一個(gè)消息要包含一下這些部分:主題、分區(qū)、鍵、值。這樣一個(gè)消息才能找到自己對應(yīng)要去的去處
那么接下來消費(fèi)者要怎么讀取數(shù)據(jù)呢?
引入一個(gè)概念:偏移量(也就是第幾個(gè)的意思)
偏移量在寫入的時(shí)候就已經(jīng)定好了,消費(fèi)者在讀取數(shù)據(jù)的時(shí)候都是根據(jù)偏移量來讀取數(shù)據(jù)的
1-偏移量(offset):第幾個(gè)
2-一個(gè)分區(qū)里,每個(gè)消息的偏移量是唯一的
3-消費(fèi)者只能順序讀取
上面一個(gè)獨(dú)立的kafka服務(wù)器也就是一個(gè)broker
一個(gè)broker里可以有多個(gè)主題,一個(gè)主題里又可以有多個(gè)分區(qū)
broker接收來自生產(chǎn)者的消息,為每個(gè)消息設(shè)置相應(yīng)的偏移量,然后把消息保存到磁盤里
broker響應(yīng)消費(fèi)者的請求
多個(gè)broker就會(huì)組成kafka的集群,保證項(xiàng)目的安全,一個(gè)宕機(jī)另一個(gè)可以補(bǔ)上
如果是海量數(shù)據(jù)的話,用單獨(dú)一臺服務(wù)器存儲的話,壓力太大??梢园岩粋€(gè)主題切割成幾塊來處理,在好幾臺服務(wù)器上搭建集群,每個(gè)服務(wù)器就是一個(gè)broker。假如生產(chǎn)者需要往主題A中生產(chǎn)100T數(shù)據(jù),那么就在在3臺服務(wù)器上的主題A各自生產(chǎn)存放33T左右,同一個(gè)主題在3臺服務(wù)器上分成三個(gè)不同的分區(qū)
消費(fèi)者引入消費(fèi)組,消費(fèi)組中包含多個(gè)消費(fèi)者Consumer,每個(gè)消費(fèi)者消費(fèi)一個(gè)分區(qū)的消息,就可以提高消費(fèi)者消費(fèi)一個(gè)主題消息的能力
其中有一些規(guī)則
(1)一個(gè)分區(qū)的消息數(shù)據(jù)只能由一個(gè)消費(fèi)組中的一個(gè)消費(fèi)者消費(fèi),如果一個(gè)分區(qū)由兩個(gè)消費(fèi)者消費(fèi),那么就會(huì)不確定哪個(gè)消費(fèi)者,可能會(huì)重復(fù)消費(fèi),不利于管理
如果一個(gè)分區(qū)掛了呢,那么存在這個(gè)分區(qū)的33T數(shù)據(jù)就沒了,所以kafka引入了副本來備份數(shù)據(jù)
1-這些副本是不一樣的,分為leader和follower,消費(fèi)者消費(fèi)的時(shí)候只消費(fèi)leader,而follower只負(fù)責(zé)備份的作用,等leader掛了follower才有條件成為新的leader
還有一些信息是存在zookeeper里的,存儲的信息就是哪些kafka服務(wù)器broker上線了正在工作,還會(huì)記錄每個(gè)broker中哪個(gè)是leader分區(qū)
一種是基于zookeeper,一種是不基于
某一個(gè)分區(qū)當(dāng)中的數(shù)據(jù)只能交給一個(gè)消費(fèi)者消費(fèi),防止兩者沖突,
為了保證分區(qū)的可靠性,引入了副本,副本分為leader和follower,生產(chǎn)消費(fèi)只針對leader,等leader掛了follower才有條件成為新的leader
zookeeper存儲上下限信息
教學(xué)路徑:
查看本地JDK安裝是否成功
到官網(wǎng)下載
從mac傳輸安裝包到Linux系統(tǒng)
scp -P 22 /Library/Java/kafka_2.12-3.1.0.tgz root@192.168.19.11:/
root
解壓文件
tar -zxvf kafka_2.13-3.2.0.tgz
(3)修改kafka的配置文件:config/server.properties# 現(xiàn)在不用改,但是后面如果要配置kafka集群的話,要保證每個(gè)服務(wù)器的broker.id都是不一樣的
broker.id=0
# 配置當(dāng)前主機(jī)的地址,默認(rèn)端口號就是9092
listeners=PLAINTEXT://192.168.19.11:9092
advertised.listeners=PLAINTEXT://192.168.19.11:9092
# 配置日志文件的路徑
log.dirs=/root/kafka_2.12-3.1.0/data/kafka-logs
# 連接zookeeper
zookeeper.connect=127.0.0.1:2181
可使用外置或內(nèi)置Zookeeper,這里使用內(nèi)置Zookeeper
(4)修改zookeeper的配置文件:config/zookeeper.propertiesdataDir=../zkData
dataLogDir=../zkLogs
audit.enable=true
(5)啟動(dòng)kafka進(jìn)入bin目錄
先啟動(dòng)zookeeper
./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
然后啟動(dòng)kafka
./kafka-server-start.sh -daemon ../config/server.properties &
查看是否啟動(dòng)成功
ps -aux | grep server.properties
ps -aux | grep zookeper.properties
【2】測試實(shí)例
(1)概念介紹(1)概念
(2)為什么要使用消息隊(duì)列
如果使用的是同步的通信方式來解決多個(gè)服務(wù)之間的通信,則要保證每一步的通信都要暢通,否則就會(huì)出錯(cuò)
而如果使用異步的通信方式來解決多個(gè)服務(wù)之間的通信,就可以實(shí)現(xiàn)解耦
創(chuàng)建一個(gè)名為javaTopic的主題,分區(qū)為2個(gè),副本為1個(gè)
./kafka-topics.sh --bootstrap-server 192.168.19.11:9092 --create --topic javaTopic --partitions 2 --replication-factor 1
replication-factor:指定副本數(shù)量
partitions:指定分區(qū)
./kafka-topics.sh --bootstrap-server 192.168.19.11:9092 --list
(4)刪除Topic./kafka-topics.sh --bootstrap-server 192.168.19.11:9092 --delete -topic testTopic
(5)生產(chǎn)/消費(fèi)數(shù)據(jù)進(jìn)入bin目錄,打開兩個(gè)終端,分別執(zhí)行如下命令:
(1)先執(zhí)行生產(chǎn)者命令(發(fā)送消息)
kafka自帶了一個(gè)producer命令客戶端,可以從本地文件中斷讀取內(nèi)容,或者我們也可以用命令行直接輸入內(nèi)容,并將這些內(nèi)容以消息的形式發(fā)送到kafka集群中。在默認(rèn)情況下,每一個(gè)行會(huì)被當(dāng)成一個(gè)獨(dú)立的消息。使用kafka的發(fā)送消息的客戶端,制定發(fā)送到的kafka服務(wù)器地址和topic
./kafka-console-producer.sh --broker-list 192.168.19.11:9092 --topic javaTopic
(2)輸入消息
(3)另一個(gè)窗口進(jìn)入bin目錄執(zhí)行消費(fèi)者命令
./kafka-console-consumer.sh --bootstrap-server 192.168.19.11:9092 --topic javaTopic
(4)繼續(xù)在生產(chǎn)者窗口輸入內(nèi)容
(5)再看消費(fèi)者窗口
成功
上面我們測試的時(shí)候,先在生產(chǎn)者輸入到3333,然后才開啟了消費(fèi)者。接著輸入4444才被消費(fèi)者消費(fèi)到,為什么4444前面的內(nèi)容沒有被消費(fèi)到呢?
這就引入了“偏移量”
對于consumer,kafka同樣也攜帶了了一個(gè)命令行客戶端,會(huì)將獲取到內(nèi)容在命令中進(jìn)行輸出,默認(rèn)是消費(fèi)最新的消息。使用kafka的消費(fèi)者消息的客戶端,從指定kafka服務(wù)器的指定topic中消費(fèi)信息
(1)方式一:從最后一條消息的偏移量+1開始消費(fèi)
./kafka-console-consumer.sh --bootstrap-server 192.168.19.11:9092 --topic javaTopic
(2)方式二:從頭開始消費(fèi)
./kafka-console-consumer.sh --bootstrap-server 192.168.19.11:9092 --from-beginning --topic javaTopic
幾個(gè)注意點(diǎn):
(1)消息會(huì)被存儲,生產(chǎn)找把消息發(fā)送給broker,broker會(huì)把消息保存到本地的日志文件中
(2)消息是順序存儲的,通過offset偏移量來描述消息的有序性
(3)消息是有偏移量的
(4)消費(fèi)時(shí)可以指明偏移量進(jìn)行消費(fèi)
進(jìn)入我們在server.peoperties配置文件中設(shè)置的日志路徑:kafka_2.12-3.1.0/data/kafka-logs??梢钥吹较旅嬗?0個(gè)對應(yīng)的維護(hù)偏移量的文件:
(1)__consumer_offsets-49
1-kafka內(nèi)部自己創(chuàng)建了__consumer_offsets主題包含了50個(gè)分區(qū)。這個(gè)主題用來存放消費(fèi)者消費(fèi)某個(gè)主題的偏移量,也就是記錄一個(gè)消費(fèi)者已經(jīng)消費(fèi)到哪個(gè)位置了。
2-會(huì)有很多個(gè)主題,會(huì)有很多個(gè)消費(fèi)者,每個(gè)消費(fèi)者都會(huì)往這個(gè)文件里放入offset數(shù)據(jù),所以這個(gè)文件存的信息非常多,創(chuàng)建了50個(gè)分區(qū)就可以保證存儲很大的數(shù)據(jù)量。通過設(shè)置了50個(gè)分區(qū),可以提升這個(gè)主題的并發(fā)性。
3-因?yàn)槊總€(gè)消費(fèi)者都會(huì)自己維護(hù)著消費(fèi)的主題的偏移量,也就是說每個(gè)消費(fèi)者會(huì)把消費(fèi)的主題的偏移量自主上報(bào)給kafka中的默認(rèn)主題:consumer_offsets。
(2)信息補(bǔ)充
1-定期把自己消費(fèi)分區(qū)的offset提交給kafka內(nèi)部:__consumer_offsets,提交過去的時(shí)候,key是consumerGroupid+topic+分區(qū)號,value就是當(dāng)前offset的值,kafka會(huì)定期清理topic里的消息,最后就保留最新的那條數(shù)據(jù)。
2-因?yàn)開_consumer_offsets可能會(huì)接收高并發(fā)的請求,kafka默認(rèn)給其分配50個(gè)分區(qū)(可以通過offsets.topic.num.partitions設(shè)置),這樣可以通過加機(jī)器的方式抗大并發(fā)
3-通過如下公式可以選出consumer消費(fèi)的offset要提交到__consumer_offsets的哪個(gè)分區(qū):hash(consumerGroupId)%__consumer_offsets主題的分區(qū)數(shù)(常見的哈希算法+取模)
還可以看到有:javaTopic-0、javaTopic-1有我們創(chuàng)建的兩個(gè)分區(qū),進(jìn)入分區(qū)文件查看:
(1)0000.index
稀疏索引的原理,使用二分查找法,能更快速的找到數(shù)據(jù)
(2)0000.log
保存的就是生產(chǎn)者往topic的分區(qū)里發(fā)送的消息
(3)0000.timeindex
根據(jù)時(shí)間索引來查找數(shù)據(jù)
總結(jié)流程就是:生產(chǎn)者輸入的消息存入javaTopic-0目錄下的0000.log文件中,消息有自己的偏移量,消費(fèi)者在獲取消息的時(shí)候根據(jù)偏移量從這個(gè)文件中讀取信息,而且只是讀取并不會(huì)刪除消息,所以其他的消費(fèi)者再來讀取的時(shí)候依然可以讀取到全部的消息。
在一個(gè)kafka的topic中,啟動(dòng)兩個(gè)消費(fèi)者,一個(gè)生產(chǎn)者,問:生產(chǎn)找發(fā)送消息,這條消息是否同時(shí)會(huì)被兩個(gè)消費(fèi)者消費(fèi)??
(1)開啟生產(chǎn)者和一個(gè)消費(fèi)者組里的兩個(gè)消費(fèi)者
生產(chǎn)者
./kafka-console-producer.sh --broker-list 192.168.19.11:9092 --topic javaTopic
消費(fèi)者組
./kafka-console-consumer.sh --bootstrap-server 192.168.19.11:9092 --consumer-property group.id=javaGroup --topic javaTopic
生產(chǎn)者發(fā)送了消息
消費(fèi)者1收到消息了
消費(fèi)者2沒有收到消息
接續(xù)輸入消息
看到兩個(gè)消費(fèi)者輪流獲取消息。這里是因?yàn)槲抑皠?chuàng)建Topic的時(shí)候指定了2個(gè)分區(qū),所以生產(chǎn)的消息被輪流放入到兩個(gè)分區(qū)中去了,而消費(fèi)組中的兩個(gè)消費(fèi)者各自獲取一個(gè)分區(qū)中的消息,所以看起來就是輪流消費(fèi)了。那么如果是只有一個(gè)分區(qū)呢?
如果只有一個(gè)分區(qū)的話,只有一個(gè)消費(fèi)者可以消費(fèi)這個(gè)分區(qū)中的消息,這樣可以保證消費(fèi)的有序性,并且是只有最新加入消費(fèi)組的哪個(gè)消費(fèi)者可以消費(fèi)。
總結(jié):如果多個(gè)消費(fèi)者在同一個(gè)消費(fèi)組,那么只有一個(gè)消費(fèi)者可以收到訂閱的topic中的消息。換言之,同一個(gè)消費(fèi)組中只能有一個(gè)消費(fèi)者收到一個(gè)topic中的消息。
【3】多播消息開啟生產(chǎn)者和兩個(gè)消費(fèi)者組里的兩個(gè)消費(fèi)者
./kafka-console-consumer.sh --bootstrap-server 192.168.19.11:9092 --consumer-property group.id=javaGroup01 --topic javaTopic
./kafka-console-consumer.sh --bootstrap-server 192.168.19.11:9092 --consumer-property group.id=javaGroup02 --topic javaTopic
可以看到兩個(gè)不同消費(fèi)組的消費(fèi)者都同時(shí)獲取到了生產(chǎn)者發(fā)送的消息
總結(jié):不同消費(fèi)組訂閱同一個(gè)topic,那么不同額消費(fèi)組中只有一個(gè)消費(fèi)者能收到消息。實(shí)際上也是多個(gè)消費(fèi)組中的多個(gè)消費(fèi)者收到了同一個(gè)消息。
【4】查看消費(fèi)組和信息# 查看當(dāng)前主題下有哪些消費(fèi)者組
./kafka-consumer-groups.sh --bootstrap-server 192.168.19.11:9092 --list
./kafka-consumer-groups.sh --bootstrap-server 192.168.19.11:9092 --describe --group javaGroup
此時(shí)關(guān)閉所有消費(fèi)者,然后生產(chǎn)者接著發(fā)送消息,再看一下消費(fèi)組信息,就能看到LAG量在增加
【5】主題和分區(qū)的概念 (1)主題Topic主題Topic在kafka中是一個(gè)邏輯的概念,kafka通過topic把消息進(jìn)行分類。不同的topic會(huì)被訂閱該topic的消費(fèi)者消費(fèi)。
但是有一個(gè)問題,如果說這個(gè)topic的消息非常非常多,多到需要幾個(gè)T來存,因?yàn)橄⑹菚?huì)被保存到log日志文件中的。為了解決這個(gè)文件過大的問題,kafka提出了partition分區(qū)的概念
(2)分區(qū)Partition通過partition把一個(gè)topic中額消息分區(qū)來存儲,這樣的好處有多個(gè):
1-分區(qū)存儲可以解決統(tǒng)一存儲文件過大的問題
2-提供了讀寫的吞吐量:讀和寫可以同時(shí)在多個(gè)分區(qū)中進(jìn)行
# 這里就創(chuàng)建了兩個(gè)javaTopic分區(qū)
./kafka-topics.sh --bootstrap-server 192.168.19.11:9092 --create --topic javaTopic --partitions 2 --replication-factor 1
# 查看當(dāng)前主題的信息
./kafka-topics.sh --bootstrap-server 192.168.19.11:9092 -topic testTopic
(四)搭建kafka集群(3個(gè)broker)
【1】搭建過程進(jìn)入config目錄下,拷貝兩份server.properties
cp server.properties server01.properties
cp server.properties server02.properties
主要修改的內(nèi)容如下:
(1)server.properties
broker.id=0
listeners=PLAINTEXT://192.168.19.11:9092
advertised.listeners=PLAINTEXT://192.168.19.11:9092
log.dirs=/root/kafka_2.12-3.1.0/data/kafka-logs
(2)server01.properties
broker.id=1
listeners=PLAINTEXT://192.168.19.11:9093
advertised.listeners=PLAINTEXT://192.168.19.11:9093
log.dirs=/root/kafka_2.12-3.1.0/data/kafka-logs-1
(3)server02.properties
broker.id=2
listeners=PLAINTEXT://192.168.19.11:9094
advertised.listeners=PLAINTEXT://192.168.19.11:9094
log.dirs=/root/kafka_2.12-3.1.0/data/kafka-logs-2
使用如下命令來啟動(dòng)3臺服務(wù)器
./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
./kafka-server-start.sh -daemon ../config/server.properties &
./kafka-server-start.sh -daemon ../config/server01.properties &
./kafka-server-start.sh -daemon ../config/server02.properties &
【2】測試過程
(1)副本的概念副本就是對分區(qū)的備份。在集群中,不同的副本會(huì)被部署在不同的broker上,例子:創(chuàng)建1個(gè)主題,2個(gè)分區(qū),3個(gè)副本
./kafka-topics.sh --bootstrap-server 192.168.19.11:9092 --create --topic replicatedTopic --partitions 2 --replication-factor 3
查看主題的詳細(xì)信息
./kafka-topics.sh --bootstrap-server 192.168.19.11:9092 --describe --topic replicatedTopic
從結(jié)果中可以獲取的信息:
(1)replicatedTopic這個(gè)主題有兩個(gè)分區(qū),每個(gè)分區(qū)有3個(gè)副本,分別放在3臺服務(wù)器上
(2)第一個(gè)分區(qū)的partition號為0,副本的leader是1號副本,其他副本為follower
(3)第二個(gè)分區(qū)的partition號為1,副本的leader是0號副本,其他副本為follower
(4)生產(chǎn)者只會(huì)給每個(gè)分區(qū)的leader副本發(fā)送消息,也就是只會(huì)往一臺服務(wù)器上發(fā)送消息,其他兩臺服務(wù)器上的副本同步leader副本的信息,作為備份。消費(fèi)者同理,只會(huì)消費(fèi)leader副本的消息
(5)Isr:可以同步的broker節(jié)點(diǎn)和已同步的broker節(jié)點(diǎn),存放在Isr集合中,表示當(dāng)前可以正常參與同步的所有broker節(jié)點(diǎn)。如果一個(gè)broker上的副本同步的效率特別差的話,這個(gè)broker就會(huì)被cluster從Isr集合中刪除,下次同步的時(shí)候就不給它同步了。如果leader副本掛了的話,就會(huì)從Isr集合中選取一個(gè)follower作為新的leader
總結(jié)就是:1個(gè)主題的數(shù)據(jù)可以拆分放在多個(gè)分區(qū)中,每個(gè)分區(qū)可以創(chuàng)建多個(gè)副本放在不同的broker
./kafka-console-producer.sh --broker-list 192.168.19.11:9092,192.168.19.11:9093,192.168.19.11:9094 --topic replicatedTopic
(3)集群消息的消費(fèi)./kafka-console-consumer.sh --bootstrap-server 192.168.19.11:9092,192.168.19.11:9093,192.168.19.11:9094 --from-beginning --topic replicatedTopic
(4)集群消費(fèi)組的消費(fèi)./kafka-console-consumer.sh --bootstrap-server 192.168.19.11:9092,192.168.19.11:9093,192.168.19.11:9094 --from-beginning --consumer-property group.id=replicatedGroup --topic replicatedTopic
(1)一個(gè)副本里的消息只能被一個(gè)消費(fèi)組中的一個(gè)消費(fèi)者消費(fèi),這樣可以保證消費(fèi)順序,不會(huì)被其他消費(fèi)者打亂。那怎么做到消費(fèi)的總順序性呢?
(2)一個(gè)消費(fèi)組中的一個(gè)消費(fèi)者可以消費(fèi)多個(gè)副本里的消息,如果一個(gè)消費(fèi)者掛掉了,會(huì)觸發(fā)rebalance機(jī)制,其他消費(fèi)者可以頂上來消費(fèi)該分區(qū)
(3)消費(fèi)組中的消費(fèi)者數(shù)量不能比一個(gè)topic中的partition副本數(shù)量多,否則多出來的消費(fèi)者消費(fèi)不到信息
(4)kafka只能在partition范圍內(nèi)保證消息消費(fèi)的局部順序性,不能在同一個(gè)topic中的多個(gè)partition中保證總的消費(fèi)順序性。
你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機(jī)房具備T級流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級服務(wù)器適合批量采購,新人活動(dòng)首月15元起,快前往官網(wǎng)查看詳情吧
本文題目:【Kafka】Linux下搭建kafka服務(wù),完整學(xué)習(xí)案例-創(chuàng)新互聯(lián)
URL網(wǎng)址:http://muchs.cn/article18/dhgogp.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供ChatGPT、微信公眾號、網(wǎng)站收錄、品牌網(wǎng)站制作、網(wǎng)站設(shè)計(jì)公司、軟件開發(fā)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容