
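I. Introduction
Flink is a distributed big-data processing framework, so it cannot do without network communication. Most large open-source projects today, especially those written in Java, no longer write their own low-level networking layer, since mature RPC frameworks are readily available. Flink follows the same approach: its RPC layer is built on Akka.
II. Analysis
1. Server side
Start with the most basic communication code from the tests (RpcEndpointTest.java):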
public interface BaseGateway extends RpcGateway {
    CompletableFuture<Integer> foobar();
} // the basic communication protocol interface

public interface ExtendedGateway extends BaseGateway {
    CompletableFuture<Integer> barfoo();
}

public interface DifferentGateway extends RpcGateway {
    CompletableFuture<String> foo();
}

public static class BaseEndpoint extends RpcEndpoint implements BaseGateway {

    private final int foobarValue;

    protected BaseEndpoint(RpcService rpcService, int foobarValue) {
        super(rpcService);
        this.foobarValue = foobarValue;
    }

    @Override
    public CompletableFuture<Integer> foobar() {
        return CompletableFuture.completedFuture(foobarValue);
    }
}
In Flink, the RPC gateway interface is very simple:
public interface RpcGateway {

    /**
     * Returns the fully qualified address under which the associated rpc endpoint is reachable.
     *
     * @return Fully qualified (RPC) address under which the associated rpc endpoint is reachable
     */
    String getAddress();

    /**
     * Returns the fully qualified hostname under which the associated rpc endpoint is reachable.
     *
     * @return Fully qualified hostname under which the associated rpc endpoint is reachable
     */
    String getHostname();
}
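These two methods need little explanation. To look up an actor in an Akka-based service you need its address, and in network communication an endpoint can be located either by its full address or by its hostname. In Flink, the server side of the RPC layer is managed through the RpcService interface: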
public interface RpcService { /** * Return the hostname or host address under which the rpc service can be reached. * If the rpc service cannot be contacted remotely, then it will return an empty string. * * @return Address of the rpc service or empty string if local rpc service */ String getAddress(); /** * Return the port under which the rpc service is reachable. If the rpc service cannot be * contacted remotely, then it will return -1. * * @return Port of the rpc service or -1 if local rpc service */ int getPort(); /** * Connect to a remote rpc server under the provided address. Returns a rpc gateway which can * be used to communicate with the rpc server. If the connection failed, then the returned * future is failed with a {@link RpcConnectionException}. * * @param address Address of the remote rpc server * @param clazz Class of the rpc gateway to return * @param <C> Type of the rpc gateway to return * @return Future containing the rpc gateway or an {@link RpcConnectionException} if the * connection attempt failed */ <C extends RpcGateway> CompletableFuture<C> connect( String address, Class<C> clazz); /** * Connect to a remote fenced rpc server under the provided address. Returns a fenced rpc gateway * which can be used to communicate with the rpc server. If the connection failed, then the * returned future is failed with a {@link RpcConnectionException}. 
* * @param address Address of the remote rpc server * @param fencingToken Fencing token to be used when communicating with the server * @param clazz Class of the rpc gateway to return * @param <F> Type of the fencing token * @param <C> Type of the rpc gateway to return * @return Future containing the fenced rpc gateway or an {@link RpcConnectionException} if the * connection attempt failed */ <F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> connect( String address, F fencingToken, Class<C> clazz); /** * Start a rpc server which forwards the remote procedure calls to the provided rpc endpoint. * * @param rpcEndpoint Rpc protocol to dispatch the rpcs to * @param <C> Type of the rpc endpoint * @return Self gateway to dispatch remote procedure calls to oneself */ <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint); /** * Fence the given RpcServer with the given fencing token. * * <p>Fencing the RpcServer means that we fix the fencing token to the provided value. * All RPCs will then be enriched with this fencing token. This expects that the receiving * RPC endpoint extends {@link FencedRpcEndpoint}. * * @param rpcServer to fence with the given fencing token * @param fencingToken to fence the RpcServer with * @param <F> type of the fencing token * @return Fenced RpcServer */ <F extends Serializable> RpcServer fenceRpcServer(RpcServer rpcServer, F fencingToken); /** * Stop the underlying rpc server of the provided self gateway. * * @param selfGateway Self gateway describing the underlying rpc server */ void stopServer(RpcServer selfGateway); /** * Trigger the asynchronous stopping of the {@link RpcService}. * * @return Future which is completed once the {@link RpcService} has been * fully stopped. */ CompletableFuture<Void> stopService(); /** * Returns a future indicating when the RPC service has been shut down. 
* * @return Termination future */ CompletableFuture<Void> getTerminationFuture(); /** * Gets the executor, provided by this RPC service. This executor can be used for example for * the {@code handleAsync(...)} or {@code thenAcceptAsync(...)} methods of futures. * * <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against * any concurrent invocations and is therefore not suitable to run completion methods of futures * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the * {@link RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that * {@code RpcEndpoint}. * * @return The execution context provided by the RPC service */ Executor getExecutor(); /** * Gets a scheduled executor from the RPC service. This executor can be used to schedule * tasks to be executed in the future. * * <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against * any concurrent invocations and is therefore not suitable to run completion methods of futures * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the * {@link RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that * {@code RpcEndpoint}. * * @return The RPC service provided scheduled executor */ ScheduledExecutor getScheduledExecutor(); /** * Execute the runnable in the execution context of this RPC Service, as returned by * {@link #getExecutor()}, after a scheduled delay. * * @param runnable Runnable to be executed * @param delay The delay after which the runnable will be executed */ ScheduledFuture<?> scheduleRunnable(Runnable runnable, long delay, TimeUnit unit); /** * Execute the given runnable in the executor of the RPC service. This method can be used to run * code outside of the main thread of a {@link RpcEndpoint}. 
* * <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against * any concurrent invocations and is therefore not suitable to run completion methods of futures * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the * {@link RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that * {@code RpcEndpoint}. * * @param runnable to execute */ void execute(Runnable runnable); /** * Execute the given callable and return its result as a {@link CompletableFuture}. This method can be used * to run code outside of the main thread of a {@link RpcEndpoint}. * * <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against * any concurrent invocations and is therefore not suitable to run completion methods of futures * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the * {@link RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that * {@code RpcEndpoint}. * * @param callable to execute * @param <T> is the return value type * @return Future containing the callable's future result */ <T> CompletableFuture<T> execute(Callable<T> callable);}
RpcService controls the startup of the server-side node, that is, the RpcEndpoint:
public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync { protected final Logger log = LoggerFactory.getLogger(getClass()); // ------------------------------------------------------------------------ /** RPC service to be used to start the RPC server and to obtain rpc gateways. */ private final RpcService rpcService; /** Unique identifier for this rpc endpoint. */ private final String endpointId; /** Interface to access the underlying rpc server. */ protected final RpcServer rpcServer; /** A reference to the endpoint's main thread, if the current method is called by the main thread. */ final AtomicReference<Thread> currentMainThread = new AtomicReference<>(null); /** The main thread executor to be used to execute future callbacks in the main thread * of the executing rpc server. */ private final MainThreadExecutor mainThreadExecutor; /** * Initializes the RPC endpoint. * * @param rpcService The RPC server that dispatches calls to this RPC endpoint. * @param endpointId Unique identifier for this endpoint */ protected RpcEndpoint(final RpcService rpcService, final String endpointId) { this.rpcService = checkNotNull(rpcService, "rpcService"); this.endpointId = checkNotNull(endpointId, "endpointId"); this.rpcServer = rpcService.startServer(this); this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread); } /** * Initializes the RPC endpoint with a random endpoint id. * * @param rpcService The RPC server that dispatches calls to this RPC endpoint. */ protected RpcEndpoint(final RpcService rpcService) { this(rpcService, UUID.randomUUID().toString()); } /** * Returns the rpc endpoint's identifier. * * @return Rpc endpoint's identifier. */ public String getEndpointId() { return endpointId; } // ------------------------------------------------------------------------ // Start & shutdown & lifecycle callbacks // ------------------------------------------------------------------------ /** * Starts the rpc endpoint. 
This tells the underlying rpc server that the rpc endpoint is ready * to process remote procedure calls. * * @throws Exception indicating that something went wrong while starting the RPC endpoint */ public final void start() { rpcServer.start(); } /** * User overridable callback. * * <p>This method is called when the RpcEndpoint is being started. The method is guaranteed * to be executed in the main thread context and can be used to start the rpc endpoint in the * context of the rpc endpoint's main thread. * * <p>IMPORTANT: This method should never be called directly by the user. * @throws Exception indicating that the rpc endpoint could not be started. If an exception occurs, * then the rpc endpoint will automatically terminate. */ public void onStart() throws Exception { } /** * Stops the rpc endpoint. This tells the underlying rpc server that the rpc endpoint is * no longer ready to process remote procedure calls. */ protected final void stop() { rpcServer.stop(); } /** * User overridable callback. * * <p>This method is called when the RpcEndpoint is being shut down. The method is guaranteed * to be executed in the main thread context and can be used to clean up internal state. * * <p>IMPORTANT: This method should never be called directly by the user. * * @return Future which is completed once all post stop actions are completed. If an error * occurs this future is completed exceptionally */ public CompletableFuture<Void> onStop() { return CompletableFuture.completedFuture(null); } /** * Triggers the shut down of the rpc endpoint. The shut down is executed asynchronously. * * <p>In order to wait on the completion of the shut down, obtain the termination future * via {@link #getTerminationFuture()}} and wait on its completion. 
*/ @Override public final CompletableFuture<Void> closeAsync() { rpcService.stopServer(rpcServer); return getTerminationFuture(); } // ------------------------------------------------------------------------ // Basic RPC endpoint properties // ------------------------------------------------------------------------ /** * Returns a self gateway of the specified type which can be used to issue asynchronous * calls against the RpcEndpoint. * * <p>IMPORTANT: The self gateway type must be implemented by the RpcEndpoint. Otherwise * the method will fail. * * @param selfGatewayType class of the self gateway type * @param <C> type of the self gateway to create * @return Self gateway of the specified type which can be used to issue asynchronous rpcs */ public <C extends RpcGateway> C getSelfGateway(Class<C> selfGatewayType) { if (selfGatewayType.isInstance(rpcServer)) { @SuppressWarnings("unchecked") C selfGateway = ((C) rpcServer); return selfGateway; } else { throw new RuntimeException("RpcEndpoint does not implement the RpcGateway interface of type " + selfGatewayType + '.'); } } /** * Gets the address of the underlying RPC endpoint. The address should be fully qualified so that * a remote system can connect to this RPC endpoint via this address. * * @return Fully qualified address of the underlying RPC endpoint */ @Override public String getAddress() { return rpcServer.getAddress(); } /** * Gets the hostname of the underlying RPC endpoint. * * @return Hostname on which the RPC endpoint is running */ @Override public String getHostname() { return rpcServer.getHostname(); } /** * Gets the main thread execution context. The main thread execution context can be used to * execute tasks in the main thread of the underlying RPC endpoint. * * @return Main thread execution context */ protected MainThreadExecutor getMainThreadExecutor() { return mainThreadExecutor; } /** * Gets the endpoint's RPC service. 
* * @return The endpoint's RPC service */ public RpcService getRpcService() { return rpcService; } /** * Return a future which is completed with true when the rpc endpoint has been terminated. * In case of a failure, this future is completed with the occurring exception. * * @return Future which is completed when the rpc endpoint has been terminated. */ public CompletableFuture<Void> getTerminationFuture() { return rpcServer.getTerminationFuture(); } // ------------------------------------------------------------------------ // Asynchronous executions // ------------------------------------------------------------------------ /** * Execute the runnable in the main thread of the underlying RPC endpoint. * * @param runnable Runnable to be executed in the main thread of the underlying RPC endpoint */ protected void runAsync(Runnable runnable) { rpcServer.runAsync(runnable); } /** * Execute the runnable in the main thread of the underlying RPC endpoint, with * a delay of the given number of milliseconds. * * @param runnable Runnable to be executed * @param delay The delay after which the runnable will be executed */ protected void scheduleRunAsync(Runnable runnable, Time delay) { scheduleRunAsync(runnable, delay.getSize(), delay.getUnit()); } /** * Execute the runnable in the main thread of the underlying RPC endpoint, with * a delay of the given number of milliseconds. * * @param runnable Runnable to be executed * @param delay The delay after which the runnable will be executed */ protected void scheduleRunAsync(Runnable runnable, long delay, TimeUnit unit) { rpcServer.scheduleRunAsync(runnable, unit.toMillis(delay)); } /** * Execute the callable in the main thread of the underlying RPC service, returning a future for * the result of the callable. If the callable is not completed within the given timeout, then * the future will be failed with a {@link TimeoutException}. 
* * @param callable Callable to be executed in the main thread of the underlying rpc server * @param timeout Timeout for the callable to be completed * @param <V> Return type of the callable * @return Future for the result of the callable. */ protected <V> CompletableFuture<V> callAsync(Callable<V> callable, Time timeout) { return rpcServer.callAsync(callable, timeout); } // ------------------------------------------------------------------------ // Main Thread Validation // ------------------------------------------------------------------------ /** * Validates that the method call happens in the RPC endpoint's main thread. * * <p><b>IMPORTANT:</b> This check only happens when assertions are enabled, * such as when running tests. * * <p>This can be used for additional checks, like * <pre>{@code * protected void concurrencyCriticalMethod() { * validateRunsInMainThread(); * * // some critical stuff * } * }</pre> */ public void validateRunsInMainThread() { assert MainThreadValidatorUtil.isRunningInExpectedThread(currentMainThread.get()); } // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ /** * Executor which executes runnables in the main thread context. 
*/ protected static class MainThreadExecutor implements ComponentMainThreadExecutor { private final MainThreadExecutable gateway; private final Runnable mainThreadCheck; MainThreadExecutor(MainThreadExecutable gateway, Runnable mainThreadCheck) { this.gateway = Preconditions.checkNotNull(gateway); this.mainThreadCheck = Preconditions.checkNotNull(mainThreadCheck); } public void runAsync(Runnable runnable) { gateway.runAsync(runnable); } public void scheduleRunAsync(Runnable runnable, long delayMillis) { gateway.scheduleRunAsync(runnable, delayMillis); } public void execute(@Nonnull Runnable command) { runAsync(command); } @Override public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { final long delayMillis = TimeUnit.MILLISECONDS.convert(delay, unit); FutureTask<Void> ft = new FutureTask<>(command, null); scheduleRunAsync(ft, delayMillis); return new ScheduledFutureAdapter<>(ft, delayMillis, TimeUnit.MILLISECONDS); } @Override public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { throw new UnsupportedOperationException("Not implemented because the method is currently not required."); } @Override public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { throw new UnsupportedOperationException("Not implemented because the method is currently not required."); } @Override public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { throw new UnsupportedOperationException("Not implemented because the method is currently not required."); } @Override public void assertRunningInMainThread() { mainThreadCheck.run(); } }}
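This chain of startup steps is triggered by the startServer call mentioned earlier: the RpcEndpoint constructor calls rpcService.startServer(this), and start() then tells the underlying RpcServer that the endpoint is ready to process calls. If this is unclear, it is worth going back over that part. Starting the endpoint means the server side is up.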
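The main-thread idea that runs through the RpcEndpoint listing (runAsync, callAsync, validateRunsInMainThread) can be illustrated without Flink or Akka. The following is only a minimal sketch under stated assumptions: MainThreadEndpointSketch, its counter field, and the inMainThread helper are hypothetical names, a single-threaded executor stands in for the actor's mailbox thread, and the thread check throws an exception where Flink uses an assert.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for an endpoint whose state is only touched by one "main thread". */
class MainThreadEndpointSketch implements AutoCloseable {

    /** Set while a task is running in the main thread, null otherwise. */
    private final AtomicReference<Thread> currentMainThread = new AtomicReference<>(null);

    /** Assumption: a single-threaded executor plays the role of the actor thread. */
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

    /** Deliberately unsynchronized: only the main thread may read or write it. */
    private int counter;

    private <V> V inMainThread(Callable<V> body) throws Exception {
        currentMainThread.set(Thread.currentThread());
        try {
            return body.call();
        } finally {
            currentMainThread.set(null);
        }
    }

    /** Runs the callable in the main thread and exposes its result as a future. */
    <V> CompletableFuture<V> callAsync(Callable<V> callable) {
        CompletableFuture<V> result = new CompletableFuture<>();
        mainThread.execute(() -> {
            try {
                result.complete(inMainThread(callable));
            } catch (Exception e) {
                result.completeExceptionally(e);
            }
        });
        return result;
    }

    /** Fire-and-forget variant; the future returned by callAsync is simply dropped. */
    void runAsync(Runnable runnable) {
        callAsync(() -> {
            runnable.run();
            return null;
        });
    }

    /** Explicit check instead of the assert used in the original listing. */
    void validateRunsInMainThread() {
        if (Thread.currentThread() != currentMainThread.get()) {
            throw new IllegalStateException("Not running in the endpoint's main thread");
        }
    }

    void increment() {
        validateRunsInMainThread();
        counter++;
    }

    int getCounter() {
        validateRunsInMainThread();
        return counter;
    }

    @Override
    public void close() {
        mainThread.shutdown();
    }

    public static void main(String[] args) {
        try (MainThreadEndpointSketch endpoint = new MainThreadEndpointSketch()) {
            for (int i = 0; i < 1000; i++) {
                endpoint.runAsync(endpoint::increment);
            }
            // Tasks run in submission order, so this read sees every increment.
            System.out.println(endpoint.callAsync(endpoint::getCounter).join());
        }
    }
}
```

Because every state change is funneled through one thread, the counter needs no locks. This is the same reason the Javadoc in the listing warns against mutating endpoint state from the executor returned by RpcService#getExecutor().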
2. Client side
With a server side in place, there also has to be a client side. The code is as follows:
class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, RpcServer { private static final Logger LOG = LoggerFactory.getLogger(AkkaInvocationHandler.class); /** * The Akka (RPC) address of {@link #rpcEndpoint} including host and port of the ActorSystem in * which the actor is running. */ private final String address; /** * Hostname of the host, {@link #rpcEndpoint} is running on. */ private final String hostname; private final ActorRef rpcEndpoint; // whether the actor ref is local and thus no message serialization is needed protected final boolean isLocal; // default timeout for asks private final Time timeout; private final long maximumFramesize; // null if gateway; otherwise non-null @Nullable private final CompletableFuture<Void> terminationFuture; AkkaInvocationHandler( String address, String hostname, ActorRef rpcEndpoint, Time timeout, long maximumFramesize, @Nullable CompletableFuture<Void> terminationFuture) { this.address = Preconditions.checkNotNull(address); this.hostname = Preconditions.checkNotNull(hostname); this.rpcEndpoint = Preconditions.checkNotNull(rpcEndpoint); this.isLocal = this.rpcEndpoint.path().address().hasLocalScope(); this.timeout = Preconditions.checkNotNull(timeout); this.maximumFramesize = maximumFramesize; this.terminationFuture = terminationFuture; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Class<?> declaringClass = method.getDeclaringClass(); Object result; if (declaringClass.equals(AkkaBasedEndpoint.class) || declaringClass.equals(Object.class) || declaringClass.equals(RpcGateway.class) || declaringClass.equals(StartStoppable.class) || declaringClass.equals(MainThreadExecutable.class) || declaringClass.equals(RpcServer.class)) { result = method.invoke(this, args); } else if (declaringClass.equals(FencedRpcGateway.class)) { throw new UnsupportedOperationException("AkkaInvocationHandler does not support the call FencedRpcGateway#" + method.getName() + ". 
This indicates that you retrieved a FencedRpcGateway without specifying a " + "fencing token. Please use RpcService#connect(RpcService, F, Time) with F being the fencing token to " + "retrieve a properly FencedRpcGateway."); } else { result = invokeRpc(method, args); } return result; } @Override public ActorRef getActorRef() { return rpcEndpoint; } @Override public void runAsync(Runnable runnable) { scheduleRunAsync(runnable, 0L); } @Override public void scheduleRunAsync(Runnable runnable, long delayMillis) { checkNotNull(runnable, "runnable"); checkArgument(delayMillis >= 0, "delay must be zero or greater"); if (isLocal) { long atTimeNanos = delayMillis == 0 ? 0 : System.nanoTime() + (delayMillis * 1_000_000); tell(new RunAsync(runnable, atTimeNanos)); } else { throw new RuntimeException("Trying to send a Runnable to a remote actor at " + rpcEndpoint.path() + ". This is not supported."); } } @Override public <V> CompletableFuture<V> callAsync(Callable<V> callable, Time callTimeout) { if (isLocal) { @SuppressWarnings("unchecked") CompletableFuture<V> resultFuture = (CompletableFuture<V>) ask(new CallAsync(callable), callTimeout); return resultFuture; } else { throw new RuntimeException("Trying to send a Callable to a remote actor at " + rpcEndpoint.path() + ". This is not supported."); } } @Override public void start() { rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender()); } @Override public void stop() { rpcEndpoint.tell(ControlMessages.STOP, ActorRef.noSender()); } // ------------------------------------------------------------------------ // Private methods // ------------------------------------------------------------------------ /** * Invokes a RPC method by sending the RPC invocation details to the rpc endpoint. 
* * @param method to call * @param args of the method call * @return result of the RPC * @throws Exception if the RPC invocation fails */ private Object invokeRpc(Method method, Object[] args) throws Exception { String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); Annotation[][] parameterAnnotations = method.getParameterAnnotations(); Time futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout); final RpcInvocation rpcInvocation = createRpcInvocationMessage(methodName, parameterTypes, args); Class<?> returnType = method.getReturnType(); final Object result; if (Objects.equals(returnType, Void.TYPE)) { tell(rpcInvocation); result = null; } else { // execute an asynchronous call CompletableFuture<?> resultFuture = ask(rpcInvocation, futureTimeout); CompletableFuture<?> completableFuture = resultFuture.thenApply((Object o) -> { if (o instanceof SerializedValue) { try { return ((SerializedValue<?>) o).deserializeValue(getClass().getClassLoader()); } catch (IOException | ClassNotFoundException e) { throw new CompletionException( new RpcException("Could not deserialize the serialized payload of RPC method : " + methodName, e)); } } else { return o; } }); if (Objects.equals(returnType, CompletableFuture.class)) { result = completableFuture; } else { try { result = completableFuture.get(futureTimeout.getSize(), futureTimeout.getUnit()); } catch (ExecutionException ee) { throw new RpcException("Failure while obtaining synchronous RPC result.", ExceptionUtils.stripExecutionException(ee)); } } } return result; } /** * Create the RpcInvocation message for the given RPC. 
* * @param methodName of the RPC * @param parameterTypes of the RPC * @param args of the RPC * @return RpcInvocation message which encapsulates the RPC details * @throws IOException if we cannot serialize the RPC invocation parameters */ protected RpcInvocation createRpcInvocationMessage( final String methodName, final Class<?>[] parameterTypes, final Object[] args) throws IOException { final RpcInvocation rpcInvocation; if (isLocal) { rpcInvocation = new LocalRpcInvocation( methodName, parameterTypes, args); } else { try { RemoteRpcInvocation remoteRpcInvocation = new RemoteRpcInvocation( methodName, parameterTypes, args); if (remoteRpcInvocation.getSize() > maximumFramesize) { throw new IOException("The rpc invocation size exceeds the maximum akka framesize."); } else { rpcInvocation = remoteRpcInvocation; } } catch (IOException e) { LOG.warn("Could not create remote rpc invocation message. Failing rpc invocation because...", e); throw e; } } return rpcInvocation; } // ------------------------------------------------------------------------ // Helper methods // ------------------------------------------------------------------------ /** * Extracts the {@link RpcTimeout} annotated rpc timeout value from the list of given method * arguments. If no {@link RpcTimeout} annotated parameter could be found, then the default * timeout is returned. 
* * @param parameterAnnotations Parameter annotations * @param args Array of arguments * @param defaultTimeout Default timeout to return if no {@link RpcTimeout} annotated parameter * has been found * @return Timeout extracted from the array of arguments or the default timeout */ private static Time extractRpcTimeout(Annotation[][] parameterAnnotations, Object[] args, Time defaultTimeout) { if (args != null) { Preconditions.checkArgument(parameterAnnotations.length == args.length); for (int i = 0; i < parameterAnnotations.length; i++) { if (isRpcTimeout(parameterAnnotations[i])) { if (args[i] instanceof Time) { return (Time) args[i]; } else { throw new RuntimeException("The rpc timeout parameter must be of type " + Time.class.getName() + ". The type " + args[i].getClass().getName() + " is not supported."); } } } } return defaultTimeout; } /** * Checks whether any of the annotations is of type {@link RpcTimeout}. * * @param annotations Array of annotations * @return True if {@link RpcTimeout} was found; otherwise false */ private static boolean isRpcTimeout(Annotation[] annotations) { for (Annotation annotation : annotations) { if (annotation.annotationType().equals(RpcTimeout.class)) { return true; } } return false; } /** * Sends the message to the RPC endpoint. * * @param message to send to the RPC endpoint. */ protected void tell(Object message) { rpcEndpoint.tell(message, ActorRef.noSender()); } /** * Sends the message to the RPC endpoint and returns a future containing * its response. 
* * @param message to send to the RPC endpoint * @param timeout time to wait until the response future is failed with a {@link TimeoutException} * @return Response future */ protected CompletableFuture<?> ask(Object message, Time timeout) { return FutureUtils.toJava( Patterns.ask(rpcEndpoint, message, timeout.toMilliseconds())); } @Override public String getAddress() { return address; } @Override public String getHostname() { return hostname; } @Override public CompletableFuture<Void> getTerminationFuture() { return terminationFuture; }}
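This client class implements three interfaces: InvocationHandler, AkkaBasedEndpoint, and RpcServer. Respectively, they provide method invocation through a dynamic proxy, access to the underlying actor (getActorRef), and the control and communication methods of the server (start, stop, runAsync, callAsync). Once a client has been obtained, messages can be sent through these interfaces.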
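The createRpcInvocationMessage method in the listing above picks a local message when no serialization is needed, and otherwise a remote message whose serialized size must not exceed the maximum frame size. The sketch below mimics only that decision. It assumes plain Java serialization, and InvocationMessageSketch, LocalInvocation, and RemoteInvocation are hypothetical stand-ins rather than Flink's classes.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

/** Hypothetical illustration of the local/remote message choice; not Flink code. */
class InvocationMessageSketch {

    /** Local calls stay in the same JVM, so arguments are passed by reference. */
    static final class LocalInvocation {
        final String methodName;
        final Object[] args;

        LocalInvocation(String methodName, Object[] args) {
            this.methodName = methodName;
            this.args = args;
        }
    }

    /** Remote calls carry the arguments as bytes so they can cross the network. */
    static final class RemoteInvocation {
        final String methodName;
        final byte[] payload;

        RemoteInvocation(String methodName, byte[] payload) {
            this.methodName = methodName;
            this.payload = payload;
        }
    }

    /** Assumption: standard Java serialization is good enough for this sketch. */
    static byte[] serialize(Object[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(args);
        }
        return bytes.toByteArray();
    }

    /** Builds a local or remote message and rejects oversized remote payloads. */
    static Object create(String methodName, Object[] args, boolean isLocal, long maximumFramesize)
            throws IOException {
        if (isLocal) {
            return new LocalInvocation(methodName, args);
        }
        byte[] payload = serialize(args);
        if (payload.length > maximumFramesize) {
            throw new IOException("The rpc invocation size exceeds the maximum framesize.");
        }
        return new RemoteInvocation(methodName, payload);
    }

    public static void main(String[] args) throws IOException {
        Object local = create("foobar", new Object[] {"x"}, true, 64);
        Object remote = create("foobar", new Object[] {"x"}, false, 1024);
        System.out.println(local.getClass().getSimpleName());
        System.out.println(remote.getClass().getSimpleName());
    }
}
```

Failing before the message is sent gives the caller a clear error instead of a message that the transport would drop.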
3. RPC communication
The client implements InvocationHandler:
public interface InvocationHandler {
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable;
}
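This interface has a single method, invoke. Anyone who has programmed for a while will recognize that a method with this name usually dispatches a call to the matching method of some target interface. In the client code above, createRpcInvocationMessage builds either a LocalRpcInvocation or a RemoteRpcInvocation: the local variant when the target actor is local (isLocal, so no serialization is needed), and the remote variant, which is serialized and checked against the maximum frame size, otherwise. With the RPC method wrapped this way, invokeRpc then chooses between tell and ask. Note that this second choice depends on the method's return type, not on whether the two sides share a JVM: a void method is sent with tell, and any other return type goes through ask. An abridged version of invokeRpc is shown below: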
private Object invokeRpc(Method method, Object[] args) throws Exception {
    ......
    Class<?> returnType = method.getReturnType();
    final Object result;
    if (Objects.equals(returnType, Void.TYPE)) {
        tell(rpcInvocation);
        result = null;
    } else {
        // execute an asynchronous call
        CompletableFuture<?> resultFuture = ask(rpcInvocation, futureTimeout);
        ......
        });
        ......
    return result;
}

protected void tell(Object message) {
    rpcEndpoint.tell(message, ActorRef.noSender());
}

protected CompletableFuture<?> ask(Object message, Time timeout) {
    return FutureUtils.toJava(
        Patterns.ask(rpcEndpoint, message, timeout.toMilliseconds()));
}
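The dispatch in invokeRpc relies on the JDK's dynamic proxy mechanism, which can be tried in isolation. The sketch below is an illustration under stated assumptions, not Flink's implementation: GreeterGateway and RecordingHandler are hypothetical, and instead of sending messages to an actor the handler only records whether it would have used tell or ask, based on the method's return type.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical demonstration of return-type based dispatch behind a dynamic proxy. */
class ProxyDispatchSketch {

    /** Hypothetical gateway; it is not one of Flink's interfaces. */
    interface GreeterGateway {
        void notifyEvent(String event);               // void: fire-and-forget
        CompletableFuture<String> greet(String name); // future: request/response
    }

    /** Records the decision instead of talking to a real actor. */
    static final class RecordingHandler implements InvocationHandler {
        final List<String> log = new ArrayList<>();

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if (method.getDeclaringClass().equals(Object.class)) {
                // toString, hashCode and equals are answered by the handler itself.
                return method.invoke(this, args);
            }
            String call = method.getName() + Arrays.toString(args);
            if (method.getReturnType().equals(Void.TYPE)) {
                log.add("tell " + call);
                return null;
            }
            log.add("ask " + call);
            // A real handler would return the future of the remote reply here.
            return CompletableFuture.completedFuture("hello " + args[0]);
        }
    }

    static GreeterGateway createProxy(InvocationHandler handler) {
        return (GreeterGateway) Proxy.newProxyInstance(
            GreeterGateway.class.getClassLoader(),
            new Class<?>[] {GreeterGateway.class},
            handler);
    }

    public static void main(String[] args) {
        RecordingHandler handler = new RecordingHandler();
        GreeterGateway gateway = createProxy(handler);
        gateway.notifyEvent("started");
        String reply = gateway.greet("flink").join();
        System.out.println(handler.log);
        System.out.println(reply);
    }
}
```

The caller only ever sees the gateway interface. Every call on it lands in invoke, which is the hook AkkaInvocationHandler uses to turn a method call into a message.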
Further down, the calls go into Akka itself. In the startServer method discussed earlier:
final Props akkaRpcActorProps;

if (rpcEndpoint instanceof FencedRpcEndpoint) {
    akkaRpcActorProps = Props.create(
        FencedAkkaRpcActor.class,
        rpcEndpoint,
        terminationFuture,
        getVersion(),
        configuration.getMaximumFramesize());
} else {
    akkaRpcActorProps = Props.create(
        AkkaRpcActor.class,
        rpcEndpoint,
        terminationFuture,
        getVersion(),
        configuration.getMaximumFramesize());
}

ActorRef actorRef;

synchronized (lock) {
    checkState(!stopped, "RpcService is stopped");
    actorRef = actorSystem.actorOf(akkaRpcActorProps, rpcEndpoint.getEndpointId());
    actors.put(actorRef, rpcEndpoint);
}
Following tell into its implementation shows that it simply delegates to ActorRef from Akka's actor package.
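For readers who have not used Akka, the difference between tell and ask can be shown with a toy actor in plain Java. This is a simplified sketch and not how Akka is implemented: MiniActorSketch, Envelope, and the POISON stop marker are hypothetical, the mailbox is an ordinary blocking queue drained by one thread, and the timeout that Patterns.ask applies to the reply is left out.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

/** Hypothetical toy actor: one mailbox, one thread, messages handled one at a time. */
class MiniActorSketch {

    /** A message plus an optional reply future (null when sent with tell). */
    private static final class Envelope {
        final Object message;
        final CompletableFuture<Object> reply;

        Envelope(Object message, CompletableFuture<Object> reply) {
            this.message = message;
            this.reply = reply;
        }
    }

    private static final Object POISON = new Object();

    private final BlockingQueue<Envelope> mailbox = new LinkedBlockingQueue<>();

    MiniActorSketch(Function<Object, Object> behavior) {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    Envelope envelope = mailbox.take();
                    if (envelope.message == POISON) {
                        return;
                    }
                    try {
                        Object result = behavior.apply(envelope.message);
                        if (envelope.reply != null) {
                            envelope.reply.complete(result);
                        }
                    } catch (RuntimeException e) {
                        if (envelope.reply != null) {
                            envelope.reply.completeExceptionally(e);
                        }
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    /** Fire-and-forget: enqueue the message and return immediately. */
    void tell(Object message) {
        mailbox.add(new Envelope(message, null));
    }

    /** Request/response: enqueue the message and hand back a future for the reply. */
    CompletableFuture<Object> ask(Object message) {
        CompletableFuture<Object> reply = new CompletableFuture<>();
        mailbox.add(new Envelope(message, reply));
        return reply;
    }

    void stop() {
        mailbox.add(new Envelope(POISON, null));
    }

    public static void main(String[] args) {
        int[] count = {0}; // state owned by the actor thread only
        MiniActorSketch actor = new MiniActorSketch(message -> {
            if ("inc".equals(message)) {
                count[0]++;
                return null;
            }
            return count[0];
        });
        actor.tell("inc");
        actor.tell("inc");
        System.out.println(actor.ask("get").join());
        actor.stop();
    }
}
```

Since the mailbox is processed in order by a single thread, the "get" request is answered only after both "inc" messages have been applied. That ordering is the same property the RPC endpoint's main thread depends on.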
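III. Summary
This analysis is still fairly rough. Two concurrency models are worth naming here: CSP, with Go as the typical example, and the Actor model, which is the one used in this code. The implementation of RpcServer and the client is not analyzed in more detail mainly because the call path really is simple, and because its key points were already covered earlier, though not systematically (for example JobMaster, Dispatcher, and TaskExecutor, all of which are subclasses of RpcEndpoint), so repeating them here adds little. I also hesitated over whether to analyze serialization, but that is really a separate topic and will be left for later.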
