Dubbo框架的一些特性
分层架构--Dubbo采用分层架构。Dubbo框架整体上氛围了业务层(business)、RPC层和远程调用层(Remoting),其中业务层提供API,让使用者方便地发布与引用服务;RPC层则是对服务注册与发现、服务代理、路由、负载均衡等功能的封装,该层有可以被划分为很多层;远程调用层则是对网络传输与请求数据序列/反序列化等的抽象。
扩展性--Dubbo是一个扩展性极强的框架,其RPC层中的所有组件都是基于SPI扩展接口实现的,每个组件都可以被替换。
集群容错--Dubbo框架提供了分布式系统中常见的集群容错策略,并且提供了扩展接口,让使用方方便地定制自己的集群容错策略。
负载均衡、路由--Dubbo框架提供了分布式系统中常见的负载均衡策略,并且提供了扩展接口,让使用者方便地指定自己的负载均衡策略;另外,陆游规则提供了服务治理的一种策略,在Dubbo中我们可以通过管理控制台来配置路由规则,让消费者只可以访问那些服务提供者。
注册中心--在Dubbo框架中,提供了扩展接口来方便地让我们使用Zookeeper,redis等作为服务注册中心。
代理远程调用--Dubbo框架和其他RPC框架一样,采用代理来实现,让使用者无感知地发起远程过程调用。服务提供端使用代理与JavaAssist技术来减少反射调用开销。
网络通信--Dubbo的分层架构中,Transport网络传输层把Mina和Netty抽象为统一接口,并且在默认情况下使用Netty作为底层网络通信。
异步调用--Dubbo基于Netty的异步非阻塞能力和JDK8中的CompletableFuture轻松地实现RPC请求的异步调用,提供了资源的利用率。
全书内容
第一部分为基础篇,首先从整体上讲解使用Dubbo搭建的系统由哪些模块组成,各模块相互之间的调用关系是怎么样的,然后基于本书的Demo讲解如何使用Dubbo。
第二部分为高级篇,主要讲解Dubbo框架内部的实现原理,包含支撑Dubbo框架的适配器类原理、动态编译原理、增强SPI原理、消费的泛化调用实现原理、消费端异步调用与服务提供端的异步执行、Dubbo框架的线程模型、消费端负载均衡策略、消费端集群容错策略、并发控制原理、Dubbo网络协议等。
第三部分为实践篇,主要探讨如何使用Arthas和一些Demo来为研究Dubbo框架原理提供方便,并且讲解如何基于CompletableFuture和Netty模拟RPC同步与异步调用。
第一章Dubbo基础
Dubbo是阿里巴巴开发的一个开源的高性能的RPC调用框架,是一个致力于提供高性能和透明化的RPC远程调用服务解决方案。作为阿里巴巴SOA服务化治理方案的核心框架,目前它已从Apache孵化器项目毕业,其前景可谓无限光明。
Provider :提供者,服务发布方.
Consumer:消费者, 调用服务方
Container:Dubbo容器.依赖于Spring容器.
Registry: 注册中心.当Container启动时把所有可以提供的服务列表上Registry中进行注册.
--作用:告诉Consumer提供了什么服务和服务方在哪里.
Monitor:监听器
虚线都是异步访问,实线都是同步访问
蓝色虚线:在启动时完成的功能
红色虚线(实线)都是程序运行过程中执行的功能
所有的角色都是可以在单独的服务器上.所以必须遵守特定的协议.
运行原理
启动容器,相当于在启动Dubbo的Provider
启动后会去注册中心进行注册.注册所有可以提供的服务列表
在Consumer启动后会去Registry中获取服务列表和Provider的地址.进行订阅.
当Provider有修改后,注册中心会把消息推送给Consummer
--使用了观察者设计模式(又叫发布/订阅设计模式)
根据获取到的Provider地址,真实调用Provider中功能.
--在Consumer方使用了代理设计模式.创建一个Provider方类的一个代理对象.通过代理对象获取Provider中真实功能,起到保护Provider真实功能的作用.
Consumer和Provider每隔1分钟向Monitor发送统计信息,统计信息包含,访问次数,频率等。
本地服务mock
首先需要消费端先实现服务接口的mock实现类,需要注意的是,mock实现类必须要符合“接口名.类名Mock”格式,否则启动时会报错。在执行mock服务实现类mock()方法前,会先发起远程调用,当远程调用失败(比如服务不存在)时,才会降级执行mock功能。
服务降级
force:return策略,当服务调用方设置某个接口的降级策略为这种方式时,服务调用方在调用该接口服务时会直接在客户端内返回设置的mock值,而不会再通过远程调用方式调用服务提供者。
fail:return策略,表示服务调用方调用服务提供方的服务失败后在返回mock值,与force:return的区别时前者如果调用服务提供者成功,则返回正常的结果,如果调用失败则返回mock值。
隐式参数传递
本地服务暴露与引用
远程服务暴露与引用,时指提供方与消费方通过网络来进行通信,具体通信流程如图
、
其实Dubbo还提供了一种本地服务暴露与引用的方式,这在同一个JVM中进程中同时发布与调用同一个服务时显得比较重要,因为如果当前JVM内要调用的服务在本JVM进程内有提供,则避免了一次远程过程调用,而是直接在JVM内进行通信。如图:
第二章 Dubbo框架内核原理剖析
架构分层是一个比较经典的模式,比如网络中的7层协议,每层执行固定的功能,上层依赖下层提供的功能,下层的改变对上层不可见等,并且每层都是一个可被替换的组件。
Service和Config层位API接口层,是为了让Dubbo使用方方便地发布服务和引用服务;对于服务提供方来说需要实现服务接口,然后使用ServiceConfig API来发布服务;对应服务消费方来说需要使用ReferenceConfig对服务接口进行代理。Dubbo服务发布与引用可以直接初始化配置类,也可以通过Spring配置自动生成配置类。
其他各层均为SPI(service provider interface,服务提供者接口)层,SPI意味着下面各层都是组件化的,是可以被替换的,这也是Dubbo设计比较好的一点。Dubbo增强了JDK中提供的标准SPI功能,在Dubbo中除了Service和Config层,其他各层都是通过实现扩展点接口来提供服务的;Dubbo增强的SPI增加了对扩展点Ioc和AOP的支持,一个扩展点可以直接使用setter()方法注入其他扩展点,并且不会一次性实例化扩展点的所有实现类,这就避免了当扩展点实现类初始化很耗时,但当前还没用上它的功能是仍进行加载实例化这种浪费资源的情况。增强SPI实在具体用某一个实现类的时候才对具体实现类进行实例化。
Proxy服务代理层:该层主要是对服务消费端使用的接口进行代理,把本地调用透明的转换为远程调用;另外对服务提供方的服务实现类进行代理,把服务实现类转换为Wrapper类,这减少了反射的调用。
Registry服务注册中心层:服务提供者启动时会把服务注册到服务注册中心,消费者启动时回去服务注册中心获取服务提供者的地址列表,Registry层主要功能是封装服务地址的注册与发现逻辑。
Cluster路由层:封装多个服务提供者的路由规则,负载均衡、集群容错的实现,并桥接注册中心;
扩展接口Cluster对应的实现类有FailoverCluster失败重试,FailbackCluster失败自动恢复,FailfastCluster快速失败,FailSafeCluster失败安全,ForkingCluster并行调用等;负载均衡扩展接口LoadBalance对应的实现类为RandomLoadBalance随机、RoundRobinLoadBalance轮询、LeastActiveLoadBalance最小活跃数、ConsistentHashLoadBalance一致性Hash。
Monitor监控层:用来统计RPC调用次数和调用耗时时间,扩展接口为MonitorFactory。
Protocol远程调用层:封装RPC调用逻辑。
Exchange信息交换层:封装请求响应模式,同步转异步。
Transport网络传输层:Mina和Netty抽象为统一接口。
Serialize数据序列化层:通过可以复用的一些工具。
扩展点接口的类都含有@SPI注解,这里注解中“dubbo”说明Protocol扩展接口SPI的默认实现是DubboProtocol。
@SPI("dubbo")
public interface Protocol {
}
服务提供者暴露一个服务的概要过程
首先,ServiceConfig类引用对外提供服务的实现类ref,然后通过ProxyFactory接口的扩展实现类的getInvoker()方法使用ref生成一个AbstractProxyInvoker实例,到这一步就完成了具体服务到Invoker的转化。接下来就是Invoker转换到Exporter的过程。Dubbo协议的Invoker转换Exporter发生在DubboProtocol类的export()方法中,Dubbo处理服务暴露的关键就在Invoker转换到Exporter的过程中,在这个过程中会启动NettyServer监听服务连接,然后将服务注册到服务注册中心。
服务消费者消费一个服务的概要过程
首先ReferenceConfig类的init()方法调用Protocol扩展接口实现类的refer()方法生成Invoker实例,这是服务消费的关键。接下来把Invoker转换为客户端需要的接口。
Dubbo协议的Invoker转换为客户端需要的接口,发生在ProxyFactory接口的扩展实现类的getProxy()方法中,它主要是使用代理对服务接口的调用转换为对Invoker的调用。
Dubbo适配器原理
Dubbo框架在使用扩展点功能的时候是对接口进行依赖的,而一个扩展接口对应了一系列的扩展实现类。那么如何选择使用哪一个扩展接口的实现类呢?其实这是使用适配器模式来做的。
Dubbo会使用动态编译技术为接口Protocol生成一个适配器类Protocol&Adaptive的对象实例,在Dubbo框架中需要使用Protocol的实例时,实际上是使用P&A的对象实例来获取具体SPI实现类的。
在Dubbo中URL是一个核心概念,Dubbo框架把所需的参数都拼接到URL对象里。这里假设代码传递的协议类型为Dubbo,则说明使用增强SPI返回扩展接口Protocol的Dubbo实现,即返回了DubboProtocol的实例。
总结一下就是,适配器类P&A会根据传递的协议参数不同,加载不同的Protocol的SPI实现。
其实在Dubbo框架中,框架会给每个SPI扩展接口动态生成一个对应的适配器类,并根据参数来使用增强SPI以选择不同的SPI实现。
Dubbo的动态编译原理
Dubbo中提供了一个Compiler的SPI,实现有JavassistCompiler默认实现和JdkCompiler两种。
ExtensionLoader#createAdaptiveExtensionClass
private Class<?> createAdaptiveExtensionClass() {
String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
ClassLoader classLoader = findClassLoader();
org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
return compiler.compile(code, classLoader);
}
总结一下就是,Dubbo框架会为每个扩展接口生成其对应的适配器类的源码,然后选择具体的动态编译类的扩展实现对源码进行编译以生成适配器类的Class对象,然后就可以调用Class对象的newInstance()方法生成扩展接口对应的适配器类的实例。
增强SPI原理
Dubbo的扩展点加载机制是基于JDK标准的SPI扩展机制增强而来的,Dubbo解决了JDK标准的SPI的以下问题:
Jdk标准的SPI会一次性实例化扩展点的所有实例,如果有些扩展实现初始化很耗时,但又没用上,那么加载就很浪费资源。
如果扩展点加载失败,是不会友好地向用户通知具体异常的。
增加了对扩展点IoC和AOP的支持,一个扩展点可以直接使用setter()方法注入其他扩展点,也可以对扩展点使用Wrapper类进行功能增强。
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
ExtensionLoader中持有一个全局的map存放type对应ExtensionLoader的key-value,getExtensionLoader(Protocol.class)会得到一个单例的ExtensionLoader< Protocol >;
private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<>();
getAdaptiveExtension()会从全局的Holder中获取Protocol的适配器类,如果没有,则创建一个并放入Holder中。
private final Holder<Object> cachedAdaptiveInstance = new Holder<>();
所以当我们调用protocol.export(invoker)方法的时候实际调用的是动态生成的Protocol&Adaptive实例的export(invoker)方法,其内部代码首先获取参数里的URL对象,然后从URL对象里获取用户设置的协议的实现类的名称,然后根据名称获取具体的Protocol协议的实现类(后面我们会知道获取的是被使用Wrapper类增强后的实现类),最后调用Protocol协议的实现类的export(invoker)方法。
private final ConcurrentMap<String, Holder<Object>> cachedInstances = new ConcurrentHashMap<>();
实现类map
public T getExtension(String name) {
if (StringUtils.isEmpty(name)) {
throw new IllegalArgumentException("Extension name == null");
}
if ("true".equals(name)) {
return getDefaultExtension();
}
Holder<Object> holder = getOrCreateHolder(name);
Object instance = holder.get();
if (instance == null) {
synchronized (holder) {
instance = holder.get();
if (instance == null) {
instance = createExtension(name);
holder.set(instance);
}
}
}
return (T) instance;
}
找到实现类name对应的holder,如果没有则创建一个,如果holder中没有instance,则走createExtension方法创建
private T createExtension(String name) {
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
injectExtension(instance);
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (CollectionUtils.isNotEmpty(wrapperClasses)) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
type + ") couldn't be instantiated: " + t.getMessage(), t);
}
}
从配置文件中加载出所有的扩展类,并根据名称找到对应的class信息,实例化后放入map中
private static final ConcurrentMap<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<>();
这个map中放的是干净的实例
然后如果有wrapper类则实现增强逻辑,最后返回增强后的instance,外面holder中保存的是增强后的实例。
Dubbo使用JavaAssist减少反射调用开销
Dubbo会给每个服务提供者的实现类生产一个Wrapper类,这个Wrapper类里面最终调用服务提供者的接口实现类,Wrapper类的存在是为了减少反射的调用。当服务提供方收到消费方发来的请求后,需要根据消费者传递过来的方法名和参数反射调用服务提供者的实现类,而反射本身是有性能开销的,Dubbo把每个服务提供者的实现类通过JavaAssist包装为一个Wrapper类以减少反射调用开销。
第三章远程服务发布与引用流程剖析
// 7.导出服务
serviceConfig.export();
服务导出分为不导出,本地导出,远程导出
本地导出使用了injvm协议,是一个伪协议,它不开启端口,不发起远程调用,只在JVM内直接关联,但执行Dubbo的Filter链。
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
首先调用了JavassistProxyFactory的getInvoker方法:
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
这里首先把服务实现类转换为Wrapper类,是为了减少反射的调用,这里返回的是AbstractProxyInvoker对象,其内部重写doInvoke()方法。
当执行protocol.export(wrapperInvoker)方法的时候,实际调用了Protocol的适配器类Protocol&Adaptive的export()方法。如果为远程服务暴露,则其内部根据URL中Protocol的类型为registry,会选择Protocol的实现类ResistryProtocol,如果为本地服务暴露,则其内部根据URL中Protocol的类型为injvm,会选择Protocol的实现类InjvmProtocol。但由于Dubbo SPI的扩展点使用了Wrapper自动增强,这里就是用了ProtocolFilterWrapper、ProtocolListenerWrapper、QosProtocolWrapper对其进行了增强。
RegistryProtocol中的export()方法通过doLocalExport启动了NettyServer进行监听服务。
doLocalExport中调用了DubboProtocol的export
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
String key = getCacheKey(originInvoker);
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
Invoker<?> invokerDelegete = new InvokerDelegate<>(originInvoker, providerUrl);
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
});
}
DubboProtocol的export中将Invoker转换为DubboExporter对象,并且把它保存到了缓存exporterMap中;
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
openServer(url);
optimizeSerialization(url);
return exporter;
}
其中openServer是开启nettyServer;
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
}
}
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
使用(ip:port)作为key,能够保证同一台机器多个服务启动时,只会在第一个创建一个netty服务,后面使用的都是缓存中的同一个。
createServer方法中
server = Exchangers.bind(url, requestHandler);
@Override
public ExchangeServer bind(URL url, ExchangeHandler
handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
会调用Transporters.bind,这两个调用都是使用的扩展实现类
public static Server bind(URL url, ChannelHandler...
handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url ==
null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers
== null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter().bind(url, handler);
}
public static Transporter getTransporter() {
return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}
服务启动后要注册到注册中心,我们使用zookeeper作为注册中心,在RegistryProtocol的export方法中getRegistry()方法获取注册中心,RegistryFactory为扩展接口,这里的实现类为ZookeeperRegistryFactory,然后调用createRegistry()方法创建了一个ZookeeperRegistry作为Zookeeper注册中心。
服务注册到Zookeeper后,消费端就可以在Providers节点下找到对应服务的所有服务提供者,然后根据设置的负载均衡策略选择机器进行远程调用了。
Dubbo服务提供方如何处理请求
消费端发起TCP链接并完成后,服务提供方的NettyServer的connected方法会被激活,该方法的执行是在Netty的I/O线程上执行的,为了可以及时释放I/O线程,Netty默认的线程模型为All,所有消息都派发到Dubbo内部的业务线程池,这些消息包括请求事件、响应事件、连接事件、断开事件、心跳事件等。这里对应的AllChannelHandler类把I/O线程接收到的消息包装为ChannelEventRunnable任务并都投递到线程池里。
NettyServer启动netty时,会把自己组装成ChannelHandler放到channelPiple中
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
pipeline.addLast("handler", nettyHandler);
当接收到connected请求时,会走nettyServer的connected方法,该方法是父类AbstractServer中的方法:
@Override
public void connected(Channel ch) throws RemotingException
{
// If the server has
entered the shutdown process, reject any new connection
if (this.isClosing() || this.isClosed()) {
logger.warn("Close new
channel " + ch + ", cause: server is closing or has
been closed. For example, receive a new connect request while in shutdown process.");
ch.close();
return;
}
Collection<Channel> channels =
getChannels();
if (accepts > 0 &&
channels.size() > accepts) {
logger.error("Close
channel " + ch + ", cause: The server " +
ch.getLocalAddress() + " connections greater than max config
" + accepts);
ch.close();
return;
}
super.connected(ch);
}
它会在调用父类AbstractPeer中的connected方法:
@Override
public void connected(Channel ch) throws RemotingException
{
if (closed) {
return;
}
handler.connected(ch);
}
这里的handler就是创建nettyServer时候,dubboProtocol中传进来的它的内部类:
createServer方法:
server = Exchangers.bind(url, requestHandler);
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter()
{
@Override
public CompletableFuture<Object>
reply(ExchangeChannel channel, Object message) throws RemotingException
{
…
}
所以,线程池的任务被执行后,最终会调用DubboProtocol的connected方法:
@Override
public void connected(Channel channel) throws RemotingException
{
invoke(channel, Constants.ON_CONNECT_KEY);
}
private void invoke(Channel channel, String
methodKey) {
Invocation invocation =
createInvocation(channel, channel.getUrl(), methodKey);
if (invocation != null) {
try {
received(channel, invocation);
} catch (Throwable t) {
logger.warn("Failed to
invoke event method " + invocation.getMethodName() + "(), cause:
" + t.getMessage(), t);
}
}
}
private Invocation createInvocation(Channel channel, URL url, String
methodKey) {
String method =
url.getParameter(methodKey);
if (method == null ||
method.length() == 0) {
return null;
}
RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
invocation.setAttachment(Constants.PATH_KEY, url.getPath());
invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY));
invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY));
invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY));
if (url.getParameter(Constants.STUB_EVENT_KEY, false)) {
invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString());
}
return invocation;
}
这里URL内不包含Constants.ON_CONNECT_KEY,所有返回null,invok方法执行完毕。
初始化netty服务时候传进去的handler是经过包装的:
@Override
public ExchangeServer bind(URL url, ExchangeHandler
handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
所有received方法会先经过HeaderExchangeHandler类的received
@Override
public void received(Channel channel, Object message) throws RemotingException
{
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
final ExchangeChannel
exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
// handle request.
Request request =
(Request) message;
if (request.isEvent())
{
handlerEvent(channel, request);
} else {
//如果需要返回值则走handleRequest
if (request.isTwoWay())
{
handleRequest(exchangeChannel, request);
} else {
//如果不需要返回值则直接走dubboProtocol的received
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
handleResponse(channel, (Response)
message);
} else if (message instanceof String) {
if (isClientSide(channel))
{
Exception e = new Exception("Dubbo
client can not supported string message: " + message + " in
channel: " + channel + ", url: " +
channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String)
message);
if (echo != null &&
echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
如果需要返回值则走handleRequest,如果不需要返回值则直接走dubboProtocol的received。两种情况最后都会交给dubboProtocol的reply方法来执行:
@Override
public CompletableFuture<Object> reply(ExchangeChannel
channel, Object message) throws RemotingException {
if (!(message instanceof Invocation)) {
throw new RemotingException(channel, "Unsupported
request: "
+ (message == null ? null :
(message.getClass().getName() + ": " + message))
+ ", channel:
consumer: " + channel.getRemoteAddress() + " -->
provider: " + channel.getLocalAddress());
}
Invocation inv = (Invocation) message;
Invoker<?>
invoker = getInvoker(channel, inv);
// need to consider
backward-compatibility if it's a callback
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr =
invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null ||
!methodsStr.contains(",")) {
hasMethod =
inv.getMethodName().equals(methodsStr);
} else {
String[] methods =
methodsStr.split(",");
for (String method :
methods) {
if (inv.getMethodName().equals(method))
{
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
logger.warn(new IllegalStateException("The
methodName " + inv.getMethodName()
+ " not found
in callback service interface ,invoke will be ignored."
+ " please
update the api interface. url is:"
+ invoker.getUrl())
+ " ,invocation is :" + inv);
return null;
}
}
RpcContext rpcContext = RpcContext.getContext();
rpcContext.setRemoteAddress(channel.getRemoteAddress());
Result result =
invoker.invoke(inv);
if (result instanceof AsyncRpcResult)
{
return ((AsyncRpcResult)
result).getResultFuture().thenApply(r -> (Object) r);
} else {
return CompletableFuture.completedFuture(result);
}
}
Result
result = invoker.invoke(inv);获取调用方对应的Invoker
RpcContext
rpcContext = RpcContext.getContext();
rpcContext.setRemoteAddress(channel.getRemoteAddress());获取上下文,并设置对端地址
最后写回结果,如果为同步执行则转换结果为CompletableFuture;
导出的Invoker执行invoke方法时,这里有个调用链,经过调用链后最终调用了服务提供方启动时AbstractProxyInvoker代理类创建的invoke方法。
Dubbo服务消费方启动流程剖析
服务消费方需要使用ReferenceConfig
API来消费服务这里就是执行get方法来生成远程调用代理类。get方法首先会执行init()方法:
public synchronized T get() {
checkAndUpdateSubConfigs();
if (destroyed) {
throw new IllegalStateException("The
invoker of ReferenceConfig(" + url + ") has
already destroyed!");
}
if (ref == null) {
init();
}
return ref;
}
private void init() {
//检查mock设置
checkMock(interfaceClass);
//创建代理
ref =
createProxy(map);
}
checkMock方法用来检查设置的mock是否正确,然后通过调用createProxy()方法来创建代理类。
首先判断是否需要开启本地引用,如果需要则创建JVM协议的本地引用,然后加载服务注册中心信息,服务注册中心可以有多个。
createProxy方法中调用registryProtocol的refer方法,内部调用doRefer方法;
private <T> Invoker<T> doRefer(Cluster cluster, Registry
registry, Class<T> type, URL url) {
RegistryDirectory<T> directory =
new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of
REFER_KEY
Map<String, String>
parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(REGISTER_KEY, true)) {
directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
registry.register(directory.getRegisteredConsumerUrl());
}
//1建立路由规则链
directory.buildRouterChain(subscribeUrl);
//2订阅服务提供者地址
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY
+ "," + ROUTERS_CATEGORY));
//3包装机器容错策略到Invoker
Invoker invoker =
cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
代码1根据订阅的URL创建路由规则链,代码2的作用是向服务注册中心订阅服务提供者的服务,代码3则是调用扩展接口Cluster的适配器类的join方法,根据参数选择配置的集群容错策略。
代码2的逻辑:通过Zookeeper作为服务注册中心看下Invoker是如何生成的:
步骤2、3、4用来从zookeeper获取服务提供者的地址列表,等zookeeper返回地址列表后会调用RegistryDirectory的notify()方法:
@Override
public synchronized void notify(List<URL>
urls) {
Map<String, List<URL>>
categoryUrls = urls.stream()
.filter(Objects::nonNull)
.filter(this::isValidCategory)
.filter(this::isNotCompatibleFor26x)
.collect(Collectors.groupingBy(url
-> {
if (UrlUtils.isConfigurator(url))
{
return CONFIGURATORS_CATEGORY;
} else if (UrlUtils.isRoute(url))
{
return ROUTERS_CATEGORY;
} else if (UrlUtils.isProvider(url))
{
return PROVIDERS_CATEGORY;
}
return "";
}));
//配置信息,比如服务降级信息
List<URL>
configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
//路由信息收集并保存
List<URL>
routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
toRouters(routerURLs).ifPresent(this::addRouters);
//providers服务提供者信息
List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
refreshOverrideAndInvoker(providerURLs);
}
步骤6、7、8根据获取的最新的服务提供者URL地址,将其转换为具体的invoker列表,也就是说每个提供者URL会被转换为一个Invoker对象,具体转换在toInvokers方法中进行:
if (enabled) {
invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
}
将服务接口转换到invoker对象是通过调用protocol.refer(serviceType,url)来完成的,这里的protocol对象也是Protocol扩展接口的适配器对象,所有调用protocol.refer实际上是调用适配器Protocol&Adaptive的refer方法,在URL中,协议默认为dubbo,所以适配器里调用的应该是DubboProtocol的refer方法。
Dubbo默认提供了一系列Wrapper类对扩展实现类进行功能增强,Dubbo使用了ProtocolListenerWrapper,ProtocolFilterWrapper等类对DubboProtocol进行了功能增强。所以这里经过了一次次调用后才调用到DubboProtocol的refer方法。
@Override
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
getClients(url)方法创建服务消费端的nettyClient对象:
public NettyClient(final URL url, final ChannelHandler
handler) throws RemotingException {
super(url, wrapChannelHandler(url, handler));
}
这里的handler就是dubboProtocol的内部类ExchangeHandlerAdapter;
这里需要注意三点:
第一点:由于同一个服务提供者机器可以提供多个服务,那么消费者机器需要与同一个服务提供者机器提供的多个服务共享连接,还是与每个服务都建立一个连接?共享连接
第二点:消费端是启动时就与服务提供者机器建立好链接吗?是的
第三点:每个服务消费端与服务提供者集群中的所有机器都有连接吗?是的
DubboProtocol的refer方法,返回了一个dubboInvoker,这就是原生的invoker对象,服务消费方远程服务转换就是为了这个involer。
至此可知,在RegistryDirectory里维护了所有服务者的invoker列表,消费端发起远程调用时就是根据集群容错和负载均衡算法以及路由规则从invoker列表里选择一个进行调用的,当服务提供者宕机的时候,zookeeper会通知更新这个invoker列表。
RegistryProtocol的doRefer方法中cluser.join(directory)把持有invoker列表的RegistryDirectory组装成ClusterInvoker,cluster的默认实现是FailoverCluster:
@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker<T>(directory);
}
FailoverClusterInvoker就是集群容错策略。
MockClusterWrapper把FailBackClusterInvoker包装成了MockClusterInvoker。
Dubbo服务消费端一次远程调用过程
ReferenceConfig的createProxy方法中
// create service proxy
return (T) proxyFactory.getProxy(invoker);
这里调用的是JavassistProxyFactory的getProxy
@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
服务消费端通过ReferenceConfig.get方法返回的是一个代理类,并且方法拦截器为InvokerInvocationHandler,所以当消费方调用了服务的接口方法后会被InvokerInvocationHandler拦截:
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[]
parameterTypes = method.getParameterTypes();
if (method.getDeclaringClass()
== Object.class) {
return method.invoke(invoker, args);
}
if ("toString".equals(methodName)
&& parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName)
&& parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName)
&& parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
return invoker.invoke(createInvocation(method, args)).recreate();
}
private RpcInvocation createInvocation(Method method, Object[] args) {
RpcInvocation invocation = new RpcInvocation(method, args);
if (RpcUtils.hasFutureReturnType(method))
{
invocation.setAttachment(Constants.FUTURE_RETURNTYPE_KEY, "true");
invocation.setAttachment(Constants.ASYNC_KEY, "true");
}
return invocation;
}
这里创建了RpcInvocation,其中method为调用的方法,args为参数,这个RpcInvocation对象会一直传递,知道发起远程调用。默认的集群容错策略FailoverClusterInvoker,其内部首先根据配置的负载均衡策略LoadBalance的扩展实现,选择一个invoker作为FailoverClusterInvoker具体的远程调用者,如果调用发生异常,则根据FailoverClusterInvoker的策略重新选择一个invoker进行调用。最后调用了原生的DubboInvoker,其使用NettyClient与服务提供者进行交互;
@Override
protected Result doInvoke(final Invocation
invocation) throws Throwable {
//1设置附加属性
RpcInvocation inv = (RpcInvocation)
invocation;
final String methodName =
RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
//2获取远程调用client
ExchangeClient
currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement()
% clients.length];
}
try {
//3执行远程调用
boolean isAsync =
RpcUtils.isAsync(getUrl(), invocation);
boolean isAsyncFuture =
RpcUtils.isReturnTypeFuture(inv);
boolean isOneway =
RpcUtils.isOneway(getUrl(), invocation);
int timeout =
getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (isOneway) {
boolean isSent =
getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
ResponseFuture future =
currentClient.request(inv, timeout);
// For compatibility
FutureAdapter<Object>
futureAdapter = new FutureAdapter<>(future);
RpcContext.getContext().setFuture(futureAdapter);
Result result;
if (isAsyncFuture) {
// register
resultCallback, sometimes we need the async result being processed by the
filter chain.
result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
} else {
result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
}
return result;
} else {
RpcContext.getContext().setFuture(null);
return (Result)
currentClient.request(inv, timeout).get();
}
} catch (TimeoutException
e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke
remote method timeout. method: " + invocation.getMethodName() + ",
provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException
e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to
invoke remote method: " + invocation.getMethodName() + ",
provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
程序首先获取远程调用client,然后判断调用是否为异步调用、是否请求响应。
如果请求不需要响应结果则直接使用远程Client发起请求调用,然后将RpcContext上下文的future设置为null,并且返回空的RpcResult。
如果请求为异步请求,则保存远程Client发起请求后返回的future对象,并且设置到RpcContext上下文中,这样,调用方就可以通过RpcContext上下文获取该future。
如果请求为同步请求,则首先设置RpcContext上下文中的future对象为null,然后使用远程Client发起请求,然后在返回的future对象上调用get方法,以同步等待远程调用结果的返回。
第四章 Directory目录与Router路由服务
Directory代表了多个invoker(对于消费端来说,每个invoker代表了一个服务提供者),其内部维护者一个list,并且这个list的内容是动态变化的,比如当服务提供者集群新增或者减少机器时,服务注册中心就会推送当前服务提供者的地址列表,然后Directory中的list就会根据服务提供者地址列表相应变化。
在Dubbo中,Directory的实现有RegistryDirectory和StaticDirectory两种,其中前者管理invoker列表是根据服务注册中心的推送变化而变化的,而后者是当消费端使用了多个注册中心时,其把所有服务注册中心的invoker列表汇集到一个invoker列表中。
RegistryDirectory是啊服务消费端启动时创建的。
从zookeeper返回的服务提供者的信息里获取对应的路由规则,并保存到RouterChain里,这个路由规则是通过管理控制台进行配置的。另外,RouterChain里也保存了可用服务提供者对应的invokers列表和路由规则信息,当服务消费方的集群容错策略要获取可用服务提供者对应的invoker列表时,会调用RouterChain的route方法,其内部根据路由规则信息和invokers列表来提供服务。
总结一下就是,在服务消费端应用中,每个需要消费的服务都被包装为ReferenceConfig,在应用启动时会调用每个对应的ReferenceConfig的get方法,然后会为每个服务创建一个自己的RegistryDirectory对象,每个RegistryDirectory管理该服务提供者的地址列表、路由规则、动态配置等信息,当服务提供者的信息发生变化时,RegistryDirectory会动态地得到变化通知,并自动更新。
第五章 Dubbo消费端服务mock与服务降级策略原理
服务降级原理
public static void mockResult(String type) {
// (1)获取服务注册中心工厂
RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class)
.getAdaptiveExtension();
// (2)根据zk地址,获取具体的zk注册中心的客户端实例
Registry registry2 = registryFactory.getRegistry(URL.valueOf("zookeeper://127.0.0.1:2181"));
// directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
// PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
// (3)注册降级方案到zk
registry2.register(URL.valueOf(
"override://0.0.0.0/com.books.dubbo.demo.api.GreetingService?category=configurators&dynamic=false&application=first-dubbo-consumer&"
+ "mock=" + type + ":return+fail&group=dubbo&version=1.0.0"));
//(4)取消配置
registry2.unregister(URL.valueOf(
"override://0.0.0.0/com.books.dubbo.demo.api.GreetingService?category=configurators&dynamic=false&application=first-dubbo-consumer&"
+ "mock=" + type + ":return+null&group=dubbo&version=1.0.0"));
}
以上代码可以将服务降级信息注册到Zookeeper;当服务启动时,回去订阅服务子树中华的信息,比如Providers服务提供者列表,Routes路由信息,Configurators服务降级策略等信息。
当服务消费者发起远程调用时,回看是否设置了force:return降级策略,如果设置了则不发起远程调用并直接返回mock值,否则发起远程调用。当远程调用结果ok时,直接返回远程调用返回的结果;如果远程调用失败了,则看当前是否配置了fail:return的降级策略,如果设置了,则直接返回mock值,否则返回远程服务失败的具体原因。
服务消费端实在MockClusterInvoker类的invoke方法里使用降级策略并在DubboInvoker的invoke方法里发起远程调用的,所有服务降级实在消费端还没有发起远程调用时完成的。
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result result = null;
//查看URL里是否有mock字段
String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
//如果没有,或者值为默认的false则说明没有设置降级策略
if (value.length() == 0 || value.equalsIgnoreCase("false")) {
//no mock没有mock,正常发起远程调用
result = this.invoker.invoke(invocation);
//设置了force:return降级策略
} else if (value.startsWith("force")) {
if (logger.isWarnEnabled()) {
logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
}
//force:direct mock
result = doMockInvoke(invocation, null);
} else {
//fail-mock
try {
result = this.invoker.invoke(invocation);
} catch (RpcException e) {
if (e.isBiz()) {
throw e;
}
if (logger.isWarnEnabled()) {
logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
}
result = doMockInvoke(invocation, e);
}
}
return result;
}
本地服务Mock原理
服务引用在启动时会在ReferenceConfig的init方法内调用checkMock来检查设置的mock的正确性。
Mock服务与服务降级策略一样,也是在MockClusterInvoker中实现的。
第六章 Dubbo集群容错与负载均衡策略
Dubbo集群容错策略概述
当我们进行系统设计时,不仅要考虑正常情况下代码逻辑应该如何走,还要考虑异常情况下代码逻辑应该怎么走。当服务消费方调用服务提供方的服务出现错误时,Dubbo提供了多种容错方案,默认模式为FailoverClusterInvoker,也就是失败重试。
FailoverCluster:失败重试
当服务消费方调用服务提供者失败后,会自动切换到其他服务提供者服务器进行重试,这通常用于读操作或者具有幂等的写操作。
FailfastCluster:快速失败
当服务消费方调用服务提供方失败后,立即报错,也就是只调用一次。通常,这种模式用于非幂等性的写操作。
FailsafeCluster:安全失败
当服务消费者调用服务出现异常时,直接忽略异常。这种模式通常用于写入审计日志等操作。
FailbackCluster:失败自动恢复
当服务消费端调用服务出现异常后,在后台记录失败的请求,并按照一定的策略后期在进行重试。这种模式通常用于消息通知操作。
ForkingCluster:并行调用
当消费方调用一个接口方法后,DubboClient会并行调用多个服务提供者的服务,只要其中有一个成功立即返回。这种模式通常用于实时性要求较高的读操作,但需要浪费更多服务资源。
BroadcastCluster:广播调用
当消费者调用一个接口方法后,DubboClient会逐个调用所有服务提供者,任意一台服务器调用异常则这次调用就标志失败。这种模式通常用于通知所有提供者更新缓存或日志等本地资源信息。
所有集群容错策略都是继承自抽象类AbstractClusterInvoker。
调用集群容错是在服务降级策略后进行的,集群容错FailoverClusterInvoker内部首先会调用父类AbstractClusterInvoker的list方法来获取invoker列表,即从RegistryDirectory管理的RouterChain的route方法中获取保存的invoker列表。
Dubbo负载均衡策略概述
当服务提供方是集群时,为了避免大量请求一直集中在一个或几个服务提供方机器上,从而使这些机器负载很高,甚至导致服务不可用,需要做一定的负载均衡策略。Dubbo提供了多种负载策略,默认为random,也就是每次随机调用一台服务提供者的服务。
RandomLoadBalance:随机策略。按照概率设置权重,比较均匀,并且可以动态调节提供者的权重。
RoundRobinLoadBalance:轮询策略。
LeastActiveLoadBalance:最少活跃调用数。如果每个提供者的活跃数相同,则随机选择一个。在每个服务提供者里维护着一个活跃数计数器,用来记录当前同时处理请求的个数,也就是并发处理任务的个数。这个值越小,说明当前服务提供者处理的速度越快或者当前机器的负载比较低,所以路由器选择时就选在该活跃度最低的机器。如果一个服务提供者处理速度很慢,由于堆积,那么同时处理的请求就比较多,也就是说活跃调用数目比较大,这时,处理速度慢的提供者将收到更少的请求。
ConsistentHashLoadBalance:一致性Hash策略。
第七章 Dubbo线程模型与线程池策略
Dubbo线程模型概述
Dubbo默认的底层网络通信使用的是Netty,服务提供方NettyServer使用谅解线程池,其中EventLoopGroup(boss)主要用来接收客户端的链接请求,并把完成TCP三次握手的连接分发给EventLoopGroup(worker)来处理,我们把boss和worker线程组称为I/O线程。
如果服务提供方的逻辑处理能迅速完成,并且不会发起新的I/O请求,那么直接在I/O线程上处理会更快,因为这样减少了线程池调度与上下文切换的开销。
但如果处理逻辑较慢,或者需要发起新的I/O请求,比如需要查询数据库,则I/O线程必须派发请求到新的线程池进行处理,否则I/O线程会被阻塞,导致不能接收其他请求。
根据请求的消息类是被I/O线程处理还是被业务线程池处理,Dubbo提供了下面集中线程模型。
all(AllDispatcher):所有消息都派发到业务线程池,这些消息包括请求、响应、连接事件、断开事件、心跳事件等。
direct(DirectDispatcher):所有消息都不派发到业务线程池,全部在IO线程上直接执行。
message(MessageOnlyDispatcher):只有请求响应消息派发到业务线程池,其他消息如连接事件、断开事件、心跳事件等,直接在I/O线程上执行。
execution(ExecutionDispatcher):只把请求类消息派发到业务线程池处理,但是响应、连接事件、断开事件、心跳事件等消息直接在I/O线程上执行。
Connection(ConnectionOrderedDispatcher):在I/O线程上将连接事件、断开事件放入队列,有序地逐个执行,其他消息派发到业务线程池处理。
在dubboProtocol初识化nettyServer时候,就把dispatcher加进来了:
NettyTransporter#bind
@Override
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
NettyServer#构造方法中:
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
ChannelHandlers就是构造dispatcher的,这是一个单例类:
public class ChannelHandlers {
private static ChannelHandlers INSTANCE = new ChannelHandlers();
protected ChannelHandlers() {
}
public static ChannelHandler wrap(ChannelHandler handler, URL url) {
return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
protected static ChannelHandlers getInstance() {
return INSTANCE;
}
static void setTestingChannelHandlers(ChannelHandlers instance) {
INSTANCE = instance;
}
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
}
wrapInternal会调用Dispatcher&Adaptive适配器类构造一个对应的dispatcher出来,默认是AllDispatcher:
public class AllDispatcher implements Dispatcher {
public static final String NAME = "all";
@Override
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
return new AllChannelHandler(handler, url);
}
}
这里返回的是AllChannelHandler,它里面的一些方法会把请求放入业务线程池内执行:
@Override
public void connected(Channel channel) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
} catch (Throwable t) {
throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
}
}
注意这里的handler就是dubboProtocol传进来的内部类。
Dubbo的线程池策略
FixedThreadPool:创建一个具有固定个数线程的线程池。
LimitedThreadPool:创建一个线程池,这个线程池中的线程个数随着需要量动态增加,但是数量不超过配置的阈值。另外,空闲线程,不会被回收,会一直存在。
EagerThreadPool:创建一个线程池,在这个线程池中,当所有核心线程都处于忙碌状态时,将创建新的线程来执行新任务,而不是把任务放入线程池阻塞队列。
CachedThreadPool:创建一个自适应线程池,当线程空闲1分钟时,线程会被回收;当有新请求到来时,会创建新线程。
第八章 Dubbo如何实现泛化调用
泛化接口调用方式主要是在服务消费端没有API接口类及模型类元(比如入参和出参的POJO类)的情况下使用,其参数及返回值中没有对应的POJO类,所有全部POJO均转换为Map表示。使用泛化调用时服务消费模块不在需要引入SDK二方包。
第九章 Dubbo并发控制
由于资源的限制,一般会在服务提供方和消费方限制接口调用的并发数。
在客户端并发控制中,如果当激活并发量达到指定值后,当前客户端请求线程会被挂起。如果在等待超时期间激活并发请求量少了,那么阻塞的线程会被激活,然后发送请求到服务提供方;如果等待超时了,则直接抛出异常,这时服务根本就没有发送到服务提供方服务器。
服务提供方设置并发数后,如果同时请求数超过了设置的executes的值,则会抛出异常,而不是像消费端设置actives时那样去等待。
第十章 Dubbo隐式参数传递
要实现隐式参数传递,首先需要在服务消费端的AbstractClusterInvoker类的invoker方法内,把附加属性键值对放入RpcInvocation的attachments变量中,然后经过网络传递到服务提供端;服务提供端则使用ContextFilter对请求进行拦截,并从RpcInvocation中获取attachments中的键值对,然后使用RpcContext.getContext().setAttachment设置到上下文中。RpcContext中使用的是InternalThreadLocal。
第十一章 Dubbo全链路异步
从2.7.0版本开始,Dubbo以CompletableFuture为基础支持所有异步编程接口。
异步调用时基于NIO的非阻塞能力实现并行调用,服务消费端不需要启动多线程即可完成并行调用多个远程服务,相对于多线程开销较小。
步骤1:当服务消费端发起RPC调用时使用的用户线程,用户线程首先使用步骤2创建一个Future对象,接着步骤3会把请求转换为I/O线程来执行,步骤3为异步过程,所有会马上返回,然后用户线程使用步骤4把其创建的Future对象设置到RpcContext中,其后用户线程就返回了。
在步骤5中,用户线程可以在某个时间点从RpcContext中获取设置的Future对象,并且使用步骤6来等待调用结果。
在步骤7中,当服务提供方返回结果后,调用方线程模型中的线程池中的线程会把结果使用步骤8写入Future,这时用户线程就可以得到远程调用结果了。
在DubboInvoker中的doInvoke方法中:
} else if (isAsync) {
ResponseFuture future =
currentClient.request(inv, timeout);
// For compatibility
FutureAdapter<Object>
futureAdapter = new FutureAdapter<>(future);
RpcContext.getContext().setFuture(futureAdapter);
Result result;
if (isAsyncFuture) {
// register
resultCallback, sometimes we need the async result being processed by the
filter chain.
result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
} else {
result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
}
return result;
} else {
调用ReferenceCountExchangeClient的request方法:
@Override
public ResponseFuture request(Object request, int timeout) throws RemotingException
{
return client.request(request, timeout);
}
这里调用HeaderExchangeClient的request方法:
@Override
public ResponseFuture request(Object request, int timeout) throws RemotingException
{
return channel.request(request, timeout);
}
这里在调用HeaderExchangeChannel的request方法:
@Override
public ResponseFuture request(Object request, int timeout) throws RemotingException
{
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to
send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
DefaultFuture
future = DefaultFuture.newFuture(channel, req, timeout);
try {
channel.send(req);
} catch (RemotingException
e) {
future.cancel();
throw e;
}
return future;
}
这里调用DefaultFuture.newFuture方法创建一个DefaultFuture;
private DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
this.request = request;
this.id =
request.getId();
this.timeout = timeout > 0 ? timeout :
channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// put into waiting
map.
FUTURES.put(id, this);
CHANNELS.put(id, channel);
}
private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<>();
private static final Map<Long, DefaultFuture>
FUTURES = new ConcurrentHashMap<>();
它里面有两个类变量存储全局的channel和DefaultFuture实例关系;
简单总结一下:当服务消费端业务线程发起请求后,会创建一个DefaultFuture对象并设置到RpcContext中,然后在启动I/O线程发起请求后调用线程就返回了null的结果;当业务线程从RpcContext获取future对象并调用其get方法获取真实的响应结果后,当前线程会调用条件变量done的await方法而挂起;放服务提供端把结果写回调用方之后,调用方线程模型中线程池里的线程会把结果写入DefaultFuture对象内的结果变量中,接着调用条件变量的signal方法来激活业务线程,然后业务线程就会从get方法返回响应结果。
这里所讲的这种实现异步调用的方式基于从返回future调用get方法,其缺点是当业务线程调用get方法后业务线程会被阻塞,这不是我们想要的,所有Dubbo提供了在future对象上设置回调函数的方式,让我们实现真正的异步调用。
这种方式在业务线程获取了future对象后,在其上设置回调函数后马上就会返回,接着等服务提供端把响应结果写回调用方,然后调用方线程模型中线程池里的线程会把结果写入future对象,其后对回调函数进行回调。由此可知,这个过程中不需要业务线程干预的,实现了真正的异步调用。
// 3. 直接返回null
GreetingService greetingService = referenceConfig.get();
System.out.println(greetingService.sayHello("world"));
// 4.异步执行回调
((FutureAdapter) RpcContext.getContext().getFuture()).getFuture().setCallback(new ResponseCallback()
{
@Override
public void done(Object
response) {
System.out.println("result:"
+ response);
}
@Override
public void caught(Throwable
exception) {
System.out.println("error:"
+ exception.getLocalizedMessage());
}
});
服务提供端异步执行
在Provider端非异步执行时,对调用方发来的请求的处理是在Dubbo内部线程模型的线程池中执行的。在Dubbo中,服务提供方提供的所有服务接口都是使用这以个线程池来执行的,所有当一个服务执行比较耗时时,可能会占用线程池中很多线程,从而导致其他服务的处理受到影响。
Provider端异步执行则将服务的处理逻辑从Dubbo内部线程池切换到业务自定义线程,避免Dubbo线程池中的线程被过度占用,有助于避免不同服务间的相互影响。
但需要注意的是,Provider端异步执行对节省资源和提升RPC响应性能是没有效果的,这是因为如果服务处理比较耗时,虽然不是使用Dubbo框架内部线程处理,但还是需要业务自己的线程来处理,另外还有副作用,即新增一次线程上下文切换。
异步调用与执行引入的新问题
Filter链
上下问对象传递
第十二章本地服务暴露与引用原理
InjvmProtocol,服务提供方在export的时候如果是本地服务,则之直接把invoker放入InjvmProtocol的全局map中,
protected final Map<String, Exporter<?>>
exporterMap = new ConcurrentHashMap<String, Exporter<?>>();
InjvmProtocol是个单例,服务消费方引用服务的时候,看本地是否有服务,如果有直接从InjvmProtocol中的map中取invoker使用。
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
@Override
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
return new InjvmInvoker<T>(serviceType, url, url.getServiceKey(), exporterMap);
}
InjvmInvoker的doInvoker方法:
@Override
public Result doInvoke(Invocation invocation) throws Throwable {
Exporter<?> exporter =
InjvmProtocol.getExporter(exporterMap, getUrl());
if (exporter == null) {
throw new RpcException("Service
[" + key + "] not found.");
}
RpcContext.getContext().setRemoteAddress(Constants.LOCALHOST_VALUE, 0);
return exporter.getInvoker().invoke(invocation);
}
本地服务引用,消费方和服务方的filter还是正常的存在。
第十三章 Dubbo协议与网络传输
Dubbo协议
在TCP协议栈中,每层协议都有自己的协议报文格式,比如TCP是网络七层模型中的传输层,是TCP协议报文格式;在TCP上层是应用层(应用层协议常见的有HTTP协议等),Dubbo协议作为建立在TCP之上的一种应用层协议,自然也有自己的协议报文格式。Dubbo协议也是参考了TCP协议栈中的协议,协议内容由header和body两部分组成。
header 格式 |
|||||
0-7 |
8-15 |
16-23 |
24-31 |
32-95 |
96-127 |
magic |
magic low |
request flag and serialization id |
response id |
request id |
body length |
Header总包含了16字节的数据。其中,前两字节为魔数,类似Class类文件里的魔数,这里用来标识一个帧的开始,固定为0xdabb,第一字节固定为0xda,第二字节固定为0xbb。
紧跟其后的一字节是请求类型和序列化标记ID的组合结果,request
flag|serializationId。其中,高四位表示请求类型,,低四位表示请求类型。
再后面的一字节是只在响应报文里才设置,用来标示响应的结果码。其后的8字节是请求ID。最后的4字节是body内容的大小,也就是指定在协议头header内容后的多少字节是协议body的内容。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:深度剖析Apache Dubbo核心技术内幕学习笔记 - Python技术站