thingsboard源碼分析--從設(shè)備數(shù)據(jù)上報(bào)的角度-創(chuàng)新互聯(lián)

簡介:

本次主要是對thingsboard的源碼進(jìn)行一個(gè)簡單的分析,從設(shè)備發(fā)送遙測數(shù)據(jù)到平臺(tái)的角度跟進(jìn)后端代碼邏輯探究其流轉(zhuǎn)過程,好了,話不多說,直接進(jìn)入正題。

創(chuàng)新互聯(lián)專注于江南網(wǎng)站建設(shè)服務(wù)及定制,我們擁有豐富的企業(yè)做網(wǎng)站經(jīng)驗(yàn)。 熱誠為您提供江南營銷型網(wǎng)站建設(shè),江南網(wǎng)站制作、江南網(wǎng)頁設(shè)計(jì)、江南網(wǎng)站官網(wǎng)定制、小程序制作服務(wù),打造江南網(wǎng)絡(luò)公司原創(chuàng)品牌,更為您提供江南網(wǎng)站排名全網(wǎng)營銷落地服務(wù)。數(shù)據(jù)流轉(zhuǎn)大概流程

找到MqttTransportService類==》通過@PostConstruct注解在項(xiàng)目啟動(dòng)后進(jìn)入init()方法
==》里面綁定了MqttTransportServerInitializer類即mqtt服務(wù)初始化
==》MqttTransportServerInitializer類繼承ChannelInitializer類重寫了initChannel方法
==》initChannel方法里綁定了MqttTransportHandler
==》進(jìn)入MqttTransportHandler的channelRead方法,驗(yàn)證消息類型為mqtt時(shí)轉(zhuǎn)入processMqttMsg方法
==》processMqttMsg里進(jìn)行判斷:消息類型為連接時(shí)轉(zhuǎn)入processConnect,設(shè)備session為臨時(shí)的轉(zhuǎn)入processProvisionSessionMsg
否則轉(zhuǎn)入enqueueRegularSessionMsg方法,這里先探討轉(zhuǎn)入enqueueRegularSessionMsg
==》轉(zhuǎn)入enqueueRegularSessionMsg后調(diào)用processMsgQueue將消息投遞到隊(duì)列
==》跟進(jìn)去發(fā)現(xiàn)里面調(diào)用了processRegularSessionMsg方法
==》processRegularSessionMsg里根據(jù)消息的類型進(jìn)行轉(zhuǎn)發(fā),比如:發(fā)布,訂閱,取消訂閱,取消連接等等
==》跟進(jìn)PUBLISH,轉(zhuǎn)入processPublish方法
==》轉(zhuǎn)入processDevicePublish,進(jìn)入發(fā)現(xiàn)這里根據(jù)消息的主題進(jìn)行轉(zhuǎn)發(fā),這里選擇isDeviceTelemetryTopic對應(yīng)的transportService.process接口實(shí)現(xiàn)
==》發(fā)現(xiàn)這里對消息封裝了一下之后轉(zhuǎn)入sendToRuleEngine,將消息發(fā)送到規(guī)則鏈
》繼續(xù)跟進(jìn)進(jìn)入sendToRuleEngine,發(fā)現(xiàn)調(diào)用ruleEngineMsgProducer.send,即將消息通過生產(chǎn)者發(fā)送到隊(duì)列
這里對應(yīng)多個(gè)實(shí)現(xiàn),例如:inMemory,Kafka,RabbitMQ等等,默認(rèn)發(fā)送到inMemory內(nèi)存
》有生產(chǎn)者那肯定有消費(fèi)者,我們找到DefaultTbRuleEngineConsumerService核心消費(fèi)者
》找到launchMainConsumers方法》launchConsumer
》consumerLoop
》發(fā)現(xiàn)consumerLoop是個(gè)循環(huán),將消息取出來消費(fèi),轉(zhuǎn)submitMessage方法
》然后轉(zhuǎn)入forwardToRuleEngineActor》調(diào)用actorContext.tell,這里開始就是Actor模型流轉(zhuǎn)了,不清楚的可以去百度搜索一下
》首先調(diào)用appActor.tell通過根appActor調(diào)用tell方法轉(zhuǎn)入enqueue方法,里面對消息進(jìn)行了分類,分為高優(yōu)先級和正常消息隊(duì)列
還有initActor()方法創(chuàng)建一系列actor,大概流程:AppActor
》TenantActor
》RuleChainActor
》RuleNodeActor,我們先轉(zhuǎn)入tryProcessQueue方法
==>然后發(fā)現(xiàn)調(diào)用了processMailbox,發(fā)現(xiàn)這里是將之前分類的消息依次取出來然后調(diào)用actor.process(msg)方法依次向下流轉(zhuǎn)處理消息

》ContextAwareActor》process==》doProcess==》…

詳細(xì)流轉(zhuǎn)過程

我們首先找到位于common模塊下transport模塊下的mqtt下的MqttTransportService類
在這里插入圖片描述
在init方法中我們可以看到:其實(shí)現(xiàn)了bossGroup,workerGroup兩個(gè)線程組,綁定端口,綁定Handler相關(guān)的內(nèi)容,這是netty相關(guān)的學(xué)過的應(yīng)該非常熟悉,這里也可以猜想到thingsboard里的mqtt是基于netty封裝實(shí)現(xiàn)的。
項(xiàng)目在啟動(dòng)時(shí),基于spring的@Service注解和 @PostConstruct注解,會(huì)進(jìn)入到我們的init()方法里,然后轉(zhuǎn)入MqttTransportServerInitializer,即mqtt服務(wù)初始化過程。
進(jìn)入到MqttTransportServerInitializer方法
在這里插入圖片描述
這里主要是繼承了ChannelInitializer類重寫了initChannel初始化管道方法,
設(shè)置一些解碼,ip過濾相關(guān)的,然后轉(zhuǎn)入MqttTransportHandler具體的邏輯處理。
轉(zhuǎn)入MqttTransportHandler我們直接看到channelRead方法,客戶端在連接mqtt服務(wù)端時(shí)會(huì)進(jìn)入到這里
在這里插入圖片描述
這里我們可以看到,主要對消息做了一個(gè)判斷,是否是mqtt類型的消息,是則轉(zhuǎn)入processMqttMsg方法處理mqtt消息。
在這里插入圖片描述
在這個(gè)方法里面我們可以看到,消息類型為連接時(shí)轉(zhuǎn)入processConnect方法處理連接,返回相關(guān)ack確認(rèn),設(shè)備session為臨時(shí)的轉(zhuǎn)入processProvisionSessionMsg,否則轉(zhuǎn)入enqueueRegularSessionMsg處理消息,這里我們主要關(guān)注最后一個(gè)。
在這里插入圖片描述
這里可以看到消息進(jìn)入時(shí)做了一個(gè)判斷,消息數(shù)量大于隊(duì)列容納消息長度時(shí)直接return,這個(gè)也可以在application.yml里面配置的。然后直接調(diào)用processMsgQueue方法處理消息到隊(duì)列,我們跟進(jìn)去看看。
在這里插入圖片描述
這里我們可以看到,根據(jù)消息的類型來做后續(xù)的邏輯處理,例如:發(fā)布,訂閱,取消訂閱,心跳等等。這里我們進(jìn)入PUBLISH消息上報(bào)這里。
z
這里有判斷,我們直接轉(zhuǎn)入processDevicePublish方法,上面那個(gè)是和網(wǎng)關(guān)相關(guān)的我們目前沒涉及到,先不管。
在這里插入圖片描述
看到processDevicePublish這個(gè)方法里面有很多個(gè)分支,例如:發(fā)布設(shè)備屬性,設(shè)備遙測數(shù)據(jù),設(shè)備遠(yuǎn)程控制等等,我們主要跟進(jìn)設(shè)備遙測數(shù)據(jù)這里。
在這里插入圖片描述
這里我們可以看到,對消息進(jìn)行一下封裝,加入了設(shè)備名稱,設(shè)備類型,發(fā)送的時(shí)間戳,然后將消息發(fā)送到規(guī)則鏈,進(jìn)入sendToRuleEngine方法
在這里插入圖片描述
可以看到這里指定了后面要發(fā)送到隊(duì)列的名稱,默認(rèn)為Main,從
String defaultQueueName = deviceProfile.getDefaultQueueName();
queueName = defaultQueueName != null ? defaultQueueName : ServiceQueue.MAIN;可以看出在設(shè)備配置里面是可以指定發(fā)送到那個(gè)隊(duì)列的名稱的
我們繼續(xù)跟進(jìn)sendToRuleEngine方法
在這里插入圖片描述
這里可以看到我們的消息是通過ruleEngineMsgProducer生產(chǎn)者進(jìn)行發(fā)送到隊(duì)列的, ruleEngineMsgProducer.send這個(gè)接口對應(yīng)了多個(gè)實(shí)現(xiàn)
在這里插入圖片描述
有基于內(nèi)存的,Kafka,RabbitMQ等等,默認(rèn)是基于內(nèi)存的,這個(gè)可以在配置文件內(nèi)修改。
有生產(chǎn)者那么對應(yīng)肯定有消費(fèi)者,此時(shí)我們需要轉(zhuǎn)入消費(fèi)者,看看它后面的邏輯運(yùn)轉(zhuǎn)是怎樣的。
我們找到DefaultTbRuleEngineConsumerService核心消費(fèi)類
在這里插入圖片描述
在這里插入圖片描述
這里可以明顯地看到消費(fèi)者依次從隊(duì)列里取出消息處理后調(diào)用submitMessage處理后續(xù)邏輯
在這里插入圖片描述
forwardToRuleEngineActor從這個(gè)方法名字我們看到消息將推送到規(guī)則鏈Actor,后面就開始進(jìn)入Actor模型流轉(zhuǎn)了,這個(gè)不清楚的可以百度搜索一下。
在這里插入圖片描述
在這里插入圖片描述
首先進(jìn)入的是AppActor.tell方法,這個(gè)是頂級actor,是一切actor創(chuàng)建的源頭。
流轉(zhuǎn)過程:
AppActor==》TenantActor==》RuleChainActor==》RuleNodeActor
我們接著跟進(jìn)
!](https://img-blog.csdnimg.cn/6d61049e3bc64c8aa8f60262b0868439.png#pic_center)
在這里插入圖片描述
在這里插入圖片描述
這里消息進(jìn)行了分類,創(chuàng)建了兩個(gè)存儲(chǔ)消息的隊(duì)列,一個(gè)高優(yōu)先級隊(duì)列,一個(gè)正常隊(duì)列,根據(jù)消息的類型進(jìn)行添加,后調(diào)用tryProcessQueue方法進(jìn)一步處理消息,
initActor()這個(gè)對應(yīng)初始化各類的actor,
我們進(jìn)入tryProcessQueue方法。

在這里插入圖片描述
可以看到使用了線程池創(chuàng)建的線程去執(zhí)行我們的processMailbox方法
在這里插入圖片描述
在processMailbox方法里,可以看到,是先從高優(yōu)先級隊(duì)列里取消息進(jìn)行處理,直至其高優(yōu)先級隊(duì)列為空時(shí)才從正常隊(duì)列里取消息進(jìn)行處理,消息取出后調(diào)用actor.process方法進(jìn)行消息的處理流轉(zhuǎn),整個(gè)過程一直循環(huán)。
跟進(jìn)actor.process方法,進(jìn)入ContextAwareActor的process方法,進(jìn)入doProcess方法,然后流轉(zhuǎn)到各個(gè)分類的actor,這里我們選擇一個(gè)進(jìn)行跟進(jìn)
ruleChainActor
在這里插入圖片描述
可以看到這里也有很多分支,這里我們選兩個(gè)進(jìn)行分析:首先是:onRuleNodeToSelfMsg,
在這里插入圖片描述
跟進(jìn)后發(fā)現(xiàn)調(diào)用tbNode.onMsg方法進(jìn)入我們的節(jié)點(diǎn)的消息處理方法,點(diǎn)擊我們可以發(fā)現(xiàn)對應(yīng)很多個(gè)節(jié)點(diǎn)實(shí)現(xiàn),這個(gè)對應(yīng)的就是我們前臺(tái)web界面規(guī)則鏈里的一個(gè)個(gè)流轉(zhuǎn)的節(jié)點(diǎn)
在這里插入圖片描述
我們隨機(jī)選擇一個(gè)進(jìn)入CalculateDeltaNode
在這里插入圖片描述
在這里插入圖片描述
可以看到每個(gè)節(jié)點(diǎn)都有init(),onMsg方法,初始化和對應(yīng)節(jié)點(diǎn)的邏輯實(shí)現(xiàn),后續(xù)如果自己想自定義節(jié)點(diǎn)時(shí)可以參照著寫。到這里我們基本清楚了整個(gè)消息流轉(zhuǎn)的流程,先告一段落了。
再回到之前那個(gè)onQueueToRuleEngineMsg
我們選擇onQueueToRuleEngineMsg,對應(yīng)消息流轉(zhuǎn)到規(guī)則鏈節(jié)點(diǎn)。
在這里插入圖片描述
在這里插入圖片描述
這幾個(gè)也可以跟進(jìn)去看看,最終也是流轉(zhuǎn)到了規(guī)則鏈節(jié)點(diǎn)里,消息一直在這幾個(gè)過程里面循環(huán),就先講到這里了,后面有時(shí)間再繼續(xù)研究。

總結(jié)

thingsboard的源碼還是比較復(fù)雜一點(diǎn)的,其中有很多的分支流轉(zhuǎn),循環(huán)嵌套,而且很多地方使用的都是異步操作,線程池操作,斷點(diǎn)分析有時(shí)也不容易,不過不管是代碼規(guī)范還是邏輯上還是比較嚴(yán)謹(jǐn)?shù)?,耐心一點(diǎn)還是比較容易理解清楚整個(gè)流程的。

你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機(jī)房具備T級流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級服務(wù)器適合批量采購,新人活動(dòng)首月15元起,快前往官網(wǎng)查看詳情吧

當(dāng)前文章:thingsboard源碼分析--從設(shè)備數(shù)據(jù)上報(bào)的角度-創(chuàng)新互聯(lián)
網(wǎng)站網(wǎng)址:http://www.muchs.cn/article44/dsodhe.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供外貿(mào)網(wǎng)站建設(shè)、外貿(mào)建站、面包屑導(dǎo)航品牌網(wǎng)站建設(shè)、軟件開發(fā)、小程序開發(fā)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

商城網(wǎng)站建設(shè)