资讯专栏INFORMATION COLUMN

dubbo源码解析(二十五)远程调用——hessian协议

xzavier / 2917人阅读

摘要:客户端对象字节输出流请求对象响应对象增加协议头发送请求获得请求后的状态码三该类实现了接口,是创建的工厂类。该类的实现跟类类似,但是是标准的接口调用会采用的工厂类,而是的协议调用。

远程调用——hessian协议
目标:介绍远程调用中跟hessian协议相关的设计和实现,介绍dubbo-rpc-hessian的源码。
前言

本文讲解多是dubbo集成的第二种协议,hessian协议,Hessian 是 Caucho 开源的一个 RPC 框架,其通讯效率高于 WebService 和 Java 自带的序列化。dubbo集成hessian所提供的hessian协议相关介绍可以参考官方文档,我就不再赘述。

文档地址:http://dubbo.apache.org/zh-cn...
源码分析 (一)DubboHessianURLConnectionFactory

该类继承了HessianURLConnectionFactory类,是dubbo,用于创建与服务器的连接的内部工厂,重写了父类中open方法。

public class DubboHessianURLConnectionFactory extends HessianURLConnectionFactory {

    /**
     * 打开与HTTP服务器的新连接或循环连接
     * @param url
     * @return
     * @throws IOException
     */
    @Override
    public HessianConnection open(URL url) throws IOException {
        // 获得一个连接
        HessianConnection connection = super.open(url);
        // 获得上下文
        RpcContext context = RpcContext.getContext();
        for (String key : context.getAttachments().keySet()) {
            // 在http协议头里面加入dubbo中附加值,key为 header+key  value为附加值的value
            connection.addHeader(Constants.DEFAULT_EXCHANGER + key, context.getAttachment(key));
        }

        return connection;
    }
}

在hessian上加入dubbo自己所需要的附加值,放到协议头里面进行发送。

(二)HttpClientConnection

该类是基于HttpClient封装来实现HessianConnection接口,其中逻辑比较简单。

public class HttpClientConnection implements HessianConnection {

    /**
     * http客户端对象
     */
    private final HttpClient httpClient;

    /**
     * 字节输出流
     */
    private final ByteArrayOutputStream output;

    /**
     * http post请求对象
     */
    private final HttpPost request;

    /**
     * http 响应对象
     */
    private volatile HttpResponse response;

    public HttpClientConnection(HttpClient httpClient, URL url) {
        this.httpClient = httpClient;
        this.output = new ByteArrayOutputStream();
        this.request = new HttpPost(url.toString());
    }

    /**
     * 增加协议头
     * @param key
     * @param value
     */
    @Override
    public void addHeader(String key, String value) {
        request.addHeader(new BasicHeader(key, value));
    }

    @Override
    public OutputStream getOutputStream() throws IOException {
        return output;
    }

    /**
     * 发送请求
     * @throws IOException
     */
    @Override
    public void sendRequest() throws IOException {
        request.setEntity(new ByteArrayEntity(output.toByteArray()));
        this.response = httpClient.execute(request);
    }

    /**
     * 获得请求后的状态码
     * @return
     */
    @Override
    public int getStatusCode() {
        return response == null || response.getStatusLine() == null ? 0 : response.getStatusLine().getStatusCode();
    }

    @Override
    public String getStatusMessage() {
        return response == null || response.getStatusLine() == null ? null : response.getStatusLine().getReasonPhrase();
    }

    @Override
    public String getContentEncoding() {
        return (response == null || response.getEntity() == null || response.getEntity().getContentEncoding() == null) ? null : response.getEntity().getContentEncoding().getValue();
    }

    @Override
    public InputStream getInputStream() throws IOException {
        return response == null || response.getEntity() == null ? null : response.getEntity().getContent();
    }

    @Override
    public void close() throws IOException {
        HttpPost request = this.request;
        if (request != null) {
            request.abort();
        }
    }

    @Override
    public void destroy() throws IOException {
    }
(三)HttpClientConnectionFactory

该类实现了HessianConnectionFactory接口,是创建HttpClientConnection的工厂类。该类的实现跟DubboHessianURLConnectionFactory类类似,但是DubboHessianURLConnectionFactory是标准的Hessian接口调用会采用的工厂类,而HttpClientConnectionFactory是Dubbo 的 Hessian 协议调用。当然Dubbo 的 Hessian 协议也是基于http的。

public class HttpClientConnectionFactory implements HessianConnectionFactory {

    /**
     * httpClient对象
     */
    private final HttpClient httpClient = new DefaultHttpClient();

    @Override
    public void setHessianProxyFactory(HessianProxyFactory factory) {
        // 设置连接超时时间
        HttpConnectionParams.setConnectionTimeout(httpClient.getParams(), (int) factory.getConnectTimeout());
        // 设置读取数据时阻塞链路的超时时间
        HttpConnectionParams.setSoTimeout(httpClient.getParams(), (int) factory.getReadTimeout());
    }

    
    @Override
    public HessianConnection open(URL url) throws IOException {
        // 创建一个HttpClientConnection实例
        HttpClientConnection httpClientConnection = new HttpClientConnection(httpClient, url);
        // 获得上下文,用来获得附加值
        RpcContext context = RpcContext.getContext();
        // 遍历附加值,放入到协议头里面
        for (String key : context.getAttachments().keySet()) {
            httpClientConnection.addHeader(Constants.DEFAULT_EXCHANGER + key, context.getAttachment(key));
        }
        return httpClientConnection;
    }

}

实现了两个方法,第一个方法是给http连接设置两个参数配置,第二个方法是创建一个连接。

(四)HessianProtocol

该类继承了AbstractProxyProtocol类,是hessian协议的实现类。其中实现类基于hessian协议的服务引用、服务暴露等方法。

1.属性
/**
 * http服务器集合
 * key为ip:port
 */
private final Map serverMap = new ConcurrentHashMap();

/**
 * HessianSkeleto 集合
 * key为服务名
 */
private final Map skeletonMap = new ConcurrentHashMap();

/**
 * HttpBinder对象,默认是jetty实现
 */
private HttpBinder httpBinder;
2.doExport
@Override
protected  Runnable doExport(T impl, Class type, URL url) throws RpcException {
    // 获得ip地址
    String addr = getAddr(url);
    // 获得http服务器对象
    HttpServer server = serverMap.get(addr);
    // 如果为空,则重新创建一个server,然后放入集合
    if (server == null) {
        server = httpBinder.bind(url, new HessianHandler());
        serverMap.put(addr, server);
    }
    // 获得服务path
    final String path = url.getAbsolutePath();
    // 创建Hessian服务端对象
    final HessianSkeleton skeleton = new HessianSkeleton(impl, type);
    // 加入集合
    skeletonMap.put(path, skeleton);

    // 获得通用的path
    final String genericPath = path + "/" + Constants.GENERIC_KEY;
    // 加入集合
    skeletonMap.put(genericPath, new HessianSkeleton(impl, GenericService.class));

    // 返回一个线程
    return new Runnable() {
        @Override
        public void run() {
            skeletonMap.remove(path);
            skeletonMap.remove(genericPath);
        }
    };
}

该方法是服务暴露的主要逻辑实现。

3.doRefer
@Override
@SuppressWarnings("unchecked")
protected  T doRefer(Class serviceType, URL url) throws RpcException {
    // 获得泛化的参数
    String generic = url.getParameter(Constants.GENERIC_KEY);
    // 是否是泛化调用
    boolean isGeneric = ProtocolUtils.isGeneric(generic) || serviceType.equals(GenericService.class);
    // 如果是泛化调用。则设置泛化的path和附加值
    if (isGeneric) {
        RpcContext.getContext().setAttachment(Constants.GENERIC_KEY, generic);
        url = url.setPath(url.getPath() + "/" + Constants.GENERIC_KEY);
    }

    // 创建代理工厂
    HessianProxyFactory hessianProxyFactory = new HessianProxyFactory();
    // 是否是Hessian2的请求 默认为否
    boolean isHessian2Request = url.getParameter(Constants.HESSIAN2_REQUEST_KEY, Constants.DEFAULT_HESSIAN2_REQUEST);
    // 设置是否应使用Hessian协议的版本2来解析请求
    hessianProxyFactory.setHessian2Request(isHessian2Request);
    // 是否应为远程调用启用重载方法,默认为否
    boolean isOverloadEnabled = url.getParameter(Constants.HESSIAN_OVERLOAD_METHOD_KEY, Constants.DEFAULT_HESSIAN_OVERLOAD_METHOD);
    // 设置是否应为远程调用启用重载方法。
    hessianProxyFactory.setOverloadEnabled(isOverloadEnabled);
    // 获得client实现方式,默认为jdk
    String client = url.getParameter(Constants.CLIENT_KEY, Constants.DEFAULT_HTTP_CLIENT);
    if ("httpclient".equals(client)) {
        // 用http来创建
        hessianProxyFactory.setConnectionFactory(new HttpClientConnectionFactory());
    } else if (client != null && client.length() > 0 && !Constants.DEFAULT_HTTP_CLIENT.equals(client)) {
        // 抛出不支持的协议异常
        throw new IllegalStateException("Unsupported http protocol client="" + client + ""!");
    } else {
        // 创建一个HessianConnectionFactory对象
        HessianConnectionFactory factory = new DubboHessianURLConnectionFactory();
        // 设置代理工厂
        factory.setHessianProxyFactory(hessianProxyFactory);
        // 设置工厂
        hessianProxyFactory.setConnectionFactory(factory);
    }
    // 获得超时时间
    int timeout = url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
    // 设置超时时间
    hessianProxyFactory.setConnectTimeout(timeout);
    hessianProxyFactory.setReadTimeout(timeout);
    // 创建代理
    return (T) hessianProxyFactory.create(serviceType, url.setProtocol("http").toJavaURL(), Thread.currentThread().getContextClassLoader());
}

该方法是服务引用的主要逻辑实现,根据客户端配置,来选择标准 Hessian 接口调用还是Dubbo 的 Hessian 协议调用。

4.getErrorCode
@Override
protected int getErrorCode(Throwable e) {
    // 如果属于HessianConnectionException异常
    if (e instanceof HessianConnectionException) {
        if (e.getCause() != null) {
            Class cls = e.getCause().getClass();
            // 如果属于超时异常,则返回超时异常
            if (SocketTimeoutException.class.equals(cls)) {
                return RpcException.TIMEOUT_EXCEPTION;
            }
        }
        // 否则返回网络异常
        return RpcException.NETWORK_EXCEPTION;
    } else if (e instanceof HessianMethodSerializationException) {
        // 序列化异常
        return RpcException.SERIALIZATION_EXCEPTION;
    }
    return super.getErrorCode(e);
}

该方法是针对异常的处理。

5.HessianHandler
private class HessianHandler implements HttpHandler {

    @Override
    public void handle(HttpServletRequest request, HttpServletResponse response)
            throws IOException, ServletException {
        // 获得请求的uri
        String uri = request.getRequestURI();
        // 获得对应的HessianSkeleton对象
        HessianSkeleton skeleton = skeletonMap.get(uri);
        // 如果如果不是post方法
        if (!request.getMethod().equalsIgnoreCase("POST")) {
            // 返回状态设置为500
            response.setStatus(500);
        } else {
            // 设置远程地址
            RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort());

            // 获得请求头内容
            Enumeration enumeration = request.getHeaderNames();
            // 遍历请求头内容
            while (enumeration.hasMoreElements()) {
                String key = enumeration.nextElement();
                // 如果key开头是deader,则把附加值取出来放入上下文
                if (key.startsWith(Constants.DEFAULT_EXCHANGER)) {
                    RpcContext.getContext().setAttachment(key.substring(Constants.DEFAULT_EXCHANGER.length()),
                            request.getHeader(key));
                }
            }

            try {
                // 执行下一个
                skeleton.invoke(request.getInputStream(), response.getOutputStream());
            } catch (Throwable e) {
                throw new ServletException(e);
            }
        }
    }

}

该内部类是Hessian的处理器,用来处理请求中的协议头内容。

后记
该部分相关的源码解析地址:https://github.com/CrazyHZM/i...

该文章讲解了远程调用中关于hessian协议的部分,内容比较简单,可以参考着官方文档了解一下。接下来我将开始对rpc模块关于hessian协议部分进行讲解。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/73175.html

相关文章

  • dubbo源码解析(四十八)异步化改造

    摘要:大揭秘异步化改造目标从源码的角度分析的新特性中对于异步化的改造原理。看源码解析四十六消费端发送请求过程讲到的十四的,在以前的逻辑会直接在方法中根据配置区分同步异步单向调用。改为关于可以参考源码解析十远程通信层的六。 2.7大揭秘——异步化改造 目标:从源码的角度分析2.7的新特性中对于异步化的改造原理。 前言 dubbo中提供了很多类型的协议,关于协议的系列可以查看下面的文章: du...

    lijinke666 评论0 收藏0
  • dubbo源码解析二十六)远程调用——http协议

    摘要:前言基于表单的远程调用协议,采用的实现,关于协议就不用多说了吧。后记该部分相关的源码解析地址该文章讲解了远程调用中关于协议的部分,内容比较简单,可以参考着官方文档了解一下。 远程调用——http协议 目标:介绍远程调用中跟http协议相关的设计和实现,介绍dubbo-rpc-http的源码。 前言 基于HTTP表单的远程调用协议,采用 Spring 的HttpInvoker实现,关于h...

    xiyang 评论0 收藏0
  • dubbo源码解析(四十六)消费端发送请求过程

    摘要:可以参考源码解析二十四远程调用协议的八。十六的该类也是用了适配器模式,该类主要的作用就是增加了心跳功能,可以参考源码解析十远程通信层的四。二十的可以参考源码解析十七远程通信的一。 2.7大揭秘——消费端发送请求过程 目标:从源码的角度分析一个服务方法调用经历怎么样的磨难以后到达服务端。 前言 前一篇文章讲到的是引用服务的过程,引用服务无非就是创建出一个代理。供消费者调用服务的相关方法。...

    fish 评论0 收藏0
  • dubbo源码解析(四十五)服务引用过程

    摘要:服务引用过程目标从源码的角度分析服务引用过程。并保留服务提供者的部分配置,比如版本,,时间戳等最后将合并后的配置设置为查询字符串中。的可以参考源码解析二十三远程调用的一的源码分析。 dubbo服务引用过程 目标:从源码的角度分析服务引用过程。 前言 前面服务暴露过程的文章讲解到,服务引用有两种方式,一种就是直连,也就是直接指定服务的地址来进行引用,这种方式更多的时候被用来做服务测试,不...

    xiaowugui666 评论0 收藏0
  • dubbo源码解析(四十七)服务端处理请求过程

    摘要:而存在的意义就是保证请求或响应对象可在线程池中被解码,解码完成后,就会分发到的。 2.7大揭秘——服务端处理请求过程 目标:从源码的角度分析服务端接收到请求后的一系列操作,最终把客户端需要的值返回。 前言 上一篇讲到了消费端发送请求的过程,该篇就要将服务端处理请求的过程。也就是当服务端收到请求数据包后的一系列处理以及如何返回最终结果。我们也知道消费端在发送请求的时候已经做了编码,所以我...

    yzzz 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<