Flink的rpc組件有哪些

本篇內(nèi)容介紹了“Flink的rpc組件有哪些”的有關(guān)知識(shí),在實(shí)際案例的操作過(guò)程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

新洲網(wǎng)站建設(shè)公司創(chuàng)新互聯(lián)建站,新洲網(wǎng)站設(shè)計(jì)制作,有大型網(wǎng)站制作公司豐富經(jīng)驗(yàn)。已為新洲數(shù)千家提供企業(yè)網(wǎng)站建設(shè)服務(wù)。企業(yè)網(wǎng)站搭建\成都外貿(mào)網(wǎng)站建設(shè)公司要多少錢,請(qǐng)找那個(gè)售后服務(wù)好的新洲做網(wǎng)站的公司定做!

Flink采用akka來(lái)實(shí)現(xiàn)rpc服務(wù)。其中有這幾個(gè)重要組件:RpcServer、RpcService、AkkaRpcActor、RpcEndpoint。

Flink的rpc組件有哪些

這幾個(gè)組件作用如下:

(1)RpcEndpoint

提供具體rpc服務(wù)。主要實(shí)現(xiàn)有 ResourceManager 和 TaskExecutor,

①YarnResourceManager為AM容器中啟動(dòng)的服務(wù),持有ResourceManager和NodeManager的客戶端

②TaskExecutor為NM容器中啟動(dòng)taskmanager的類

(2)AkkaRpcService

提供rpc的服務(wù)類。該類內(nèi)部持有ActorSystem實(shí)例和Supervisor實(shí)例。Supervisor中含有SupervisorActor實(shí)例,SupervisorActor用于創(chuàng)建其他Actor,可以理解為根Actor。RpcEndpoint在構(gòu)造時(shí),通過(guò)AkkaRpcService的startServer()方法,獲取RpcServer實(shí)例。

	public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) {
		checkNotNull(rpcEndpoint, "rpc endpoint");

		final SupervisorActor.ActorRegistration actorRegistration = registerAkkaRpcActor(rpcEndpoint);
		final ActorRef actorRef = actorRegistration.getActorRef();
		final CompletableFuture<Void> actorTerminationFuture = actorRegistration.getTerminationFuture();

		LOG.info("Starting RPC endpoint for {} at {} .", rpcEndpoint.getClass().getName(), actorRef.path());

		final String akkaAddress = AkkaUtils.getAkkaURL(actorSystem, actorRef);
		final String hostname;
		Option<String> host = actorRef.path().address().host();
		if (host.isEmpty()) {
			hostname = "localhost";
		} else {
			hostname = host.get();
		}

		Set<Class<?>> implementedRpcGateways = new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass()));

		implementedRpcGateways.add(RpcServer.class);
		implementedRpcGateways.add(AkkaBasedEndpoint.class);

		final InvocationHandler akkaInvocationHandler;

		if (rpcEndpoint instanceof FencedRpcEndpoint) {
			// a FencedRpcEndpoint needs a FencedAkkaInvocationHandler
			akkaInvocationHandler = new FencedAkkaInvocationHandler<>(
				akkaAddress,
				hostname,
				actorRef,
				configuration.getTimeout(),
				configuration.getMaximumFramesize(),
				actorTerminationFuture,
				((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken,
				captureAskCallstacks);

			implementedRpcGateways.add(FencedMainThreadExecutable.class);
		} else {
			akkaInvocationHandler = new AkkaInvocationHandler(
				akkaAddress,
				hostname,
				actorRef,
				configuration.getTimeout(),
				configuration.getMaximumFramesize(),
				actorTerminationFuture,
				captureAskCallstacks);
		}

		// Rather than using the System ClassLoader directly, we derive the ClassLoader
		// from this class . That works better in cases where Flink runs embedded and all Flink
		// code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
		ClassLoader classLoader = getClass().getClassLoader();

		@SuppressWarnings("unchecked")
		RpcServer server = (RpcServer) Proxy.newProxyInstance(
			classLoader,
			implementedRpcGateways.toArray(new Class<?>[implementedRpcGateways.size()]),
			akkaInvocationHandler);

		return server;
	}

先創(chuàng)建RpcEndpoint對(duì)應(yīng)的ActorRef,然后創(chuàng)建RpcServer的代理類AkkaInvocationHandler或FencedAkkaInvocationHandler,并將ActorRef實(shí)例賦給其成員屬性 rpcEndpoint:ActorRef。這里的ActorRef即為AkkaRpcActor或FencedAkkaRpcActor實(shí)例

(3)RpcServer

用來(lái)啟動(dòng)rpc服務(wù),通常不直接調(diào)用,而是調(diào)用其動(dòng)態(tài)代理類AkkaInvocationHandler或FencedAkkaInvocationHandler的start()方法

(4)AkkaInvocationHandler或FencedAkkaInvocationHandler

RpcServer的動(dòng)態(tài)代理類。start()方法用來(lái)啟動(dòng)服務(wù):

	public void start() {
		rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());
	}

這里向rpcEndpoint,即AkkaRpcActor或FencedAkkaRpcActor實(shí)例發(fā)送一條ControlMessages.START消息

(5)AkkaRpcActor

響應(yīng)rpc消息的actor。其createReceive():

	public Receive createReceive() {
		return ReceiveBuilder.create()
			.match(RemoteHandshakeMessage.class, this::handleHandshakeMessage)
			.match(ControlMessages.class, this::handleControlMessage)
			.matchAny(this::handleMessage)
			.build();
	}

當(dāng)消息為ControlMessages.START,調(diào)用StoppedState 的start()方法

		public State start(AkkaRpcActor<?> akkaRpcActor) {
			akkaRpcActor.mainThreadValidator.enterMainThread();

			try {
				akkaRpcActor.rpcEndpoint.internalCallOnStart();
			} catch (Throwable throwable) {
				akkaRpcActor.stop(
					RpcEndpointTerminationResult.failure(
						new AkkaRpcException(
							String.format("Could not start RpcEndpoint %s.", akkaRpcActor.rpcEndpoint.getEndpointId()),
							throwable)));
			} finally {
				akkaRpcActor.mainThreadValidator.exitMainThread();
			}

			return StartedState.STARTED;
		}

在start()方法中調(diào)用具體提供服務(wù)的RpcEndpoint實(shí)現(xiàn)類internalCallOnStart()方法來(lái)啟動(dòng)服務(wù)。internalCallOnStart()方法中會(huì)調(diào)用onStart()方法。

“Flink的rpc組件有哪些”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

網(wǎng)站欄目:Flink的rpc組件有哪些
轉(zhuǎn)載來(lái)于:http://muchs.cn/article10/ighpdo.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供動(dòng)態(tài)網(wǎng)站、網(wǎng)站收錄、響應(yīng)式網(wǎng)站、網(wǎng)站改版、網(wǎng)站導(dǎo)航

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

成都定制網(wǎng)站建設(shè)