Flink區(qū)分運(yùn)行環(huán)境的方法是什么

這篇文章主要介紹“Flink區(qū)分運(yùn)行環(huán)境的方法是什么”,在日常操作中,相信很多人在Flink區(qū)分運(yùn)行環(huán)境的方法是什么問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對(duì)大家解答”Flink區(qū)分運(yùn)行環(huán)境的方法是什么”的疑惑有所幫助!接下來,請(qǐng)跟著小編一起來學(xué)習(xí)吧!

成都創(chuàng)新互聯(lián)是專業(yè)的忻州網(wǎng)站建設(shè)公司,忻州接單;提供做網(wǎng)站、網(wǎng)站建設(shè),網(wǎng)頁設(shè)計(jì),網(wǎng)站設(shè)計(jì),建網(wǎng)站,PHP網(wǎng)站建設(shè)等專業(yè)做網(wǎng)站服務(wù);采用PHP框架,可快速的進(jìn)行忻州網(wǎng)站開發(fā)網(wǎng)頁制作和功能擴(kuò)展;專業(yè)做搜索引擎喜愛的網(wǎng)站,專業(yè)的做網(wǎng)站團(tuán)隊(duì),希望更多企業(yè)前來合作!

Flink判斷運(yùn)行環(huán)境(本地、集群)的邏輯如下:

(1)在任務(wù)的main方法中,通過 StreamExecutionEnvironment 獲取運(yùn)行環(huán)境

StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

(2)生成運(yùn)行環(huán)境的工廠類放在ThreadLocal中;threadLocalContextEnvironmentFactory 是StreamExecutionEnvironment類的靜態(tài)屬性 

	/** The ThreadLocal used to store {@link StreamExecutionEnvironmentFactory}. */
	private static final ThreadLocal<StreamExecutionEnvironmentFactory> threadLocalContextEnvironmentFactory = new ThreadLocal<>();

①當(dāng)是本地IDE直接運(yùn)行任務(wù)main方法時(shí),ThreadLocal中獲取到的StreamExecutionEnvironmentFactory為空,此時(shí)生成本地運(yùn)行環(huán)境LocalStreamEnvironment

	public static StreamExecutionEnvironment getExecutionEnvironment() {
		return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
			.map(StreamExecutionEnvironmentFactory::createExecutionEnvironment)
			.orElseGet(StreamExecutionEnvironment::createLocalEnvironment);
	}

當(dāng)ThreadLocal中有StreamExecutionEnvironmentFactory時(shí),則用其createExecutionEnvironment()方法來生成運(yùn)行環(huán)境

②當(dāng)集群環(huán)境時(shí),是如何將StreamExecutionEnvironmentFactory放入到ThreadLocal中?

通過 bin/flink run ....   命令提交jar包到集群運(yùn)行命令時(shí),該腳本會(huì)調(diào)用 org.apache.flink.client.cli.CliFrontend 來運(yùn)行用戶程序,如下:

.......
.......
# Add HADOOP_CLASSPATH to allow the usage of Hadoop file systems
exec $JAVA_RUN $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"

在CliFrontend中依次執(zhí)行以下方法 main() ->  parseParameters() -> run() -> executeProgram() 

	protected void executeProgram(final Configuration configuration, final PackagedProgram program) throws ProgramInvocationException {
		ClientUtils.executeProgram(new DefaultExecutorServiceLoader(), configuration, program, false, false);
	}

在org.apache.flink.client.ClientUtils的executeProgram()中調(diào)用 StreamContextEnvironment.setAsContext(...),StreamContextEnvironment繼承自StreamExecutionEnvironment。setAsContext()代碼如下

	public static void setAsContext(
			final PipelineExecutorServiceLoader executorServiceLoader,
			final Configuration configuration,
			final ClassLoader userCodeClassLoader,
			final boolean enforceSingleJobExecution,
			final boolean suppressSysout) {
		StreamExecutionEnvironmentFactory factory = () -> new StreamContextEnvironment(
			executorServiceLoader,
			configuration,
			userCodeClassLoader,
			enforceSingleJobExecution,
			suppressSysout);
		initializeContextEnvironment(factory);
	}

創(chuàng)建生成運(yùn)行環(huán)境的工廠類實(shí)例,在initializeContextEnvironment()方法中把實(shí)例放到StreamExecutionEnvironment類的靜態(tài)屬性threadLocalContextEnvironmentFactory 中 ,代碼如下

	protected static void initializeContextEnvironment(StreamExecutionEnvironmentFactory ctx) {
		contextEnvironmentFactory = ctx;
		threadLocalContextEnvironmentFactory.set(contextEnvironmentFactory);
	}

這樣在用戶程序 StreamExecutionEnvironment.getExecutionEnvironment() 時(shí),獲取到的運(yùn)行環(huán)境就是StreamContextEnvironment類的setAsContext()方法中生成的

	public static void setAsContext(
			final PipelineExecutorServiceLoader executorServiceLoader,
			final Configuration configuration,
			final ClassLoader userCodeClassLoader,
			final boolean enforceSingleJobExecution,
			final boolean suppressSysout) {
		StreamExecutionEnvironmentFactory factory = () -> new StreamContextEnvironment(
			executorServiceLoader,
			configuration,
			userCodeClassLoader,
			enforceSingleJobExecution,
			suppressSysout);
		......
	}

本地運(yùn)行環(huán)境LocalStreamEnvironment 和 獨(dú)立集群、flink on yarn等運(yùn)行環(huán)境StreamContextEnvironment的主要區(qū)別在于,他們的成員屬性 configuration不同。LocalStreamEnvironment中是創(chuàng)建的空鍵值對(duì)(new Configuration()),而StreamContextEnvironment是通過 CliFrontend生成的 Configuration 對(duì)象。

到此,關(guān)于“Flink區(qū)分運(yùn)行環(huán)境的方法是什么”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!

分享題目:Flink區(qū)分運(yùn)行環(huán)境的方法是什么
文章位置:http://muchs.cn/article4/ighsoe.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站制作、搜索引擎優(yōu)化、虛擬主機(jī)商城網(wǎng)站、網(wǎng)站改版

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

外貿(mào)網(wǎng)站制作