资讯专栏INFORMATION COLUMN

dubbo源码解析(二十二)远程调用——Protocol

孙淑建 / 3396人阅读

摘要:七该类也实现了,也是装饰了接口,但是它是在服务引用和暴露过程中加上了监听器的功能。如果是注册中心,则暴露该创建一个暴露者监听器包装类对象该方法是在服务暴露上做了监听器功能的增强,也就是加上了监听器。

远程调用——Protocol
目标:介绍远程调用中协议的设计和实现,介绍dubbo-rpc-api中的各种protocol包的源码,是重点内容。
前言

在远程调用中协议是非常重要的一层,看下面这张图:

该层是在信息交换层之上,分为了并且夹杂在服务暴露和服务引用中间,为了有一个约定的方式进行调用。

dubbo支持不同协议的扩展,比如http、thrift等等,具体的可以参照官方文档。本文讲解的源码大部分是对于公共方法的实现,而具体的服务暴露和服务引用会在各个协议实现中讲到。

下面是该包下面的类图:

源码分析 (一)AbstractProtocol

该类是协议的抽象类,实现了Protocol接口,其中实现了一些公共的方法,抽象方法在它的子类AbstractProxyProtocol中定义。

1.属性
/**
 * 服务暴露者集合
 */
protected final Map> exporterMap = new ConcurrentHashMap>();

/**
 * 服务引用者集合
 */
//TODO SOFEREFENCE
protected final Set> invokers = new ConcurrentHashSet>();
2.serviceKey
protected static String serviceKey(URL url) {
    // 获得绑定的端口号
    int port = url.getParameter(Constants.BIND_PORT_KEY, url.getPort());
    return serviceKey(port, url.getPath(), url.getParameter(Constants.VERSION_KEY),
            url.getParameter(Constants.GROUP_KEY));
}

protected static String serviceKey(int port, String serviceName, String serviceVersion, String serviceGroup) {
    return ProtocolUtils.serviceKey(port, serviceName, serviceVersion, serviceGroup);
}

该方法是为了得到服务key group+"/"+serviceName+":"+serviceVersion+":"+port

3.destroy
@Override
public void destroy() {
    // 遍历服务引用实体
    for (Invoker invoker : invokers) {
        if (invoker != null) {
            // 从集合中移除
            invokers.remove(invoker);
            try {
                if (logger.isInfoEnabled()) {
                    logger.info("Destroy reference: " + invoker.getUrl());
                }
                // 销毁
                invoker.destroy();
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
        }
    }
    // 遍历服务暴露者
    for (String key : new ArrayList(exporterMap.keySet())) {
        // 从集合中移除
        Exporter exporter = exporterMap.remove(key);
        if (exporter != null) {
            try {
                if (logger.isInfoEnabled()) {
                    logger.info("Unexport service: " + exporter.getInvoker().getUrl());
                }
                // 取消暴露
                exporter.unexport();
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
        }
    }
}

该方法是对invoker和exporter的销毁。

(二)AbstractProxyProtocol

该类继承了AbstractProtocol类,其中利用了代理工厂对AbstractProtocol中的两个集合进行了填充,并且对异常做了处理。

1.属性
/**
 * rpc的异常类集合
 */
private final List> rpcExceptions = new CopyOnWriteArrayList>();

/**
 * 代理工厂
 */
private ProxyFactory proxyFactory;
2.export
@Override
@SuppressWarnings("unchecked")
public  Exporter export(final Invoker invoker) throws RpcException {
    // 获得uri
    final String uri = serviceKey(invoker.getUrl());
    // 获得服务暴露者
    Exporter exporter = (Exporter) exporterMap.get(uri);
    if (exporter != null) {
        return exporter;
    }
    // 新建一个线程
    final Runnable runnable = doExport(proxyFactory.getProxy(invoker, true), invoker.getInterface(), invoker.getUrl());
    exporter = new AbstractExporter(invoker) {
        /**
         * 取消暴露
         */
        @Override
        public void unexport() {
            super.unexport();
            // 移除该key对应的服务暴露者
            exporterMap.remove(uri);
            if (runnable != null) {
                try {
                    // 启动线程
                    runnable.run();
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        }
    };
    // 加入集合
    exporterMap.put(uri, exporter);
    return exporter;
}

其中分为两个步骤,创建一个exporter,放入到集合汇中。在创建exporter时对unexport方法进行了重写。

3.refer
@Override
public  Invoker refer(final Class type, final URL url) throws RpcException {
    // 通过代理获得实体域
    final Invoker target = proxyFactory.getInvoker(doRefer(type, url), type, url);
    Invoker invoker = new AbstractInvoker(type, url) {
        @Override
        protected Result doInvoke(Invocation invocation) throws Throwable {
            try {
                // 获得调用结果
                Result result = target.invoke(invocation);
                Throwable e = result.getException();
                // 如果抛出异常,则抛出相应异常
                if (e != null) {
                    for (Class rpcException : rpcExceptions) {
                        if (rpcException.isAssignableFrom(e.getClass())) {
                            throw getRpcException(type, url, invocation, e);
                        }
                    }
                }
                return result;
            } catch (RpcException e) {
                // 抛出未知异常
                if (e.getCode() == RpcException.UNKNOWN_EXCEPTION) {
                    e.setCode(getErrorCode(e.getCause()));
                }
                throw e;
            } catch (Throwable e) {
                throw getRpcException(type, url, invocation, e);
            }
        }
    };
    // 加入集合
    invokers.add(invoker);
    return invoker;
}

该方法是服务引用,先从代理工厂中获得Invoker对象target,然后创建了真实的invoker在重写方法中调用代理的方法,最后加入到集合。

protected abstract  Runnable doExport(T impl, Class type, URL url) throws RpcException;

protected abstract  T doRefer(Class type, URL url) throws RpcException;

可以看到其中抽象了服务引用和暴露的方法,让各类协议各自实现。

(三)AbstractInvoker

该类是invoker的抽象方法,因为协议被夹在服务引用和服务暴露中间,无论什么协议都有一些通用的Invoker和exporter的方法实现,而该类就是实现了Invoker的公共方法,而把doInvoke抽象出来,让子类只关注这个方法。

1.属性
/**
 * 服务类型
 */
private final Class type;

/**
 * url对象
 */
private final URL url;

/**
 * 附加值
 */
private final Map attachment;

/**
 * 是否可用
 */
private volatile boolean available = true;

/**
 *  是否销毁
 */
private AtomicBoolean destroyed = new AtomicBoolean(false);
2.convertAttachment
private static Map convertAttachment(URL url, String[] keys) {
    if (keys == null || keys.length == 0) {
        return null;
    }
    Map attachment = new HashMap();
    // 遍历key,把值放入附加值集合中
    for (String key : keys) {
        String value = url.getParameter(key);
        if (value != null && value.length() > 0) {
            attachment.put(key, value);
        }
    }
    return attachment;
}

该方法是转化为附加值,把url中的值转化为服务调用invoker的附加值。

3.invoke
@Override
public Result invoke(Invocation inv) throws RpcException {
    // if invoker is destroyed due to address refresh from registry, let"s allow the current invoke to proceed
    // 如果服务引用销毁,则打印告警日志,但是通过
    if (destroyed.get()) {
        logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "
                + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
    }

    RpcInvocation invocation = (RpcInvocation) inv;
    // 会话域中加入该调用链
    invocation.setInvoker(this);
    // 把附加值放入会话域
    if (attachment != null && attachment.size() > 0) {
        invocation.addAttachmentsIfAbsent(attachment);
    }
    // 把上下文的附加值放入会话域
    Map contextAttachments = RpcContext.getContext().getAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        /**
         * invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,
         * because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered
         * by the built-in retry mechanism of the Dubbo. The attachment to update RpcContext will no longer work, which is
         * a mistake in most cases (for example, through Filter to RpcContext output traceId and spanId and other information).
         */
        invocation.addAttachments(contextAttachments);
    }
    // 如果开启的是异步调用,则把该设置也放入附加值
    if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
        invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
    }
    // 加入编号
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);


    try {
        // 执行调用链
        return doInvoke(invocation);
    } catch (InvocationTargetException e) { // biz exception
        Throwable te = e.getTargetException();
        if (te == null) {
            return new RpcResult(e);
        } else {
            if (te instanceof RpcException) {
                ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
            }
            return new RpcResult(te);
        }
    } catch (RpcException e) {
        if (e.isBiz()) {
            return new RpcResult(e);
        } else {
            throw e;
        }
    } catch (Throwable e) {
        return new RpcResult(e);
    }
}

该方法做了一些公共的操作,比如服务引用销毁的检测,加入附加值,加入调用链实体域到会话域中等。然后执行了doInvoke抽象方法。各协议自己去实现。

(四)AbstractExporter

该类和AbstractInvoker类似,也是在服务暴露中实现了一些公共方法。

1.属性
/**
 * 实体域
 */
private final Invoker invoker;

/**
 * 是否取消暴露服务
 */
private volatile boolean unexported = false;
2.unexport
@Override
public void unexport() {
    // 如果已经消取消暴露,则之间返回
    if (unexported) {
        return;
    }
    // 设置为true
    unexported = true;
    // 销毁该实体域
    getInvoker().destroy();
}
(五)InvokerWrapper

该类是Invoker的包装类,其中用到类装饰模式,不过并没有实现实际的功能增强。

public class InvokerWrapper implements Invoker {

    /**
     * invoker对象
     */
    private final Invoker invoker;

    private final URL url;

    public InvokerWrapper(Invoker invoker, URL url) {
        this.invoker = invoker;
        this.url = url;
    }

    @Override
    public Class getInterface() {
        return invoker.getInterface();
    }

    @Override
    public URL getUrl() {
        return url;
    }

    @Override
    public boolean isAvailable() {
        return invoker.isAvailable();
    }

    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        return invoker.invoke(invocation);
    }

    @Override
    public void destroy() {
        invoker.destroy();
    }

}
(六)ProtocolFilterWrapper

该类实现了Protocol接口,其中也用到了装饰模式,是对Protocol的装饰,是在服务引用和暴露的方法上加上了过滤器功能。

1.buildInvokerChain
private static  Invoker buildInvokerChain(final Invoker invoker, String key, String group) {
    Invoker last = invoker;
    // 获得过滤器的所有扩展实现类实例集合
    List filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
    if (!filters.isEmpty()) {
        // 从最后一个过滤器开始循环,创建一个带有过滤器链的invoker对象
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            // 记录last的invoker
            final Invoker next = last;
            // 新建last
            last = new Invoker() {

                @Override
                public Class getInterface() {
                    return invoker.getInterface();
                }

                @Override
                public URL getUrl() {
                    return invoker.getUrl();
                }

                @Override
                public boolean isAvailable() {
                    return invoker.isAvailable();
                }

                /**
                 * 关键在这里,调用下一个filter代表的invoker,把每一个过滤器串起来
                 * @param invocation
                 * @return
                 * @throws RpcException
                 */
                @Override
                public Result invoke(Invocation invocation) throws RpcException {
                    return filter.invoke(next, invocation);
                }

                @Override
                public void destroy() {
                    invoker.destroy();
                }

                @Override
                public String toString() {
                    return invoker.toString();
                }
            };
        }
    }
    return last;
}

该方法就是创建带 Filter 链的 Invoker 对象。倒序的把每一个过滤器串连起来,形成一个invoker。

2.export
@Override
public  Exporter export(Invoker invoker) throws RpcException {
    // 如果是注册中心,则直接暴露服务
    if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
        return protocol.export(invoker);
    }
    // 服务提供侧暴露服务
    return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}

该方法是在服务暴露上做了过滤器链的增强,也就是加上了过滤器。

3.refer
@Override
public  Invoker refer(Class type, URL url) throws RpcException {
    // 如果是注册中心,则直接引用
    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
        return protocol.refer(type, url);
    }
    // 消费者侧引用服务
    return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
}

该方法是在服务引用上做了过滤器链的增强,也就是加上了过滤器。

(七)ProtocolListenerWrapper

该类也实现了Protocol,也是装饰了Protocol接口,但是它是在服务引用和暴露过程中加上了监听器的功能。

1.export
@Override
public  Exporter export(Invoker invoker) throws RpcException {
    // 如果是注册中心,则暴露该invoker
    if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
        return protocol.export(invoker);
    }
    // 创建一个暴露者监听器包装类对象
    return new ListenerExporterWrapper(protocol.export(invoker),
            Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
                    .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
}

该方法是在服务暴露上做了监听器功能的增强,也就是加上了监听器。

2.refer
@Override
public  Invoker refer(Class type, URL url) throws RpcException {
    // 如果是注册中心。则直接引用服务
    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
        return protocol.refer(type, url);
    }
    // 创建引用服务监听器包装类对象
    return new ListenerInvokerWrapper(protocol.refer(type, url),
            Collections.unmodifiableList(
                    ExtensionLoader.getExtensionLoader(InvokerListener.class)
                            .getActivateExtension(url, Constants.INVOKER_LISTENER_KEY)));
}

该方法是在服务引用上做了监听器功能的增强,也就是加上了监听器。

后记
该部分相关的源码解析地址:https://github.com/CrazyHZM/i...

该文章讲解了远程调用中关于协议的部分,其实就是讲了一些公共的方法,并且把关键方法抽象出来让子类实现,具体的方法实现都在各个协议中自己实现。接下来我将开始对rpc模块的代理进行讲解。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/72957.html

相关文章

  • dubbo源码解析(四十六)消费端发送请求过程

    摘要:可以参考源码解析二十四远程调用协议的八。十六的该类也是用了适配器模式,该类主要的作用就是增加了心跳功能,可以参考源码解析十远程通信层的四。二十的可以参考源码解析十七远程通信的一。 2.7大揭秘——消费端发送请求过程 目标:从源码的角度分析一个服务方法调用经历怎么样的磨难以后到达服务端。 前言 前一篇文章讲到的是引用服务的过程,引用服务无非就是创建出一个代理。供消费者调用服务的相关方法。...

    fish 评论0 收藏0
  • dubbo源码解析(四十七)服务端处理请求过程

    摘要:而存在的意义就是保证请求或响应对象可在线程池中被解码,解码完成后,就会分发到的。 2.7大揭秘——服务端处理请求过程 目标:从源码的角度分析服务端接收到请求后的一系列操作,最终把客户端需要的值返回。 前言 上一篇讲到了消费端发送请求的过程,该篇就要将服务端处理请求的过程。也就是当服务端收到请求数据包后的一系列处理以及如何返回最终结果。我们也知道消费端在发送请求的时候已经做了编码,所以我...

    yzzz 评论0 收藏0
  • dubbo源码解析(四十八)异步化改造

    摘要:大揭秘异步化改造目标从源码的角度分析的新特性中对于异步化的改造原理。看源码解析四十六消费端发送请求过程讲到的十四的,在以前的逻辑会直接在方法中根据配置区分同步异步单向调用。改为关于可以参考源码解析十远程通信层的六。 2.7大揭秘——异步化改造 目标:从源码的角度分析2.7的新特性中对于异步化的改造原理。 前言 dubbo中提供了很多类型的协议,关于协议的系列可以查看下面的文章: du...

    lijinke666 评论0 收藏0
  • dubbo源码解析二十七)远程调用——injvm本地调用

    摘要:远程调用本地调用目标介绍本地调用的设计和实现,介绍的源码。前言是一个远程调用的框架,但是它没有理由不支持本地调用,本文就要讲解关于本地调用的实现。服务暴露者集合取消暴露调用父类的取消暴露方法从集合中移除二该类继承了类,是本地调用的实现。 远程调用——injvm本地调用 目标:介绍injvm本地调用的设计和实现,介绍dubbo-rpc-injvm的源码。 前言 dubbo是一个远程调用的...

    sean 评论0 收藏0
  • dubbo源码解析二十六)远程调用——http协议

    摘要:前言基于表单的远程调用协议,采用的实现,关于协议就不用多说了吧。后记该部分相关的源码解析地址该文章讲解了远程调用中关于协议的部分,内容比较简单,可以参考着官方文档了解一下。 远程调用——http协议 目标:介绍远程调用中跟http协议相关的设计和实现,介绍dubbo-rpc-http的源码。 前言 基于HTTP表单的远程调用协议,采用 Spring 的HttpInvoker实现,关于h...

    xiyang 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<