【SparkStreaming-創(chuàng)新互聯(lián)

文章目錄
  • 1.轉(zhuǎn)換算子:
    • 案例需求:
  • sparkstreaming + kafka 整合 :
    • 版本選擇:
    • 2.spark整合kafka api:
    • 查看kafka topic命令:
    • sparkstreaming里面: 開發(fā)模式:***
    • 3.提交offset信息
    • kafka消費語義:
    • 存儲offset:

創(chuàng)新互聯(lián)一直在為企業(yè)提供服務,多年的磨煉,使我們在創(chuàng)意設(shè)計,成都全網(wǎng)營銷到技術(shù)研發(fā)擁有了開發(fā)經(jīng)驗。我們擅長傾聽企業(yè)需求,挖掘用戶對產(chǎn)品需求服務價值,為企業(yè)制作有用的創(chuàng)意設(shè)計體驗。核心團隊擁有超過十年以上行業(yè)經(jīng)驗,涵蓋創(chuàng)意,策化,開發(fā)等專業(yè)領(lǐng)域,公司涉及領(lǐng)域有基礎(chǔ)互聯(lián)網(wǎng)服務托管服務器、成都app開發(fā)、手機移動建站、網(wǎng)頁設(shè)計、網(wǎng)絡整合營銷。1.轉(zhuǎn)換算子:

transform

DStream 和 rdd之間數(shù)據(jù)進行交互的算子

流處理 數(shù)據(jù)源:
	一個數(shù)據(jù)來自于 mysql數(shù)據(jù)/hdfs上文本數(shù)據(jù)  【量小】  從表/維表 
	一個數(shù)據(jù) 來自于 kafka sss 讀取形成 DStream數(shù)據(jù) 【量大】 主業(yè)務  =》 主表
案例需求:
彈幕 過濾的功能 /黑名單的功能 
離線:

彈幕: 主表
	不好看
	垃圾
	男主真帥
	女主真好看
	666
過濾的彈幕:維表 
	熱巴真丑
	雞兒真美
	王鶴棣退出娛樂圈

實時:

sparkstreaming + kafka 整合 :

kafka =》 sparkstreaming

版本選擇:

spark 2.x : kafka版本: 0.8 0.10.0 or higher ok
spark 3.x =>kafka : 1.kafka版本: 0.10.0 or higher ok

spark 去kafka讀取數(shù)據(jù)的方式:
1.kafka 0.8 reciver方式讀取kafka數(shù)據(jù) 【效率低 、代碼開發(fā)復雜】
2.kafka 0.10.0版本之后 direct stream的方式加載kafka數(shù)據(jù) 【效率高、代碼開發(fā)簡單】
kafka:
版本也有要求: 0.11.0 版本之后

交付語義: consumer producer
producer 默認就是精準一次
consumer 交付語義取決于 consumer 框架本身

交付語義: consumer

? 至多一次 數(shù)據(jù)丟失問題
? 至少一次 數(shù)據(jù)不會丟失,數(shù)據(jù)重復消費
? 精準一次 數(shù)據(jù)不會丟失 數(shù)據(jù)也不會重復消費

spark 整合kafka 版本 0.10.0版本之后:
1.kafka 0.11.0之后 2.2.1 =>direct stream
2.sparkstreaming 默認消費kafka數(shù)據(jù) 交付語義:
至少一次

  1. spark消費kafka, DStream 【rdd 分區(qū)數(shù)】 =》 kafka topic 分區(qū)數(shù) 是一一對應的
    1:1 correspondence between Kafka partitions and Spark partitions,
2.spark整合kafka api:

? 1.simple API =》 過時不用了

  1. new Kafka consumer API 整合 kafka 主流
    3.引入依賴: org.apache.spark spark-streaming-kafka-0-10_2.12 3.2.1

?。?!不需要引入 kafka-clients 依賴

查看kafka topic命令:

kafka-topics.sh --list
–zookeeper bigdata32:2181,bigdata33:2181,bigdata34:2181/kafka

kafka-topics.sh --create
–zookeeper bigdata32:2181,bigdata33:2181,bigdata34:2181/kafka
–topic spark-kafka01 --partitions 3 --replication-factor 1

producer:
kafka-console-producer.sh
–broker-list bigdata33:9092,bigdata34:9092
–topic spark-kafka01

consumer:
kafka-console-consumer.sh
–bootstrap-server bigdata33:9092,bigdata34:9092
–topic spark-kafka
–from-beginning

val kafkaParams = Map[String, Object](
“bootstrap.servers” ->“bigdata33:9092,bigdata34:9092”,
“key.deserializer” ->classOf[StringDeserializer],
“value.deserializer” ->classOf[StringDeserializer],
“group.id” ->“dl2262_01”,
“auto.offset.reset” ->“l(fā)atest”,
“enable.auto.commit” ->(false: java.lang.Boolean)
)

需求:
消費kafka數(shù)據(jù) wc 將 結(jié)果寫到 mysql里面

input
todo
output

kafka =>spark =>mysql 鏈路打通了

模擬:spark作業(yè)掛掉 =》 重啟

“消費完kafka的數(shù)據(jù) 程序重啟之后接著從上次消費的位置接著消費 ”

目前: code不能滿足
1.目前代碼 這兩個參數(shù) 不能動
“auto.offset.reset” ->“earliest”
“enable.auto.commit” ->(false: java.lang.Boolean)

2.主要原因 : spark作業(yè) 消費kafka數(shù)據(jù):
1.獲取kafka offset =》 處理kafka數(shù)據(jù) =》 “提交offset的操作” 沒有
解決:
1.獲取kafka offset // todo
2. 處理kafka數(shù)據(jù)
3.提交offset的操作 // todo

1.獲取kafka offset // todo
1. kafka offset 信息
2.spark rdd分區(qū)數(shù) 和 kafka topic 的分區(qū)數(shù) 是不是 一對一

報錯:
org.apache.spark.rdd.ShuffledRDD cannot be cast to org.apache.spark.streaming.kafka010.HasOffsetRanges

ShuffledRDD =》 HasOffsetRanges 說明 代碼有問題

sparkstreaming里面: 開發(fā)模式:***

? 1.獲取kafka 流數(shù)據(jù)
? 2. 流 Dstream =》 調(diào)用foreachRDD算子 進行輸出:
? 0.獲取offset 信息
? 1.做業(yè)務邏輯
? 2.結(jié)果數(shù)據(jù)輸出
? 3.提交offset信息

offset解釋:

01 batch:

rdd的分區(qū)數(shù):3
topic partition fromOffset untilOffset
spark-kafka01 0 0 1
spark-kafka01 1 0 1
spark-kafka01 2 0 0

02 batch:
rdd的分區(qū)數(shù):3
topic partition fromOffset untilOffset
spark-kafka01 0 1 1
spark-kafka01 1 1 1
spark-kafka01 2 0 0

此時 kafka 里面數(shù)據(jù)已經(jīng)消費完了 fromOffset=untilOffset

3.提交offset信息

2.存儲offset信息
spark流式處理 默認消費語義 : 至少一次
精準一次:
1.output + offset 同時完成
1.生產(chǎn)上Checkpoints不能用
2.Kafka itself =》至少一次
推薦使用 =》 簡單 高效
90% 都可以解決 10% 精準一次
3.Your own data store: =》 開發(fā)大量代碼 =》
mysql、redis、hbase、
至少一次
精準一次
mysql:
獲取offset
todo
output
提交offset

spark作業(yè)掛了 =》 啟動spark作業(yè) :
1.從mysql里面獲取offset
todo
output
提交offset

kafka消費語義:

? 1.至多一次 【丟數(shù)據(jù)】
? 2.至少一次 【不會丟數(shù)據(jù) 可能會重復消費數(shù)據(jù)】
? 3.精準一次 【不丟、不重復消費】

offset信息提交 :
1.spark todo :
至少一次:
1 2 3 4
offset get
業(yè)務邏輯 output db
提交offset

精準一次:output + 提交offset 一起發(fā)生 =》 事務來實現(xiàn)
事務: 一次操作要么成功 要么失敗

topic partition fromOffset untilOffset
spark-kafka01 0 3 3
spark-kafka01 2 2 2
spark-kafka01 1 2 2

存儲offset:

? kafka 本身:
? offset 信息存儲在哪?

kafka 某個topic下:
__consumer_offsets =》 spark作業(yè) 消費kafka的offset信息

topic offset 信息存儲的地方

你是否還在尋找穩(wěn)定的海外服務器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機房具備T級流量清洗系統(tǒng)配攻擊溯源,準確流量調(diào)度確保服務器高可用性,企業(yè)級服務器適合批量采購,新人活動首月15元起,快前往官網(wǎng)查看詳情吧

網(wǎng)站標題:【SparkStreaming-創(chuàng)新互聯(lián)
網(wǎng)頁地址:http://www.muchs.cn/article32/dpgepc.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供服務器托管、App開發(fā)、品牌網(wǎng)站設(shè)計、自適應網(wǎng)站、網(wǎng)站設(shè)計公司、微信小程序

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

外貿(mào)網(wǎng)站制作