Flink分析使用之十底层RPC的实现
发布日期:2021-05-07 01:15:55 浏览次数:33 分类:原创文章

本文共 30358 字,大约阅读时间需要 101 分钟。

一、介绍

Flink是分布式大数据处理框架,那么网络通信就离不开了,从目前来看,几乎所有的开源的大型软件,尤其是Java的,基本已经不再倾向于自己写底层网络通信,毕竟有很多可以使用的Rpc网络通信框架可以来完善使用,Flink也是如此,它是基本Akka Rpc这款Rpc通信框架的。

二、分析

1、服务端

先看一下测试代码的最基础的通信代码(RpcEndPointTest.java):

public interface BaseGateway extends RpcGateway {     CompletableFuture<Integer> foobar();}//基础的通信协议接口public interface ExtendedGateway extends BaseGateway {     CompletableFuture<Integer> barfoo();}public interface DifferentGateway extends RpcGateway {     CompletableFuture<String> foo();}public static class BaseEndpoint extends RpcEndpoint implements BaseGateway {     private final int foobarValue;  protected BaseEndpoint(RpcService rpcService, int foobarValue) {       super(rpcService);    this.foobarValue = foobarValue;  }  @Override  public CompletableFuture<Integer> foobar() {       return CompletableFuture.completedFuture(foobarValue);  }}

在Flink中,Rpc的网关接口非常简单,代码如下:

public interface RpcGateway {   	/**	 * Returns the fully qualified address under which the associated rpc endpoint is reachable.	 *	 * @return Fully qualified (RPC) address under which the associated rpc endpoint is reachable	 */	String getAddress();	/**	 * Returns the fully qualified hostname under which the associated rpc endpoint is reachable.	 *	 * @return Fully qualified hostname under which the associated rpc endpoint is reachable	 */	String getHostname();}

这两方法太简单了,不解释。在Akka的服务中要获得相关的Actor,地址是必须的,而在网络通信中既可以直接获取地址,又可以通过HostName来获得。在flink中,网络的服务端是由RpcService这个接口来控制:

public interface RpcService {   	/**	 * Return the hostname or host address under which the rpc service can be reached.	 * If the rpc service cannot be contacted remotely, then it will return an empty string.	 *	 * @return Address of the rpc service or empty string if local rpc service	 */	String getAddress();	/**	 * Return the port under which the rpc service is reachable. If the rpc service cannot be	 * contacted remotely, then it will return -1.	 *	 * @return Port of the rpc service or -1 if local rpc service	 */	int getPort();	/**	 * Connect to a remote rpc server under the provided address. Returns a rpc gateway which can	 * be used to communicate with the rpc server. If the connection failed, then the returned	 * future is failed with a {@link RpcConnectionException}.	 *	 * @param address Address of the remote rpc server	 * @param clazz Class of the rpc gateway to return	 * @param <C> Type of the rpc gateway to return	 * @return Future containing the rpc gateway or an {@link RpcConnectionException} if the	 * connection attempt failed	 */	<C extends RpcGateway> CompletableFuture<C> connect(		String address,		Class<C> clazz);	/**	 * Connect to a remote fenced rpc server under the provided address. Returns a fenced rpc gateway	 * which can be used to communicate with the rpc server. If the connection failed, then the	 * returned future is failed with a {@link RpcConnectionException}.	 *	 * @param address Address of the remote rpc server	 * @param fencingToken Fencing token to be used when communicating with the server	 * @param clazz Class of the rpc gateway to return	 * @param <F> Type of the fencing token	 * @param <C> Type of the rpc gateway to return	 * @return Future containing the fenced rpc gateway or an {@link RpcConnectionException} if the	 * connection attempt failed	 */	<F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> connect(		String address,		F fencingToken,		Class<C> clazz);	/**	 * Start a rpc server which forwards the remote procedure calls to the provided rpc endpoint.	 *	 * @param rpcEndpoint Rpc protocol to dispatch the rpcs to	 * @param <C> Type of the rpc endpoint	 * @return Self gateway to dispatch remote procedure calls to oneself	 */	<C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint);	/**	 * Fence the given RpcServer with the given fencing token.	 *	 * <p>Fencing the RpcServer means that we fix the fencing token to the provided value.	 * All RPCs will then be enriched with this fencing token. This expects that the receiving	 * RPC endpoint extends {@link FencedRpcEndpoint}.	 *	 * @param rpcServer to fence with the given fencing token	 * @param fencingToken to fence the RpcServer with	 * @param <F> type of the fencing token	 * @return Fenced RpcServer	 */	<F extends Serializable> RpcServer fenceRpcServer(RpcServer rpcServer, F fencingToken);	/**	 * Stop the underlying rpc server of the provided self gateway.	 *	 * @param selfGateway Self gateway describing the underlying rpc server	 */	void stopServer(RpcServer selfGateway);	/**	 * Trigger the asynchronous stopping of the {@link RpcService}.	 *	 * @return Future which is completed once the {@link RpcService} has been	 * fully stopped.	 */	CompletableFuture<Void> stopService();	/**	 * Returns a future indicating when the RPC service has been shut down.	 *	 * @return Termination future	 */	CompletableFuture<Void> getTerminationFuture();	/**	 * Gets the executor, provided by this RPC service. This executor can be used for example for	 * the {@code handleAsync(...)} or {@code thenAcceptAsync(...)} methods of futures.	 *	 * <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against	 * any concurrent invocations and is therefore not suitable to run completion methods of futures	 * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the	 * {@link RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that	 * {@code RpcEndpoint}.	 *	 * @return The execution context provided by the RPC service	 */	Executor getExecutor();	/**	 * Gets a scheduled executor from the RPC service. This executor can be used to schedule	 * tasks to be executed in the future.	 *	 * <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against	 * any concurrent invocations and is therefore not suitable to run completion methods of futures	 * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the	 * {@link RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that	 * {@code RpcEndpoint}.	 *	 * @return The RPC service provided scheduled executor	 */	ScheduledExecutor getScheduledExecutor();	/**	 * Execute the runnable in the execution context of this RPC Service, as returned by	 * {@link #getExecutor()}, after a scheduled delay.	 *	 * @param runnable Runnable to be executed	 * @param delay    The delay after which the runnable will be executed	 */	ScheduledFuture<?> scheduleRunnable(Runnable runnable, long delay, TimeUnit unit);	/**	 * Execute the given runnable in the executor of the RPC service. This method can be used to run	 * code outside of the main thread of a {@link RpcEndpoint}.	 *	 * <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against	 * any concurrent invocations and is therefore not suitable to run completion methods of futures	 * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the	 * {@link RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that	 * {@code RpcEndpoint}.	 *	 * @param runnable to execute	 */	void execute(Runnable runnable);	/**	 * Execute the given callable and return its result as a {@link CompletableFuture}. This method can be used	 * to run code outside of the main thread of a {@link RpcEndpoint}.	 *	 * <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against	 * any concurrent invocations and is therefore not suitable to run completion methods of futures	 * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the	 * {@link RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that	 * {@code RpcEndpoint}.	 *	 * @param callable to execute	 * @param <T> is the return value type	 * @return Future containing the callable's future result	 */	<T> CompletableFuture<T> execute(Callable<T> callable);}

由他来控制着相关的服务端节点的启动,即RpcEndPoint:

public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync {   	protected final Logger log = LoggerFactory.getLogger(getClass());	// ------------------------------------------------------------------------	/** RPC service to be used to start the RPC server and to obtain rpc gateways. */	private final RpcService rpcService;	/** Unique identifier for this rpc endpoint. */	private final String endpointId;	/** Interface to access the underlying rpc server. */	protected final RpcServer rpcServer;	/** A reference to the endpoint's main thread, if the current method is called by the main thread. */	final AtomicReference<Thread> currentMainThread = new AtomicReference<>(null);	/** The main thread executor to be used to execute future callbacks in the main thread	 * of the executing rpc server. */	private final MainThreadExecutor mainThreadExecutor;	/**	 * Initializes the RPC endpoint.	 *	 * @param rpcService The RPC server that dispatches calls to this RPC endpoint.	 * @param endpointId Unique identifier for this endpoint	 */	protected RpcEndpoint(final RpcService rpcService, final String endpointId) {   		this.rpcService = checkNotNull(rpcService, "rpcService");		this.endpointId = checkNotNull(endpointId, "endpointId");		this.rpcServer = rpcService.startServer(this);		this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);	}	/**	 * Initializes the RPC endpoint with a random endpoint id.	 *	 * @param rpcService The RPC server that dispatches calls to this RPC endpoint.	 */	protected RpcEndpoint(final RpcService rpcService) {   		this(rpcService, UUID.randomUUID().toString());	}	/**	 * Returns the rpc endpoint's identifier.	 *	 * @return Rpc endpoint's identifier.	 */	public String getEndpointId() {   		return endpointId;	}	// ------------------------------------------------------------------------	//  Start & shutdown & lifecycle callbacks	// ------------------------------------------------------------------------	/**	 * Starts the rpc endpoint. This tells the underlying rpc server that the rpc endpoint is ready	 * to process remote procedure calls.	 *	 * @throws Exception indicating that something went wrong while starting the RPC endpoint	 */	public final void start() {   		rpcServer.start();	}	/**	 * User overridable callback.	 *	 * <p>This method is called when the RpcEndpoint is being started. The method is guaranteed	 * to be executed in the main thread context and can be used to start the rpc endpoint in the	 * context of the rpc endpoint's main thread.	 *	 * <p>IMPORTANT: This method should never be called directly by the user.	 * @throws Exception indicating that the rpc endpoint could not be started. If an exception occurs,	 * then the rpc endpoint will automatically terminate.	 */	public void onStart() throws Exception {   }	/**	 * Stops the rpc endpoint. This tells the underlying rpc server that the rpc endpoint is	 * no longer ready to process remote procedure calls.	 */	protected final void stop() {   		rpcServer.stop();	}	/**	 * User overridable callback.	 *	 * <p>This method is called when the RpcEndpoint is being shut down. The method is guaranteed	 * to be executed in the main thread context and can be used to clean up internal state.	 *	 * <p>IMPORTANT: This method should never be called directly by the user.	 *	 * @return Future which is completed once all post stop actions are completed. If an error	 * occurs this future is completed exceptionally	 */	public CompletableFuture<Void> onStop() {   		return CompletableFuture.completedFuture(null);	}	/**	 * Triggers the shut down of the rpc endpoint. The shut down is executed asynchronously.	 *	 * <p>In order to wait on the completion of the shut down, obtain the termination future	 * via {@link #getTerminationFuture()}} and wait on its completion.	 */	@Override	public final CompletableFuture<Void> closeAsync() {   		rpcService.stopServer(rpcServer);		return getTerminationFuture();	}	// ------------------------------------------------------------------------	//  Basic RPC endpoint properties	// ------------------------------------------------------------------------	/**	 * Returns a self gateway of the specified type which can be used to issue asynchronous	 * calls against the RpcEndpoint.	 *	 * <p>IMPORTANT: The self gateway type must be implemented by the RpcEndpoint. Otherwise	 * the method will fail.	 *	 * @param selfGatewayType class of the self gateway type	 * @param <C> type of the self gateway to create	 * @return Self gateway of the specified type which can be used to issue asynchronous rpcs	 */	public <C extends RpcGateway> C getSelfGateway(Class<C> selfGatewayType) {   		if (selfGatewayType.isInstance(rpcServer)) {   			@SuppressWarnings("unchecked")			C selfGateway = ((C) rpcServer);			return selfGateway;		} else {   			throw new RuntimeException("RpcEndpoint does not implement the RpcGateway interface of type " + selfGatewayType + '.');		}	}	/**	 * Gets the address of the underlying RPC endpoint. The address should be fully qualified so that	 * a remote system can connect to this RPC endpoint via this address.	 *	 * @return Fully qualified address of the underlying RPC endpoint	 */	@Override	public String getAddress() {   		return rpcServer.getAddress();	}	/**	 * Gets the hostname of the underlying RPC endpoint.	 *	 * @return Hostname on which the RPC endpoint is running	 */	@Override	public String getHostname() {   		return rpcServer.getHostname();	}	/**	 * Gets the main thread execution context. The main thread execution context can be used to	 * execute tasks in the main thread of the underlying RPC endpoint.	 *	 * @return Main thread execution context	 */	protected MainThreadExecutor getMainThreadExecutor() {   		return mainThreadExecutor;	}	/**	 * Gets the endpoint's RPC service.	 *	 * @return The endpoint's RPC service	 */	public RpcService getRpcService() {   		return rpcService;	}	/**	 * Return a future which is completed with true when the rpc endpoint has been terminated.	 * In case of a failure, this future is completed with the occurring exception.	 *	 * @return Future which is completed when the rpc endpoint has been terminated.	 */	public CompletableFuture<Void> getTerminationFuture() {   		return rpcServer.getTerminationFuture();	}	// ------------------------------------------------------------------------	//  Asynchronous executions	// ------------------------------------------------------------------------	/**	 * Execute the runnable in the main thread of the underlying RPC endpoint.	 *	 * @param runnable Runnable to be executed in the main thread of the underlying RPC endpoint	 */	protected void runAsync(Runnable runnable) {   		rpcServer.runAsync(runnable);	}	/**	 * Execute the runnable in the main thread of the underlying RPC endpoint, with	 * a delay of the given number of milliseconds.	 *	 * @param runnable Runnable to be executed	 * @param delay    The delay after which the runnable will be executed	 */	protected void scheduleRunAsync(Runnable runnable, Time delay) {   		scheduleRunAsync(runnable, delay.getSize(), delay.getUnit());	}	/**	 * Execute the runnable in the main thread of the underlying RPC endpoint, with	 * a delay of the given number of milliseconds.	 *	 * @param runnable Runnable to be executed	 * @param delay    The delay after which the runnable will be executed	 */	protected void scheduleRunAsync(Runnable runnable, long delay, TimeUnit unit) {   		rpcServer.scheduleRunAsync(runnable, unit.toMillis(delay));	}	/**	 * Execute the callable in the main thread of the underlying RPC service, returning a future for	 * the result of the callable. If the callable is not completed within the given timeout, then	 * the future will be failed with a {@link TimeoutException}.	 *	 * @param callable Callable to be executed in the main thread of the underlying rpc server	 * @param timeout Timeout for the callable to be completed	 * @param <V> Return type of the callable	 * @return Future for the result of the callable.	 */	protected <V> CompletableFuture<V> callAsync(Callable<V> callable, Time timeout) {   		return rpcServer.callAsync(callable, timeout);	}	// ------------------------------------------------------------------------	//  Main Thread Validation	// ------------------------------------------------------------------------	/**	 * Validates that the method call happens in the RPC endpoint's main thread.	 *	 * <p><b>IMPORTANT:</b> This check only happens when assertions are enabled,	 * such as when running tests.	 *	 * <p>This can be used for additional checks, like	 * <pre>{@code	 * protected void concurrencyCriticalMethod() {	 *     validateRunsInMainThread();	 *	 *     // some critical stuff	 * }	 * }</pre>	 */	public void validateRunsInMainThread() {   		assert MainThreadValidatorUtil.isRunningInExpectedThread(currentMainThread.get());	}	// ------------------------------------------------------------------------	//  Utilities	// ------------------------------------------------------------------------	/**	 * Executor which executes runnables in the main thread context.	 */	protected static class MainThreadExecutor implements ComponentMainThreadExecutor {   		private final MainThreadExecutable gateway;		private final Runnable mainThreadCheck;		MainThreadExecutor(MainThreadExecutable gateway, Runnable mainThreadCheck) {   			this.gateway = Preconditions.checkNotNull(gateway);			this.mainThreadCheck = Preconditions.checkNotNull(mainThreadCheck);		}		public void runAsync(Runnable runnable) {   			gateway.runAsync(runnable);		}		public void scheduleRunAsync(Runnable runnable, long delayMillis) {   			gateway.scheduleRunAsync(runnable, delayMillis);		}		public void execute(@Nonnull Runnable command) {   			runAsync(command);		}		@Override		public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {   			final long delayMillis = TimeUnit.MILLISECONDS.convert(delay, unit);			FutureTask<Void> ft = new FutureTask<>(command, null);			scheduleRunAsync(ft, delayMillis);			return new ScheduledFutureAdapter<>(ft, delayMillis, TimeUnit.MILLISECONDS);		}		@Override		public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {   			throw new UnsupportedOperationException("Not implemented because the method is currently not required.");		}		@Override		public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {   			throw new UnsupportedOperationException("Not implemented because the method is currently not required.");		}		@Override		public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {   			throw new UnsupportedOperationException("Not implemented because the method is currently not required.");		}		@Override		public void assertRunningInMainThread() {   			mainThreadCheck.run();		}	}}

一系列的启动,是由前面提到过的StartService中启动的,如果不太清楚可以回去翻一翻。启动endpoint就意味着服务端启动了。

2、客户端

有了服务端自然要有客户端,代码如下:

class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, RpcServer {   	private static final Logger LOG = LoggerFactory.getLogger(AkkaInvocationHandler.class);	/**	 * The Akka (RPC) address of {@link #rpcEndpoint} including host and port of the ActorSystem in	 * which the actor is running.	 */	private final String address;	/**	 * Hostname of the host, {@link #rpcEndpoint} is running on.	 */	private final String hostname;	private final ActorRef rpcEndpoint;	// whether the actor ref is local and thus no message serialization is needed	protected final boolean isLocal;	// default timeout for asks	private final Time timeout;	private final long maximumFramesize;	// null if gateway; otherwise non-null	@Nullable	private final CompletableFuture<Void> terminationFuture;	AkkaInvocationHandler(			String address,			String hostname,			ActorRef rpcEndpoint,			Time timeout,			long maximumFramesize,			@Nullable CompletableFuture<Void> terminationFuture) {   		this.address = Preconditions.checkNotNull(address);		this.hostname = Preconditions.checkNotNull(hostname);		this.rpcEndpoint = Preconditions.checkNotNull(rpcEndpoint);		this.isLocal = this.rpcEndpoint.path().address().hasLocalScope();		this.timeout = Preconditions.checkNotNull(timeout);		this.maximumFramesize = maximumFramesize;		this.terminationFuture = terminationFuture;	}	@Override	public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {   		Class<?> declaringClass = method.getDeclaringClass();		Object result;		if (declaringClass.equals(AkkaBasedEndpoint.class) ||			declaringClass.equals(Object.class) ||			declaringClass.equals(RpcGateway.class) ||			declaringClass.equals(StartStoppable.class) ||			declaringClass.equals(MainThreadExecutable.class) ||			declaringClass.equals(RpcServer.class)) {   			result = method.invoke(this, args);		} else if (declaringClass.equals(FencedRpcGateway.class)) {   			throw new UnsupportedOperationException("AkkaInvocationHandler does not support the call FencedRpcGateway#" +				method.getName() + ". This indicates that you retrieved a FencedRpcGateway without specifying a " +				"fencing token. Please use RpcService#connect(RpcService, F, Time) with F being the fencing token to " +				"retrieve a properly FencedRpcGateway.");		} else {   			result = invokeRpc(method, args);		}		return result;	}	@Override	public ActorRef getActorRef() {   		return rpcEndpoint;	}	@Override	public void runAsync(Runnable runnable) {   		scheduleRunAsync(runnable, 0L);	}	@Override	public void scheduleRunAsync(Runnable runnable, long delayMillis) {   		checkNotNull(runnable, "runnable");		checkArgument(delayMillis >= 0, "delay must be zero or greater");		if (isLocal) {   			long atTimeNanos = delayMillis == 0 ? 0 : System.nanoTime() + (delayMillis * 1_000_000);			tell(new RunAsync(runnable, atTimeNanos));		} else {   			throw new RuntimeException("Trying to send a Runnable to a remote actor at " +				rpcEndpoint.path() + ". This is not supported.");		}	}	@Override	public <V> CompletableFuture<V> callAsync(Callable<V> callable, Time callTimeout) {   		if (isLocal) {   			@SuppressWarnings("unchecked")			CompletableFuture<V> resultFuture = (CompletableFuture<V>) ask(new CallAsync(callable), callTimeout);			return resultFuture;		} else {   			throw new RuntimeException("Trying to send a Callable to a remote actor at " +				rpcEndpoint.path() + ". This is not supported.");		}	}	@Override	public void start() {   		rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());	}	@Override	public void stop() {   		rpcEndpoint.tell(ControlMessages.STOP, ActorRef.noSender());	}	// ------------------------------------------------------------------------	//  Private methods	// ------------------------------------------------------------------------	/**	 * Invokes a RPC method by sending the RPC invocation details to the rpc endpoint.	 *	 * @param method to call	 * @param args of the method call	 * @return result of the RPC	 * @throws Exception if the RPC invocation fails	 */	private Object invokeRpc(Method method, Object[] args) throws Exception {   		String methodName = method.getName();		Class<?>[] parameterTypes = method.getParameterTypes();		Annotation[][] parameterAnnotations = method.getParameterAnnotations();		Time futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout);		final RpcInvocation rpcInvocation = createRpcInvocationMessage(methodName, parameterTypes, args);		Class<?> returnType = method.getReturnType();		final Object result;		if (Objects.equals(returnType, Void.TYPE)) {   			tell(rpcInvocation);			result = null;		} else {   			// execute an asynchronous call			CompletableFuture<?> resultFuture = ask(rpcInvocation, futureTimeout);			CompletableFuture<?> completableFuture = resultFuture.thenApply((Object o) -> {   				if (o instanceof SerializedValue) {   					try {   						return  ((SerializedValue<?>) o).deserializeValue(getClass().getClassLoader());					} catch (IOException | ClassNotFoundException e) {   						throw new CompletionException(							new RpcException("Could not deserialize the serialized payload of RPC method : "								+ methodName, e));					}				} else {   					return o;				}			});			if (Objects.equals(returnType, CompletableFuture.class)) {   				result = completableFuture;			} else {   				try {   					result = completableFuture.get(futureTimeout.getSize(), futureTimeout.getUnit());				} catch (ExecutionException ee) {   					throw new RpcException("Failure while obtaining synchronous RPC result.", ExceptionUtils.stripExecutionException(ee));				}			}		}		return result;	}	/**	 * Create the RpcInvocation message for the given RPC.	 *	 * @param methodName of the RPC	 * @param parameterTypes of the RPC	 * @param args of the RPC	 * @return RpcInvocation message which encapsulates the RPC details	 * @throws IOException if we cannot serialize the RPC invocation parameters	 */	protected RpcInvocation createRpcInvocationMessage(			final String methodName,			final Class<?>[] parameterTypes,			final Object[] args) throws IOException {   		final RpcInvocation rpcInvocation;		if (isLocal) {   			rpcInvocation = new LocalRpcInvocation(				methodName,				parameterTypes,				args);		} else {   			try {   				RemoteRpcInvocation remoteRpcInvocation = new RemoteRpcInvocation(					methodName,					parameterTypes,					args);				if (remoteRpcInvocation.getSize() > maximumFramesize) {   					throw new IOException("The rpc invocation size exceeds the maximum akka framesize.");				} else {   					rpcInvocation = remoteRpcInvocation;				}			} catch (IOException e) {   				LOG.warn("Could not create remote rpc invocation message. Failing rpc invocation because...", e);				throw e;			}		}		return rpcInvocation;	}	// ------------------------------------------------------------------------	//  Helper methods	// ------------------------------------------------------------------------	/**	 * Extracts the {@link RpcTimeout} annotated rpc timeout value from the list of given method	 * arguments. If no {@link RpcTimeout} annotated parameter could be found, then the default	 * timeout is returned.	 *	 * @param parameterAnnotations Parameter annotations	 * @param args Array of arguments	 * @param defaultTimeout Default timeout to return if no {@link RpcTimeout} annotated parameter	 *                       has been found	 * @return Timeout extracted from the array of arguments or the default timeout	 */	private static Time extractRpcTimeout(Annotation[][] parameterAnnotations, Object[] args, Time defaultTimeout) {   		if (args != null) {   			Preconditions.checkArgument(parameterAnnotations.length == args.length);			for (int i = 0; i < parameterAnnotations.length; i++) {   				if (isRpcTimeout(parameterAnnotations[i])) {   					if (args[i] instanceof Time) {   						return (Time) args[i];					} else {   						throw new RuntimeException("The rpc timeout parameter must be of type " +							Time.class.getName() + ". The type " + args[i].getClass().getName() +							" is not supported.");					}				}			}		}		return defaultTimeout;	}	/**	 * Checks whether any of the annotations is of type {@link RpcTimeout}.	 *	 * @param annotations Array of annotations	 * @return True if {@link RpcTimeout} was found; otherwise false	 */	private static boolean isRpcTimeout(Annotation[] annotations) {   		for (Annotation annotation : annotations) {   			if (annotation.annotationType().equals(RpcTimeout.class)) {   				return true;			}		}		return false;	}	/**	 * Sends the message to the RPC endpoint.	 *	 * @param message to send to the RPC endpoint.	 */	protected void tell(Object message) {   		rpcEndpoint.tell(message, ActorRef.noSender());	}	/**	 * Sends the message to the RPC endpoint and returns a future containing	 * its response.	 *	 * @param message to send to the RPC endpoint	 * @param timeout time to wait until the response future is failed with a {@link TimeoutException}	 * @return Response future	 */	protected CompletableFuture<?> ask(Object message, Time timeout) {   		return FutureUtils.toJava(			Patterns.ask(rpcEndpoint, message, timeout.toMilliseconds()));	}	@Override	public String getAddress() {   		return address;	}	@Override	public String getHostname() {   		return hostname;	}	@Override	public CompletableFuture<Void> getTerminationFuture() {   		return terminationFuture;	}}

这个客户继承了三个类或者说接口。InvocationHandler, AkkaBasedEndpoint, RpcServer,分别提供了操作,节点控制和相关通信传输协议。在获得客户端后就可以通过相关的接口来发送消息了。

3.Rpc的通信

客户端继承了InvocationHandler:

public interface InvocationHandler {       public Object invoke(Object proxy, Method method, Object[] args)        throws Throwable;}

这个接口只有一个函数,invoke,如果编程时间稍微长一些的,都会明白类似这种名字的函数,基本都是调用相关接口函数的。在前面的RpcInvocation中会有LocalRpcInvocation和RemoteRpcInvocation的选择性生成,本地的调用本地的,远端的调用远端的。通过这二者对RPC方法的封装,在调用invokeRpc这个函数时,即可通过类型判断来决定调用tell还是ask的任一个。而这个类型的判断是通过二者是否在同一个JVM中,这个在上面的invokeRpc函数中有所体现,下面是删减后的代码:

private Object invokeRpc(Method method, Object[] args) throws Exception {   ......  Class<?> returnType = method.getReturnType();  final Object result;  if (Objects.equals(returnType, Void.TYPE)) {       tell(rpcInvocation);    result = null;  } else {       // execute an asynchronous call    CompletableFuture<?> resultFuture = ask(rpcInvocation, futureTimeout);......    });......  return result;}protected void tell(Object message) {     rpcEndpoint.tell(message, ActorRef.noSender());}protected CompletableFuture<?> ask(Object message, Time timeout) {     return FutureUtils.toJava(    Patterns.ask(rpcEndpoint, message, timeout.toMilliseconds()));}

再向下,就开始调用Akka的相关Rpc操作了。在前面的StartServer函数中:

final Props akkaRpcActorProps;if (rpcEndpoint instanceof FencedRpcEndpoint) {     akkaRpcActorProps = Props.create(    FencedAkkaRpcActor.class,    rpcEndpoint,    terminationFuture,    getVersion(),    configuration.getMaximumFramesize());} else {     akkaRpcActorProps = Props.create(    AkkaRpcActor.class,    rpcEndpoint,    terminationFuture,    getVersion(),    configuration.getMaximumFramesize());}ActorRef actorRef;synchronized (lock) {     checkState(!stopped, "RpcService is stopped");  actorRef = actorSystem.actorOf(akkaRpcActorProps, rpcEndpoint.getEndpointId());  actors.put(actorRef, rpcEndpoint);}

可以跳到tell函数中,发现就是调用的actor包中的ActorRef来实现的。

三、总结

这里分析还是比较粗糙的,并发模型中有两个一个是CSP(典型的是GO),另外一个是这Actor,现在这个用的就是这个。为什么没有详细的分析RpcServer和client的实现,主要原因是这个调用确实简单,而且前面虽然不是系统的但也把重点已经分析过了(比如JobMaster、Dispatcher、TaskExecutor等,这些都RpcEndpoint的子类),这里再说一次也没有什么必要。包括序列化部分,其实也有些纠结是不是分析。但是这其实又是另外一个部分,到时候儿再说。

上一篇:cmake对编译器版本和库依赖的问题
下一篇:Lucet的使用方法

发表评论

最新留言

能坚持,总会有不一样的收获!
[***.219.124.196]2025年03月27日 00时39分05秒