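When a Dubbo RPC request reaches the provider, it is first received and decoded on a Netty worker thread (NioEventLoop), then handed to the later RPC stages on the DubboServerHandler thread pool: the filters and the service invoke. The service invoke step is where the actual service logic runs; once it finishes, the result is encoded and returned to the RPC caller as the response.
Before the analysis, let us first look at the overall RPC flow: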
Processing flow

NettyServerHandler is the ChannelHandler that Dubbo registers with Netty. Its job is to submit the request message to the DubboServerHandler thread pool.
```java
// Excerpt from MultiMessageHandler: unpack a batched message and forward each element.
public void received(Channel channel, Object message) throws RemotingException {
    if (message instanceof MultiMessage) {
        MultiMessage list = (MultiMessage) message;
        for (Object obj : list) {
            handler.received(channel, obj);
        }
    } else {
        handler.received(channel, message);
    }
}

// Excerpt from the dispatching handler: hand the message to the business thread pool.
public void received(Channel channel, Object message) throws RemotingException {
    ExecutorService cexecutor = getExecutorService();
    cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
}
```
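To make the hand-off from the I/O thread to the business pool concrete, here is a minimal, self-contained sketch of the same two-step pattern: unpack a batch, then submit each message to an executor. All type names (`SketchHandler`, `BatchUnpackingHandler`, `PoolDispatchingHandler`, `DispatchSketch`) are hypothetical stand-ins rather than Dubbo classes, and a plain `List` plays the role of `MultiMessage`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified handler contract (not Dubbo's ChannelHandler).
interface SketchHandler {
    void received(String channel, Object message);
}

// Unpacks a batch and forwards each element, mirroring the first received() above.
final class BatchUnpackingHandler implements SketchHandler {
    private final SketchHandler next;

    BatchUnpackingHandler(SketchHandler next) {
        this.next = next;
    }

    @Override
    public void received(String channel, Object message) {
        if (message instanceof List<?>) {
            for (Object item : (List<?>) message) {
                next.received(channel, item);
            }
        } else {
            next.received(channel, message);
        }
    }
}

// Submits every message to a business pool so the calling (I/O) thread returns at once.
final class PoolDispatchingHandler implements SketchHandler {
    private final SketchHandler next;
    private final ExecutorService pool;

    PoolDispatchingHandler(SketchHandler next, ExecutorService pool) {
        this.next = next;
        this.pool = pool;
    }

    @Override
    public void received(String channel, Object message) {
        pool.execute(() -> next.received(channel, message));
    }
}

final class DispatchSketch {
    // Pushes one inbound message through both wrappers and returns what the business handler saw.
    static List<String> run(Object inbound) {
        List<String> seen = new CopyOnWriteArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        SketchHandler business = (channel, message) -> seen.add(String.valueOf(message));
        SketchHandler entry = new BatchUnpackingHandler(new PoolDispatchingHandler(business, pool));

        entry.received("channel-1", inbound);

        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not drain in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        // Arrival order may vary because two pool threads are used.
        System.out.println(run(Arrays.asList("req-1", "req-2", "req-3")));
    }
}
```

Because the pool has more than one thread, the business handler may see the messages in a different order than they were submitted; the sketch only guarantees that each one is delivered.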
ChannelEventRunnable events are handled in org.apache.dubbo.remoting.transport.dispatcher.ChannelEventRunnable#run. The event types cover connect, disconnect, send, receive, and exception:
```java
public enum ChannelState {
    CONNECTED,
    DISCONNECTED,
    SENT,
    RECEIVED,
    CAUGHT
}
```
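The following sketch illustrates how a runnable that carries one event can route it by state, which is the idea behind `ChannelEventRunnable#run`. It is an illustration under assumptions, not the Dubbo source: `SketchState`, `StateAwareHandler`, `StateEventTask`, and `StateEventSketch` are hypothetical names, and channels are reduced to plain strings.

```java
import java.util.ArrayList;
import java.util.List;

// Mirrors the five event types listed above.
enum SketchState { CONNECTED, DISCONNECTED, SENT, RECEIVED, CAUGHT }

// Hypothetical callback set with one method per event type.
interface StateAwareHandler {
    void connected(String channel);
    void disconnected(String channel);
    void sent(String channel, Object message);
    void received(String channel, Object message);
    void caught(String channel, Throwable error);
}

// A task that carries a single event and routes it when run() is called on a pool thread.
final class StateEventTask implements Runnable {
    private final String channel;
    private final StateAwareHandler handler;
    private final SketchState state;
    private final Object payload; // a message, or a Throwable for CAUGHT

    StateEventTask(String channel, StateAwareHandler handler, SketchState state, Object payload) {
        this.channel = channel;
        this.handler = handler;
        this.state = state;
        this.payload = payload;
    }

    @Override
    public void run() {
        switch (state) {
            case CONNECTED:    handler.connected(channel); break;
            case DISCONNECTED: handler.disconnected(channel); break;
            case SENT:         handler.sent(channel, payload); break;
            case RECEIVED:     handler.received(channel, payload); break;
            case CAUGHT:       handler.caught(channel, (Throwable) payload); break;
            default: throw new IllegalStateException("unknown state: " + state);
        }
    }
}

final class StateEventSketch {
    // Runs one task per state against a recording handler and returns the callback log.
    static List<String> replay(SketchState... states) {
        List<String> log = new ArrayList<>();
        StateAwareHandler recorder = new StateAwareHandler() {
            @Override public void connected(String c) { log.add("connected"); }
            @Override public void disconnected(String c) { log.add("disconnected"); }
            @Override public void sent(String c, Object m) { log.add("sent:" + m); }
            @Override public void received(String c, Object m) { log.add("received:" + m); }
            @Override public void caught(String c, Throwable e) { log.add("caught:" + e.getMessage()); }
        };
        for (SketchState state : states) {
            Object payload = state == SketchState.CAUGHT ? new RuntimeException("boom") : "ping";
            new StateEventTask("channel-1", recorder, state, payload).run();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(replay(SketchState.CONNECTED, SketchState.RECEIVED, SketchState.DISCONNECTED));
    }
}
```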
RPC execution

Below is the call stack of an invocation of HelloServiceImpl:
Filter execution

Once a request enters the DubboServerHandler thread pool, it first passes through Dubbo's filters: EchoFilter, ClassLoaderFilter, ContextFilter, TraceFilter, TimeoutFilter, and MonitorFilter.
```java
// EchoFilter: answer the built-in echo probe directly, without reaching the service.
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
    if (inv.getMethodName().equals(Constants.$ECHO)
            && inv.getArguments() != null
            && inv.getArguments().length == 1) {
        return new RpcResult(inv.getArguments()[0]);
    }
    return invoker.invoke(inv);
}

// ClassLoaderFilter: switch the context class loader for the call, then restore it.
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    ClassLoader ocl = Thread.currentThread().getContextClassLoader();
    Thread.currentThread().setContextClassLoader(invoker.getInterface().getClassLoader());
    try {
        return invoker.invoke(invocation);
    } finally {
        Thread.currentThread().setContextClassLoader(ocl);
    }
}

// Another filter in the chain; this excerpt shows only the pass-through call.
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
    return invoker.invoke(inv);
}

// ContextFilter: populate RpcContext for the call and clean it up afterwards.
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    Map<String, String> attachments = invocation.getAttachments();
    RpcContext.getContext()
            .setInvoker(invoker)
            .setInvocation(invocation)
            .setLocalAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort());
    if (attachments != null) {
        if (RpcContext.getContext().getAttachments() != null) {
            RpcContext.getContext().getAttachments().putAll(attachments);
        } else {
            RpcContext.getContext().setAttachments(attachments);
        }
    }
    if (invocation instanceof RpcInvocation) {
        ((RpcInvocation) invocation).setInvoker(invoker);
    }
    try {
        return invoker.invoke(invocation);
    } finally {
        RpcContext.removeContext();
        RpcContext.removeServerContext();
    }
}
```
The wrapper around each filter is shown below; it links the filters together one after another:
```java
// `filter` is the filter being wrapped, `next` the invoker that follows it in the chain,
// and `invoker` the original invoker that supplies interface, URL and lifecycle.
new Invoker<T>() {
    @Override
    public Class<T> getInterface() {
        return invoker.getInterface();
    }

    @Override
    public URL getUrl() {
        return invoker.getUrl();
    }

    @Override
    public boolean isAvailable() {
        return invoker.isAvailable();
    }

    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        // Delegate to the filter, which decides when to call `next`.
        return filter.invoke(next, invocation);
    }

    @Override
    public void destroy() {
        invoker.destroy();
    }

    @Override
    public String toString() {
        return invoker.toString();
    }
};
```
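The chaining idea can be sketched in a few lines: walk the filter list from last to first and wrap the current head in a new invoker that delegates to the filter. This is a simplified illustration, not Dubbo's builder. `MiniInvoker`, `MiniFilter`, and `FilterChainSketch` are hypothetical types, and requests and results are plain strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, string-based stand-ins for Invoker and Filter.
interface MiniInvoker {
    String invoke(String request);
}

interface MiniFilter {
    String invoke(MiniInvoker next, String request);
}

final class FilterChainSketch {
    // Wraps the target so that filters.get(0) runs first and the target runs last.
    static MiniInvoker buildChain(MiniInvoker target, List<MiniFilter> filters) {
        MiniInvoker last = target;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final MiniFilter filter = filters.get(i);
            final MiniInvoker next = last;
            last = request -> filter.invoke(next, request);
        }
        return last;
    }

    // A filter that records when it is entered and left.
    static MiniFilter tracing(String name, List<String> trace) {
        return (next, request) -> {
            trace.add(name + ":before");
            try {
                return next.invoke(request);
            } finally {
                trace.add(name + ":after");
            }
        };
    }

    // Builds a two-filter chain around a toy service and returns the recorded call order.
    static List<String> demo() {
        List<String> trace = new ArrayList<>();
        MiniInvoker service = request -> {
            trace.add("service");
            return "Hello " + request;
        };
        MiniInvoker chain = buildChain(service,
                Arrays.asList(tracing("first", trace), tracing("second", trace)));
        trace.add("result=" + chain.invoke("dubbo"));
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Iterating backwards is what makes the first filter in the list the outermost wrapper, so it sees the request first and the response last.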
RPC reflective invocation

The reflective invocation flow is as follows:
```java
// AbstractProxyInvoker: call the target method and wrap the outcome in an AsyncRpcResult.
public Result invoke(Invocation invocation) throws RpcException {
    try {
        Object value = doInvoke(proxy, invocation.getMethodName(),
                invocation.getParameterTypes(), invocation.getArguments());
        CompletableFuture<Object> future = wrapWithFuture(value, invocation);
        CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
            AppResponse result = new AppResponse();
            if (t != null) {
                if (t instanceof CompletionException) {
                    result.setException(t.getCause());
                } else {
                    result.setException(t);
                }
            } else {
                result.setValue(obj);
            }
            return result;
        });
        return new AsyncRpcResult(appResponseFuture, invocation);
    } catch (InvocationTargetException e) {
        // exception handling omitted in this excerpt
    } catch (Throwable e) {
        // exception handling omitted in this excerpt
    }
}

// Proxy factory: build an invoker whose doInvoke delegates to a Wrapper of the service class.
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    final Wrapper wrapper = Wrapper.getWrapper(
            proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName,
                                  Class<?>[] parameterTypes, Object[] arguments) throws Throwable {
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}

// The business implementation that is finally reached.
public class DemoServiceImpl implements DemoService {
    @Override
    public String sayHello(String name) {
        System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "] Hello " + name
                + ", request from consumer: " + RpcContext.getContext().getRemoteAddress());
        return "Hello " + name + ", response from provider: " + RpcContext.getContext().getLocalAddress();
    }
}
```
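Through the reflective call, execution reaches the business logic. Once the return value is available, it is assembled into an AppResponse and returned to the client.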
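The pattern of "invoke by method name, then normalize the outcome into a future that carries either a value or an exception" can be sketched with the JDK alone. This is only an approximation of the idea: it uses `java.lang.reflect.Method` instead of Dubbo's `Wrapper`, and `Outcome`, `SketchGreeter`, and `ReflectiveInvokeSketch` are hypothetical names, with `Outcome` standing in for `AppResponse`.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical stand-in for a response object: exactly one of value / error is set.
final class Outcome {
    final Object value;
    final Throwable error;

    Outcome(Object value, Throwable error) {
        this.value = value;
        this.error = error;
    }
}

// Toy service used as the invocation target.
final class SketchGreeter {
    public String sayHello(String name) {
        if (name == null) {
            throw new IllegalArgumentException("name must not be null");
        }
        return "Hello " + name;
    }
}

final class ReflectiveInvokeSketch {
    // Looks the method up by name and parameter types, calls it, and returns a future.
    @SuppressWarnings("unchecked")
    private static CompletableFuture<Object> call(Object target, String methodName,
                                                  Class<?>[] parameterTypes, Object[] arguments) {
        try {
            Method method = target.getClass().getMethod(methodName, parameterTypes);
            Object value = method.invoke(target, arguments);
            // A service may already return a future; otherwise wrap the plain value.
            return value instanceof CompletableFuture
                    ? (CompletableFuture<Object>) value
                    : CompletableFuture.completedFuture(value);
        } catch (InvocationTargetException e) {
            return failed(e.getCause()); // the exception thrown by the service itself
        } catch (ReflectiveOperationException e) {
            return failed(e);            // lookup or access problem
        }
    }

    private static CompletableFuture<Object> failed(Throwable error) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        future.completeExceptionally(error);
        return future;
    }

    // Normalizes success and failure into a single Outcome, unwrapping CompletionException.
    static CompletableFuture<Outcome> invoke(Object target, String methodName,
                                             Class<?>[] parameterTypes, Object[] arguments) {
        return call(target, methodName, parameterTypes, arguments).handle((obj, t) -> {
            if (t == null) {
                return new Outcome(obj, null);
            }
            return new Outcome(null, t instanceof CompletionException ? t.getCause() : t);
        });
    }

    public static void main(String[] args) {
        Outcome outcome = invoke(new SketchGreeter(), "sayHello",
                new Class<?>[] {String.class}, new Object[] {"dubbo"}).join();
        System.out.println(outcome.value);
    }
}
```

Folding the failure into the result object means the caller always receives a normally completed future and inspects the outcome, instead of handling exceptions on the future itself.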
Filter chain initialization

The filter chain is already initialized before the threading model is set up:
```java
// NettyServer constructor: the handler is wrapped before it is passed to the parent class.
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
    super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url));
}

public static ChannelHandler wrap(ChannelHandler handler, URL url) {
    return ChannelHandlers.getInstance().wrapInternal(handler, url);
}

// The Dispatcher extension inserts the thread-pool hand-off around the incoming handler.
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
    return new MultiMessageHandler(new HeartbeatHandler(
            ExtensionLoader.getExtensionLoader(Dispatcher.class)
                    .getAdaptiveExtension().dispatch(handler, url)));
}
```
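As the code above shows, the handler has already been built by the time NettyServer is initialized. Tracing further up the call path gives the following code: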
```java
// Transport layer: create the Netty server with the given handler.
public RemotingServer bind(URL url, ChannelHandler listener) throws RemotingException {
    return new NettyServer(url, listener);
}

// Exchange layer: wrap the handler with decoding and request/response handling.
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    return new HeaderExchangeServer(
            Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}

// Protocol layer: bind the server with requestHandler as the innermost handler.
private ProtocolServer createServer(URL url) {
    url = URLBuilder.from(url)
            .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
            .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
            .addParameter(CODEC_KEY, DubboCodec.NAME)
            .build();
    // str is not used further in this excerpt.
    String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);

    ExchangeServer server = Exchangers.bind(url, requestHandler);
    return new DubboProtocolServer(server);
}
```
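Putting the two wrapping steps together, the nesting order of the handlers can be made explicit with a small sketch. It only models names and nesting and does not process any messages. `NamedLayer` and `HandlerNestingSketch` are hypothetical, and `DispatcherHandler` is a placeholder name for whatever handler the configured Dispatcher extension produces.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical wrapper that only remembers its own name and the layer it wraps.
final class NamedLayer {
    final String name;
    final NamedLayer inner; // null for the innermost layer

    NamedLayer(String name, NamedLayer inner) {
        this.name = name;
        this.inner = inner;
    }

    // Names from outermost to innermost, i.e. the order an inbound message passes through.
    List<String> path() {
        List<String> names = new ArrayList<>();
        for (NamedLayer layer = this; layer != null; layer = layer.inner) {
            names.add(layer.name);
        }
        return names;
    }
}

final class HandlerNestingSketch {
    // Exchange-layer wrapping: DecodeHandler(HeaderExchangeHandler(handler)).
    static NamedLayer exchangeBind(NamedLayer handler) {
        return new NamedLayer("DecodeHandler", new NamedLayer("HeaderExchangeHandler", handler));
    }

    // Transport-layer wrapping: MultiMessageHandler(HeartbeatHandler(dispatch(handler))).
    static NamedLayer transportWrap(NamedLayer handler) {
        NamedLayer dispatched = new NamedLayer("DispatcherHandler", handler); // placeholder name
        return new NamedLayer("MultiMessageHandler", new NamedLayer("HeartbeatHandler", dispatched));
    }

    static List<String> fullPath() {
        return transportWrap(exchangeBind(new NamedLayer("requestHandler", null))).path();
    }

    public static void main(String[] args) {
        System.out.println(fullPath());
    }
}
```

Read from left to right, the resulting list shows that the thread-pool hand-off sits outside `DecodeHandler`, so everything from that layer inward runs on the business pool in this model.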
requestHandler refers to org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#requestHandler. It implements the ChannelHandler interface, so it can handle every event type. For a request, the relevant code is:
```java
// Entry point for a decoded request: find the invoker (the head of the chain) and call it.
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
    Invocation inv = (Invocation) message;
    Invoker<?> invoker = getInvoker(channel, inv);
    RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
    Result result = invoker.invoke(inv);
    return result.thenApply(Function.identity());
}

// Build the service key from port, path, version and group, then look up the exporter.
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
    boolean isStubServiceInvoke = false;
    int port = channel.getLocalAddress().getPort();
    String path = inv.getAttachments().get(PATH_KEY);

    String serviceKey = serviceKey(port, path,
            inv.getAttachments().get(VERSION_KEY), inv.getAttachments().get(GROUP_KEY));
    DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);

    return exporter.getInvoker();
}
```
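In other words, the processing chain is looked up in a provider-side repository that maps each service to its processing chain. That repository is initialized during Dubbo provider startup, which is outside the scope of this article and is not covered further.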
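The lookup can be pictured as a map keyed by a string built from the request's routing attributes. The sketch below is illustrative only: `ExporterRegistrySketch` is a hypothetical class, the attachment keys `"path"`, `"version"`, and `"group"` and the key layout `group/path:version:port` are assumptions made for the example, and invokers are reduced to `Function<String, String>`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical registry mapping a service key to the head of its processing chain.
final class ExporterRegistrySketch {
    private final Map<String, Function<String, String>> exporters = new ConcurrentHashMap<>();

    // Assumed key layout for this sketch: [group/]path[:version]:port
    static String serviceKey(int port, String path, String version, String group) {
        StringBuilder key = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            key.append(group).append('/');
        }
        key.append(path);
        if (version != null && !version.isEmpty()) {
            key.append(':').append(version);
        }
        return key.append(':').append(port).toString();
    }

    // Called at provider startup: register the chain for one exported service.
    void export(int port, String path, String version, String group, Function<String, String> invoker) {
        exporters.put(serviceKey(port, path, version, group), invoker);
    }

    // Called per request: rebuild the key from the request attributes and fetch the chain.
    Function<String, String> lookup(int port, Map<String, String> attachments) {
        String key = serviceKey(port, attachments.get("path"),
                attachments.get("version"), attachments.get("group"));
        Function<String, String> invoker = exporters.get(key);
        if (invoker == null) {
            throw new IllegalStateException("no exported service for key: " + key);
        }
        return invoker;
    }

    public static void main(String[] args) {
        ExporterRegistrySketch registry = new ExporterRegistrySketch();
        registry.export(20880, "demo.HelloService", "1.0.0", null, name -> "Hello " + name);

        Map<String, String> attachments = new HashMap<>();
        attachments.put("path", "demo.HelloService");
        attachments.put("version", "1.0.0");
        System.out.println(registry.lookup(20880, attachments).apply("dubbo"));
    }
}
```

Unlike the excerpt above, the sketch fails fast with a descriptive error when no entry matches, which avoids a null dereference on an unknown key.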