本篇內(nèi)容介紹了“Flume整體流程是怎樣的”的有關(guān)知識(shí),在實(shí)際案例的操作過(guò)程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
專注于為中小企業(yè)提供成都做網(wǎng)站、成都網(wǎng)站設(shè)計(jì)服務(wù),電腦端+手機(jī)端+微信端的三站合一,更高效的管理,為中小企業(yè)鄞州免費(fèi)做網(wǎng)站提供優(yōu)質(zhì)的服務(wù)。我們立足成都,凝聚了一批互聯(lián)網(wǎng)行業(yè)人才,有力地推動(dòng)了上1000家企業(yè)的穩(wěn)健成長(zhǎng),幫助中小企業(yè)通過(guò)網(wǎng)站建設(shè)實(shí)現(xiàn)規(guī)模擴(kuò)充和轉(zhuǎn)變。
不管是Source還是Sink都依賴Channel,那么啟動(dòng)時(shí)應(yīng)該先啟動(dòng)Channel然后再啟動(dòng)Source或Sink即可。
Flume有兩種啟動(dòng)方式:使用EmbeddedAgent內(nèi)嵌在Java應(yīng)用中或使用Application單獨(dú)啟動(dòng)一個(gè)進(jìn)程,此處我們已Application分析為主。
首先進(jìn)入org.apache.flume.node.Application的main方法啟動(dòng):
//1、設(shè)置默認(rèn)值啟動(dòng)參數(shù)、參數(shù)是否必須的 Options options = new Options(); Option option = new Option("n", "name", true, "the name of this agent"); option.setRequired(true); options.addOption(option); option = new Option("f", "conf-file", true, "specify a config file (required if -z missing)"); option.setRequired(false); options.addOption(option); //2、接著解析命令行參數(shù) CommandLineParser parser = new GnuParser(); CommandLine commandLine = parser.parse(options, args); String agentName = commandLine.getOptionValue('n'); boolean reload = !commandLine.hasOption("no-reload-conf"); if (commandLine.hasOption('z') || commandLine.hasOption("zkConnString")) { isZkConfigured = true; } if (isZkConfigured) { //3、如果是通過(guò)ZooKeeper配置,則使用ZooKeeper參數(shù)啟動(dòng),此處忽略,我們以配置文件講解 } else { //4、打開(kāi)配置文件,如果不存在則快速失敗 File configurationFile = new File(commandLine.getOptionValue('f')); if (!configurationFile.exists()) { throw new ParseException( "The specified configuration file does not exist: " + path); } List<LifecycleAware> components = Lists.newArrayList(); if (reload) { //5、如果需要定期reload配置文件,則走如下方式 //5.1、此處使用Guava提供的事件總線 EventBus eventBus = new EventBus(agentName + "-event-bus"); //5.2、讀取配置文件,使用定期輪訓(xùn)拉起策略,默認(rèn)30s拉取一次 PollingPropertiesFileConfigurationProvider configurationProvider = new PollingPropertiesFileConfigurationProvider( agentName, configurationFile, eventBus, 30); components.add(configurationProvider); application = new Application(components); //5.3、向Application注冊(cè)組件 //5.4、向事件總線注冊(cè)本應(yīng)用,EventBus會(huì)自動(dòng)注冊(cè)Application中使用@Subscribe聲明的方法 eventBus.register(application); } else { //5、配置文件不支持定期reload PropertiesFileConfigurationProvider configurationProvider = new PropertiesFileConfigurationProvider( agentName, configurationFile); application = new Application(); //6.2、直接使用配置文件初始化Flume組件 application.handleConfigurationEvent(configurationProvider .getConfiguration()); } } //7、啟動(dòng)Flume應(yīng)用 application.start(); //8、注冊(cè)虛擬機(jī)關(guān)閉鉤子,當(dāng)虛擬機(jī)關(guān)閉時(shí)調(diào)用Application的stop方法進(jìn)行終止 final Application appReference = application; Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") { @Override public void run() { appReference.stop(); } });
以上流程只提取了核心代碼中的一部分,比如ZK的實(shí)現(xiàn)直接忽略了,而Flume啟動(dòng)大體流程如下:
1、讀取命令行參數(shù);
2、讀取配置文件;
3、根據(jù)是否需要reload使用不同的策略初始化Flume;如果需要reload,則使用Guava的事件總線實(shí)現(xiàn),Application的handleConfigurationEvent是事件訂閱者,PollingPropertiesFileConfigurationProvider是事件發(fā)布者,其會(huì)定期輪訓(xùn)檢查文件是否變更,如果變更則重新讀取配置文件,發(fā)布配置文件事件變更,而handleConfigurationEvent會(huì)收到該配置變更重新進(jìn)行初始化;
4、啟動(dòng)Application,并注冊(cè)虛擬機(jī)關(guān)閉鉤子。
handleConfigurationEvent方法比較簡(jiǎn)單,首先調(diào)用了stopAllComponents停止所有組件,接著調(diào)用startAllComponents使用配置文件初始化所有組件:
@Subscribe public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) { stopAllComponents(); startAllComponents(conf); }
MaterializedConfiguration存儲(chǔ)Flume運(yùn)行時(shí)需要的組件:Source、Channel、Sink、SourceRunner、SinkRunner等,其是通過(guò)ConfigurationProvider進(jìn)行初始化獲取,比如PollingPropertiesFileConfigurationProvider會(huì)讀取配置文件然后進(jìn)行組件的初始化。
對(duì)于startAllComponents實(shí)現(xiàn)大體如下:
//1、首先啟動(dòng)Channel supervisor.supervise(Channels, new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); //2、確保所有Channel是否都已啟動(dòng) for(Channel ch: materializedConfiguration.getChannels().values()){ while(ch.getLifecycleState() != LifecycleState.START && !supervisor.isComponentInErrorState(ch)){ try { Thread.sleep(500); } catch (InterruptedException e) { Throwables.propagate(e); } } } //3、啟動(dòng)SinkRunner supervisor.supervise(SinkRunners, new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); //4、啟動(dòng)SourceRunner supervisor.supervise(SourceRunner, new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); //5、初始化監(jiān)控服務(wù) this.loadMonitoring();
從如下代碼中可以看到,首先要準(zhǔn)備好Channel,因?yàn)镾ource和Sink會(huì)操作它,對(duì)于Channel如果初始化失敗則整個(gè)流程是失敗的;然后啟動(dòng)SinkRunner,先準(zhǔn)備好消費(fèi)者;接著啟動(dòng)SourceRunner開(kāi)始進(jìn)行采集日志。此處我們發(fā)現(xiàn)有兩個(gè)單獨(dú)的組件LifecycleSupervisor和MonitorService,一個(gè)是組件守護(hù)哨兵,一個(gè)是監(jiān)控服務(wù)。守護(hù)哨兵對(duì)這些組件進(jìn)行守護(hù),假設(shè)出問(wèn)題了默認(rèn)策略是自動(dòng)重啟這些組件。
對(duì)于stopAllComponents實(shí)現(xiàn)大體如下:
//1、首先停止SourceRunner supervisor.unsupervise(SourceRunners); //2、接著停止SinkRunner supervisor.unsupervise(SinkRunners); //3、然后停止Channel supervisor.unsupervise(Channels); //4、最后停止MonitorService monitorServer.stop();
此處可以看出,停止的順序是Source、Sink、Channel,即先停止生產(chǎn),再停止消費(fèi),最后停止管道。
Application中的start方法代碼實(shí)現(xiàn)如下:
public synchronized void start() { for(LifecycleAware component : components) { supervisor.supervise(component, new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); } }
其循環(huán)Application注冊(cè)的組件,然后守護(hù)哨兵對(duì)它進(jìn)行守護(hù),默認(rèn)策略是出現(xiàn)問(wèn)題會(huì)自動(dòng)重啟組件,假設(shè)我們支持reload配置文件,則之前啟動(dòng)Application時(shí)注冊(cè)過(guò)PollingPropertiesFileConfigurationProvider組件,即該組件會(huì)被守護(hù)哨兵守護(hù)著,出現(xiàn)問(wèn)題默認(rèn)策略自動(dòng)重啟。
而Application關(guān)閉執(zhí)行了如下動(dòng)作:
public synchronized void stop() { supervisor.stop(); if(monitorServer != null) { monitorServer.stop(); } }
即關(guān)閉守護(hù)哨兵和監(jiān)控服務(wù)。
到此基本的Application分析結(jié)束了,我們還有很多疑問(wèn),守護(hù)哨兵怎么實(shí)現(xiàn)的。
整體流程可以總結(jié)為:
1、首先初始化命令行配置;
2、接著讀取配置文件;
3、根據(jù)是否需要reload初始化配置文件中的組件;如果需要reload會(huì)使用Guava事件總線進(jìn)行發(fā)布訂閱變化;
4、接著創(chuàng)建Application,創(chuàng)建守護(hù)哨兵,并先停止所有組件,接著啟動(dòng)所有組件;啟動(dòng)順序:Channel、SinkRunner、SourceRunner,并把這些組件注冊(cè)給守護(hù)哨兵、初始化監(jiān)控服務(wù);停止順序:SourceRunner、SinkRunner、Channel;
5、如果配置文件需要定期reload,則需要注冊(cè)Polling***ConfigurationProvider到守護(hù)哨兵;
6、最后注冊(cè)虛擬機(jī)關(guān)閉鉤子,停止守護(hù)哨兵和監(jiān)控服務(wù)。
輪訓(xùn)實(shí)現(xiàn)的SourceRunner 和SinkRunner會(huì)創(chuàng)建一個(gè)線程進(jìn)行工作,之前已經(jīng)介紹了其工作方式。接下來(lái)我們看下守護(hù)哨兵的實(shí)現(xiàn)。
首先創(chuàng)建LifecycleSupervisor:
//1、用于存放被守護(hù)的組件 supervisedProcesses = new HashMap<LifecycleAware, Supervisoree>(); //2、用于存放正在被監(jiān)控的組件 monitorFutures = new HashMap<LifecycleAware, ScheduledFuture<?>>(); //3、創(chuàng)建監(jiān)控服務(wù)線程池 monitorService = new ScheduledThreadPoolExecutor(10, new ThreadFactoryBuilder().setNameFormat( "lifecycleSupervisor-" + Thread.currentThread().getId() + "-%d") .build()); monitorService.setMaximumPoolSize(20); monitorService.setKeepAliveTime(30, TimeUnit.SECONDS); //4、定期清理被取消的組件 purger = new Purger(); //4.1、默認(rèn)不進(jìn)行清理 needToPurge = false;
LifecycleSupervisor啟動(dòng)時(shí)會(huì)進(jìn)行如下操作:
public synchronized void start() { monitorService.scheduleWithFixedDelay(purger, 2, 2, TimeUnit.HOURS); lifecycleState = LifecycleState.START; }
首先每隔兩個(gè)小時(shí)執(zhí)行清理組件,然后改變狀態(tài)為啟動(dòng)。而LifecycleSupervisor停止時(shí)直接停止了監(jiān)控服務(wù),然后更新守護(hù)組件狀態(tài)為STOP:
//1、首先停止守護(hù)監(jiān)控服務(wù) if (monitorService != null) { monitorService.shutdown(); try { monitorService.awaitTermination(10, TimeUnit.SECONDS); } catch (InterruptedException e) { logger.error("Interrupted while waiting for monitor service to stop"); } } //2、更新所有守護(hù)組件狀態(tài)為STOP,并調(diào)用組件的stop方法進(jìn)行停止 for (final Entry<LifecycleAware, Supervisoree> entry : supervisedProcesses.entrySet()) { if (entry.getKey().getLifecycleState().equals(LifecycleState.START)) { entry.getValue().status.desiredState = LifecycleState.STOP; entry.getKey().stop(); } } //3、更新本組件狀態(tài) if (lifecycleState.equals(LifecycleState.START)) { lifecycleState = LifecycleState.STOP; } //4、最后的清理 supervisedProcesses.clear(); monitorFutures.clear();
接下來(lái)就是調(diào)用supervise進(jìn)行組件守護(hù)了:
if(this.monitorService.isShutdown() || this.monitorService.isTerminated() || this.monitorService.isTerminating()){ //1、如果哨兵已停止則拋出異常,不再接收任何組件進(jìn)行守護(hù) } //2、初始化守護(hù)組件 Supervisoree process = new Supervisoree(); process.status = new Status(); //2.1、默認(rèn)策略是失敗重啟 process.policy = policy; //2.2、初始化組件默認(rèn)狀態(tài),大多數(shù)組件默認(rèn)為START process.status.desiredState = desiredState; process.status.error = false; //3、組件監(jiān)控器,用于定時(shí)獲取組件的最新?tīng)顟B(tài),或者重新啟動(dòng)組件 MonitorRunnable monitorRunnable = new MonitorRunnable(); monitorRunnable.lifecycleAware = lifecycleAware; monitorRunnable.supervisoree = process; monitorRunnable.monitorService = monitorService; supervisedProcesses.put(lifecycleAware, process); //4、定期的去執(zhí)行組件監(jiān)控器,獲取組件最新?tīng)顟B(tài),或者重新啟動(dòng)組件 ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay( monitorRunnable, 0, 3, TimeUnit.SECONDS); monitorFutures.put(lifecycleAware, future); }
如果不需要守護(hù)了,則需要調(diào)用unsupervise:
public synchronized void unsupervise(LifecycleAware lifecycleAware) { synchronized (lifecycleAware) { Supervisoree supervisoree = supervisedProcesses.get(lifecycleAware); //1.1、設(shè)置守護(hù)組件的狀態(tài)為被丟棄 supervisoree.status.discard = true; //1.2、設(shè)置組件盼望的最新生命周期狀態(tài)為STOP this.setDesiredState(lifecycleAware, LifecycleState.STOP); //1.3、停止組件 lifecycleAware.stop(); } //2、從守護(hù)組件中移除 supervisedProcesses.remove(lifecycleAware); //3、取消定時(shí)監(jiān)控組件服務(wù) monitorFutures.get(lifecycleAware).cancel(false); //3.1、通知Purger需要進(jìn)行清理,Purger會(huì)定期的移除cancel的組件 needToPurge = true; monitorFutures.remove(lifecycleAware); }
接下來(lái)我們?cè)倏聪翸onitorRunnable的實(shí)現(xiàn),其負(fù)責(zé)進(jìn)行組件狀態(tài)遷移或組件故障恢復(fù):
public void run() { long now = System.currentTimeMillis(); try { if (supervisoree.status.firstSeen == null) { supervisoree.status.firstSeen = now; //1、記錄第一次狀態(tài)查看時(shí)間 } supervisoree.status.lastSeen = now; //2、記錄最后一次狀態(tài)查看時(shí)間 synchronized (lifecycleAware) { //3、如果守護(hù)組件被丟棄或出錯(cuò)了,則直接返回 if (supervisoree.status.discard || supervisoree.status.error) { return; } //4、更新最后一次查看到的狀態(tài) supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState(); //5、如果組件的狀態(tài)和守護(hù)組件看到的狀態(tài)不一致,則以守護(hù)組件的狀態(tài)為準(zhǔn),然后進(jìn)行初始化 if (!lifecycleAware.getLifecycleState().equals( supervisoree.status.desiredState)) { switch (supervisoree.status.desiredState) { case START: //6、如果是啟動(dòng)狀態(tài),則啟動(dòng)組件 try { lifecycleAware.start(); } catch (Throwable e) { if (e instanceof Error) { supervisoree.status.desiredState = LifecycleState.STOP; try { lifecycleAware.stop(); } catch (Throwable e1) { supervisoree.status.error = true; if (e1 instanceof Error) { throw (Error) e1; } } } supervisoree.status.failures++; } break; case STOP: //7、如果是停止?fàn)顟B(tài),則停止組件 try { lifecycleAware.stop(); } catch (Throwable e) { if (e instanceof Error) { throw (Error) e; } supervisoree.status.failures++; } break; default: } } catch(Throwable t) { } } }
如上代碼進(jìn)行了一些簡(jiǎn)化,整體邏輯即定時(shí)去采集組件的狀態(tài),如果發(fā)現(xiàn)守護(hù)組件和組件的狀態(tài)不一致,則可能需要進(jìn)行啟動(dòng)或停止。即守護(hù)監(jiān)視器可以用來(lái)保證組件如能失敗后自動(dòng)啟動(dòng)。默認(rèn)策略是總是失敗后重啟,還有一種策略是只啟動(dòng)一次。
“Flume整體流程是怎樣的”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!
當(dāng)前名稱:Flume整體流程是怎樣的
文章地址:http://muchs.cn/article40/ijcoeo.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站建設(shè)、網(wǎng)站制作、網(wǎng)站設(shè)計(jì)、網(wǎng)站導(dǎo)航、企業(yè)建站、網(wǎng)站營(yíng)銷
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)