目录

dubbo原理

RPC原理

http://img.cana.space/picStore/20201120134934.png

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
一次完整的RPC调用流程(同步调用,异步另说)如下: 
1)服务消费方(client)调用以本地调用方式调用服务; 
2)client stub接收到调用后负责将方法、参数等组装成能够进行网络传输的消息体; 
3)client stub找到服务地址,并将消息发送到服务端; 
4)server stub收到消息后进行解码; 
5)server stub根据解码结果调用本地的服务; 
6)本地服务执行并将结果返回给server stub; 
7)server stub将返回结果打包成消息并发送至消费方; 
8)client stub接收到消息,并进行解码; 
9)服务消费方得到最终结果。

RPC框架的目标就是要2~8这些步骤都封装起来,这些细节对用户来说是透明的,不可见的。

netty通信原理

Netty是一个异步事件驱动的网络应用程序框架, 用于快速开发可维护的高性能协议服务器和客户端。它极大地简化并简化了TCP和UDP套接字服务器等网络编程。

BIO:(Blocking IO)

http://img.cana.space/picStore/20201120135141.png

NIO (Non-Blocking IO)

http://img.cana.space/picStore/20201120135202.png

Selector 一般称 为选择器 ,也可以翻译为 多路复用器,

Connect(连接就绪)、Accept(接受就绪)、Read(读就绪)、Write(写就绪)

Netty基本原理:

http://img.cana.space/picStore/20201120135259.png

dubbo原理

框架设计

http://img.cana.space/picStore/20201120135544.png

图例说明

  • 图中左边淡蓝背景的为服务消费方使用的接口,右边淡绿色背景的为服务提供方使用的接口,位于中轴线上的为双方都用到的接口。
  • 图中从下至上分为十层,各层均为单向依赖,右边的黑色箭头代表层之间的依赖关系,每一层都可以剥离上层被复用,其中,Service 和 Config 层为 API,其它各层均为 SPI。
  • 图中绿色小块的为扩展接口,蓝色小块为实现类,图中只显示用于关联各层的实现类。
  • 图中蓝色虚线为初始化过程,即启动时组装链,红色实线为方法调用过程,即运行时调时链,紫色三角箭头为继承,可以把子类看作父类的同一个节点,线上的文字为调用的方法。

各层说明

  • config 配置层:对外配置接口,以 ServiceConfig, ReferenceConfig 为中心,可以直接初始化配置类,也可以通过 spring 解析配置生成配置类
  • proxy 服务代理层:服务接口透明代理,生成服务的客户端 Stub 和服务器端 Skeleton, 以 ServiceProxy 为中心,扩展接口为 ProxyFactory
  • registry 注册中心层:封装服务地址的注册与发现,以服务 URL 为中心,扩展接口为 RegistryFactory, Registry, RegistryService
  • cluster 路由层:封装多个提供者的路由及负载均衡,并桥接注册中心,以 Invoker 为中心,扩展接口为 Cluster, Directory, Router, LoadBalance
  • monitor 监控层:RPC 调用次数和调用时间监控,以 Statistics 为中心,扩展接口为 MonitorFactory, Monitor, MonitorService
  • protocol 远程调用层:封装 RPC 调用,以 Invocation, Result 为中心,扩展接口为 Protocol, Invoker, Exporter
  • exchange 信息交换层:封装请求响应模式,同步转异步,以 Request, Response 为中心,扩展接口为 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer
  • transport 网络传输层:抽象 mina 和 netty 为统一接口,以 Message 为中心,扩展接口为 Channel, Transporter, Client, Server, Codec
  • serialize 数据序列化层:可复用的一些工具,扩展接口为 Serialization, ObjectInput, ObjectOutput, ThreadPool

关系说明

  • 在 RPC 中,Protocol 是核心层,也就是只要有 Protocol + Invoker + Exporter 就可以完成非透明的 RPC 调用,然后在 Invoker 的主过程上 Filter 拦截点。
  • 图中的 Consumer 和 Provider 是抽象概念,只是想让看图者更直观的了解哪些类分属于客户端与服务器端,不用 Client 和 Server 的原因是 Dubbo 在很多场景下都使用 Provider, Consumer, Registry, Monitor 划分逻辑拓普节点,保持统一概念。
  • 而 Cluster 是外围概念,所以 Cluster 的目的是将多个 Invoker 伪装成一个 Invoker,这样其它人只要关注 Protocol 层 Invoker 即可,加上 Cluster 或者去掉 Cluster 对其它层都不会造成影响,因为只有一个提供者时,是不需要 Cluster 的。
  • Proxy 层封装了所有接口的透明化代理,而在其它层都以 Invoker 为中心,只有到了暴露给用户使用时,才用 Proxy 将 Invoker 转成接口,或将接口实现转成 Invoker,也就是去掉 Proxy 层 RPC 是可以 Run 的,只是不那么透明,不那么看起来像调本地服务一样调远程服务。
  • 而 Remoting 实现是 Dubbo 协议的实现,如果你选择 RMI 协议,整个 Remoting 都不会用上,Remoting 内部再划为 Transport 传输层和 Exchange 信息交换层,Transport 层只负责单向消息传输,是对 Mina, Netty, Grizzly 的抽象,它也可以扩展 UDP 传输,而 Exchange 层是在传输层之上封装了 Request-Response 语义。
  • Registry 和 Monitor 实际上不算一层,而是一个独立的节点,只是为了全局概览,用层的方式画在一起。

启动解析、加载配置信息

xml配置走的是DubboNamespaceHandler进行初始化,而yml或properties走的是EnableDubbo接口进行初始化。接下来讲解xml配置的初始化方式。

DubboNamespaceHandler -> DubboBeanDefinitionParser

配置文件是如何被解析的?

配置文件是spring的一个配置文件,spring在启动的时候会去解析配置文件,有一个总接口org.springframework.beans.factory.xml.BeanDefinitionParser

从继承结构上可以看到org.apache.dubbo.config.spring.schema.DubboBeanDefinitionParser实现了这个接口。

org.apache.dubbo.config.spring.schema.DubboNamespaceHandler#init定义了各个标签对应的结构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
@Override
public void init() {
    registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
    registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
    registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
    registerBeanDefinitionParser("config-center", new DubboBeanDefinitionParser(ConfigCenterBean.class, true));
    registerBeanDefinitionParser("metadata-report", new DubboBeanDefinitionParser(MetadataReportConfig.class, true));
    registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
    registerBeanDefinitionParser("metrics", new DubboBeanDefinitionParser(MetricsConfig.class, true));
    registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
    registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
    registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
    registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
    registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
    registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
}

org.apache.dubbo.config.spring.schema.DubboBeanDefinitionParser#parse(org.w3c.dom.Element, org.springframework.beans.factory.xml.ParserContext, java.lang.Class<?>, boolean)会解析各个标签属性到对应的xxxConfig

服务暴露

http://img.cana.space/picStore/20201120142803.png

afterPropertiesSet

解析配置,ServiceBean解析

org.apache.dubbo.config.spring.schema.DubboNamespaceHandler#init

1
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));

ServiceBean

1
2
3
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean,
        ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware,
        ApplicationEventPublisherAware {

InitializingBean

在容器创建完ServiceBean对象后会调用afterPropertiesSet方法

ServiceBean继承自ServiceConfig,afterPropertiesSet方法主要作用是将之前解析的所有配置保存在ServiceBean中。

onApplicationEvent

ApplicationListener<ContextRefreshedEvent>

当ioc容器刷新完成也就是所有对象创建完成时会来回调实现这个接口的方法onApplicationEvent

这个方法里面主要是实现暴露服务逻辑

org.apache.dubbo.config.ServiceConfig#doExportUrls

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
private void doExportUrls() {
    List<URL> registryURLs = loadRegistries(true);
  // 对应到我们的示例程序就是一个protocolConfig,协议是dubbo,端口号20881,如果要暴露多个协议(例如http)或者端口也可以配置成多个
    for (ProtocolConfig protocolConfig : protocols) {
        String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
        ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
        ApplicationModel.initProviderModel(pathKey, providerModel);
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}

org.apache.dubbo.config.ServiceConfig#doExportUrlsFor1Protocol

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
...
// invoker 服务提供者真正执行的信息
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 暴露invoker
// private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
// 基于java的spi机制获得这个接口的实现,由于使用dubbo协议,所以是DubboProtocol,又由于使用注册中心,所有还有RegistryProtocol
Exporter<?> exporter = protocol.export(wrapperInvoker); // 暴露invoker会生成exporter
// 将暴露信息exporter保存起来
exporters.add(exporter);

RegistryProtocol

org.apache.dubbo.registry.integration.RegistryProtocol#export

1
2
3
4
5
6
7
// 本地暴露,会去调用DubboProtocol.export
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// 将注册信息注册到提供者消费者注册表中,包括提供者、注册地址、提供者的地址
// url to registry
ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
                registryUrl, registeredProviderUrl);

DubboProtocol

org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export

1
2
3
4
5
6
7
8
9
// 拿到服务提供者的url地址
URL url = invoker.getUrl();
...
// 各种转化后包装成一个exporter
  DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
...
// 打开服务器
// 使用netty框架打开服务器,bind端口,也就是启动netty服务器,监听20881端口
openServer(url);

org.apache.dubbo.registry.support.ProviderConsumerRegTable

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// 维护 每个url地址对应的服务提供者和消费者执行器 关联关系
public static ConcurrentHashMap<String, ConcurrentMap<Invoker, ProviderInvokerWrapper>> providerInvokers = new ConcurrentHashMap<>();
public static ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>> consumerInvokers = new ConcurrentHashMap<>();

public static <T> ProviderInvokerWrapper<T> registerProvider(Invoker<T> invoker, URL registryUrl, URL providerUrl) {
    ProviderInvokerWrapper<T> wrapperInvoker = new ProviderInvokerWrapper<>(invoker, registryUrl, providerUrl);
    String serviceUniqueName = providerUrl.getServiceKey();
    ConcurrentMap<Invoker, ProviderInvokerWrapper> invokers = providerInvokers.get(serviceUniqueName);
    if (invokers == null) {
        providerInvokers.putIfAbsent(serviceUniqueName, new ConcurrentHashMap<>());
        invokers = providerInvokers.get(serviceUniqueName);
    }
    invokers.put(invoker, wrapperInvoker);
    return wrapperInvoker;
}
...

小结

DubboProtocol会在底层启用netty服务器监听20880端口

RegistryProtocol会保存注册信息到注册表中

服务引用

20201120153032

Service对应ServiceBean, Reference对应ReferenceBean

1
public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean {

ReferenceBean是一个FactoryBean,获取UserService就会调用FactoryBean的getObject方法

org.apache.dubbo.config.spring.ReferenceBean#getObject ↓

org.apache.dubbo.config.ReferenceConfig#init

1
ref = createProxy(map);

org.apache.dubbo.config.ReferenceConfig#createProxy

1
2
3
4
// 远程引用接口
// 同样使用spi加载protocol
// private static final Protocol REF_PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));

org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#protocolBindingRefer

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
        optimizeSerialization(url);

        // create rpc invoker.
  		// getClients(url) 根据url获取客户端(也就是netty客户端),最终会返回一个消费者的invoker
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);

        return invoker;
    }

org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#initClient

1
2
// 创建一个netty的客户端
client = Exchangers.connect(url, requestHandler);

RegistryProtocol

org.apache.dubbo.registry.integration.RegistryProtocol#doRefer

1
2
3
4
...
// 和服务提供者一样,将消费者url和invoker关联关系保存到注册表中
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
...

服务调用

展开总设计图的红色调用链,如下:

http://img.cana.space/picStore/20201120153734.png

  1. 将methods和args包装成RpcInvocation对象

  2. FailoverClusterInvoker.invoke(invocation)

  3. 根据负载均衡选择一个invoker

  4. DubboInvoker.invoke(invocation)

    1
    2
    3
    4
    5
    6
    7
    8
    
    ...
    AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
                    CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
                    asyncRpcResult.subscribeTo(responseFuture);
                    // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
                    FutureContext.getContext().setCompatibleFuture(responseFuture);
                    return asyncRpcResult;
    ...
    

    由于netty是基于异步事件驱动的,所以将结果存放在java8才有的CompletableFuture中

  5. 回到一开始的invoker,org.apache.dubbo.rpc.proxy.InvokerInvocationHandler#invoke

    1
    2
    3
    4
    5
    6
    
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        ...
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }
    

    org.apache.dubbo.rpc.AsyncRpcResult#recreate

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    
    public Object recreate() throws Throwable {
        RpcInvocation rpcInvocation = (RpcInvocation) invocation;
        FutureAdapter future = new FutureAdapter(this);
        RpcContext.getContext().setFuture(future);
        if (InvokeMode.FUTURE == rpcInvocation.getInvokeMode()) {
            return future;
        }
       
        return getAppResponse().recreate();
    }
    

    Result

    1
    
    public interface Result extends CompletionStage<Result>, Future<Result>, Serializable {
    

    getAppResponse

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    
    public Result getAppResponse() {
        try {
            if (this.isDone()) {
                return this.get();
            }
        } catch (Exception e) {
            // This should never happen;
            logger.error("Got exception when trying to fetch the underlying result from AsyncRpcResult.", e);
        }
        return new AppResponse();
    }
    

    可以看到最终还是调用了Future的get方法,将netty client的异步调用改成了客户端的同步阻塞式调用。

总结

各种Invoker层层包装,底层使用netty通信~