资讯专栏INFORMATION COLUMN

dubbo源码解析(三十九)集群——merger

lscho / 1577人阅读

摘要:源码分析一创建该类实现了接口,是分组集合的集群实现。三工厂类,获得指定类型的对象。后记该部分相关的源码解析地址该文章讲解了集群中关于分组聚合实现的部分。接下来我将开始对集群模块关于路由部分进行讲解。

集群——merger
目标:介绍dubbo中集群的分组聚合,介绍dubbo-cluster下merger包的源码。
前言

按组合并返回结果 ,比如菜单服务,接口一样,但有多种实现,用group区分,现在消费方需从每种group中调用一次返回结果,合并结果返回,这样就可以实现聚合菜单项。这个时候就要用到分组聚合。

源码分析 (一)MergeableCluster
public class MergeableCluster implements Cluster {

    public static final String NAME = "mergeable";

    @Override
    public  Invoker join(Directory directory) throws RpcException {
        // 创建MergeableClusterInvoker
        return new MergeableClusterInvoker(directory);
    }

}

该类实现了Cluster接口,是分组集合的集群实现。

(二)MergeableClusterInvoker

该类是分组聚合的实现类,其中最关机的就是invoke方法。

@Override
@SuppressWarnings("rawtypes")
public Result invoke(final Invocation invocation) throws RpcException {
    // 获得invoker集合
    List> invokers = directory.list(invocation);

    /**
     * 获得是否merger
     */
    String merger = getUrl().getMethodParameter(invocation.getMethodName(), Constants.MERGER_KEY);
    // 如果没有设置需要聚合,则只调用一个invoker的
    if (ConfigUtils.isEmpty(merger)) { // If a method doesn"t have a merger, only invoke one Group
        // 只要有一个可用就返回
        for (final Invoker invoker : invokers) {
            if (invoker.isAvailable()) {
                return invoker.invoke(invocation);
            }
        }
        return invokers.iterator().next().invoke(invocation);
    }

    // 返回类型
    Class returnType;
    try {
        // 获得返回类型
        returnType = getInterface().getMethod(
                invocation.getMethodName(), invocation.getParameterTypes()).getReturnType();
    } catch (NoSuchMethodException e) {
        returnType = null;
    }

    // 结果集合
    Map> results = new HashMap>();
    // 循环invokers
    for (final Invoker invoker : invokers) {
        // 获得每次调用的future
        Future future = executor.submit(new Callable() {
            @Override
            public Result call() throws Exception {
                // 回调,把返回结果放入future
                return invoker.invoke(new RpcInvocation(invocation, invoker));
            }
        });
        // 加入集合
        results.put(invoker.getUrl().getServiceKey(), future);
    }

    Object result = null;

    List resultList = new ArrayList(results.size());

    // 获得超时时间
    int timeout = getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
    // 遍历每一个结果
    for (Map.Entry> entry : results.entrySet()) {
        Future future = entry.getValue();
        try {
            // 获得调用返回的结果
            Result r = future.get(timeout, TimeUnit.MILLISECONDS);
            if (r.hasException()) {
                log.error("Invoke " + getGroupDescFromServiceKey(entry.getKey()) + 
                                " failed: " + r.getException().getMessage(), 
                        r.getException());
            } else {
                // 加入集合
                resultList.add(r);
            }
        } catch (Exception e) {
            throw new RpcException("Failed to invoke service " + entry.getKey() + ": " + e.getMessage(), e);
        }
    }

    // 如果为空,则返回空的结果
    if (resultList.isEmpty()) {
        return new RpcResult((Object) null);
    } else if (resultList.size() == 1) {
        // 如果只有一个结果,则返回该结果
        return resultList.iterator().next();
    }

    // 如果返回类型是void,也就是没有返回值,那么返回空结果
    if (returnType == void.class) {
        return new RpcResult((Object) null);
    }

    // 根据方法来合并,将调用返回结果的指定方法进行合并
    if (merger.startsWith(".")) {
        merger = merger.substring(1);
        Method method;
        try {
            // 获得方法
            method = returnType.getMethod(merger, returnType);
        } catch (NoSuchMethodException e) {
            throw new RpcException("Can not merge result because missing method [ " + merger + " ] in class [ " + 
                    returnType.getClass().getName() + " ]");
        }

        // 有 Method ,进行合并
        if (!Modifier.isPublic(method.getModifiers())) {
            method.setAccessible(true);
        }
        // 从集合中移除
        result = resultList.remove(0).getValue();
        try {
            // 方法返回类型匹配,合并时,修改 result
            if (method.getReturnType() != void.class
                    && method.getReturnType().isAssignableFrom(result.getClass())) {
                for (Result r : resultList) {
                    result = method.invoke(result, r.getValue());
                }
            } else {
                // 方法返回类型不匹配,合并时,不修改 result
                for (Result r : resultList) {
                    method.invoke(result, r.getValue());
                }
            }
        } catch (Exception e) {
            throw new RpcException("Can not merge result: " + e.getMessage(), e);
        }
    } else {
        // 基于 Merger
        Merger resultMerger;
        // 如果是默认的方式
        if (ConfigUtils.isDefault(merger)) {
            // 获得该类型的合并方式
            resultMerger = MergerFactory.getMerger(returnType);
        } else {
            // 如果不是默认的,则配置中指定获得Merger的实现类
            resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger);
        }
        if (resultMerger != null) {
            List rets = new ArrayList(resultList.size());
            // 遍历返回结果
            for (Result r : resultList) {
                // 加入到rets
                rets.add(r.getValue());
            }
            // 合并
            result = resultMerger.merge(
                    rets.toArray((Object[]) Array.newInstance(returnType, 0)));
        } else {
            throw new RpcException("There is no merger to merge result.");
        }
    }
    // 返回结果
    return new RpcResult(result);
}

前面部分在讲获得调用的结果,后面部分是对结果的合并,合并有两种方式,根据配置不同可用分为基于方法的合并和基于merger的合并。

(三)MergerFactory

Merger 工厂类,获得指定类型的Merger 对象。

public class MergerFactory {

    /**
     * Merger 对象缓存
     */
    private static final ConcurrentMap, Merger> mergerCache =
            new ConcurrentHashMap, Merger>();

    /**
     * 获得指定类型的Merger对象
     * @param returnType
     * @param 
     * @return
     */
    public static  Merger getMerger(Class returnType) {
        Merger result;
        // 如果类型是集合
        if (returnType.isArray()) {
            // 获得类型
            Class type = returnType.getComponentType();
            // 从缓存中获得该类型的Merger对象
            result = mergerCache.get(type);
            // 如果为空,则
            if (result == null) {
                // 初始化所有的 Merger 扩展对象,到 mergerCache 缓存中。
                loadMergers();
                // 从集合中取出对应的Merger对象
                result = mergerCache.get(type);
            }
            // 如果结果为空,则直接返回ArrayMerger的单例
            if (result == null && !type.isPrimitive()) {
                result = ArrayMerger.INSTANCE;
            }
        } else {
            // 否则直接从mergerCache中取出
            result = mergerCache.get(returnType);
            // 如果为空
            if (result == null) {
                // 初始化所有的 Merger 扩展对象,到 mergerCache 缓存中。
                loadMergers();
                // 从集合中取出
                result = mergerCache.get(returnType);
            }
        }
        return result;
    }

    /**
     * 初始化所有的 Merger 扩展对象,到 mergerCache 缓存中。
     */
    static void loadMergers() {
        // 获得Merger所有的扩展对象名
        Set names = ExtensionLoader.getExtensionLoader(Merger.class)
                .getSupportedExtensions();
        // 遍历
        for (String name : names) {
            // 加载每一个扩展实现,然后放入缓存。
            Merger m = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(name);
            mergerCache.putIfAbsent(ReflectUtils.getGenericClass(m.getClass()), m);
        }
    }

}

逻辑比较简单。

(四)ArrayMerger

因为不同的类型有不同的Merger实现,我们可以来看看这个图片:

可以看到有好多好多,我就讲解其中的一种,偷懒一下,其他的麻烦有兴趣的去看看源码了。

public class ArrayMerger implements Merger {

    /**
     * 单例
     */
    public static final ArrayMerger INSTANCE = new ArrayMerger();

    @Override
    public Object[] merge(Object[]... others) {
        // 如果长度为0  则直接返回
        if (others.length == 0) {
            return null;
        }
        // 总长
        int totalLen = 0;
        // 遍历所有需要合并的对象
        for (int i = 0; i < others.length; i++) {
            Object item = others[i];
            // 如果为数组
            if (item != null && item.getClass().isArray()) {
                // 累加数组长度
                totalLen += Array.getLength(item);
            } else {
                throw new IllegalArgumentException((i + 1) + "th argument is not an array");
            }
        }

        if (totalLen == 0) {
            return null;
        }

        // 获得数组类型
        Class type = others[0].getClass().getComponentType();

        // 创建长度
        Object result = Array.newInstance(type, totalLen);
        int index = 0;
        // 遍历需要合并的对象
        for (Object array : others) {
            // 遍历每个数组中的数据
            for (int i = 0; i < Array.getLength(array); i++) {
                // 加入到最终结果中
                Array.set(result, index++, Array.get(array, i));
            }
        }
        return (Object[]) result;
    }

}

是不是很简单,就是循环合并就可以了。

后记
该部分相关的源码解析地址:https://github.com/CrazyHZM/i...

该文章讲解了集群中关于分组聚合实现的部分。接下来我将开始对集群模块关于路由部分进行讲解。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/77431.html

相关文章

  • dubbo源码解析(四十八)异步化改造

    摘要:大揭秘异步化改造目标从源码的角度分析的新特性中对于异步化的改造原理。看源码解析四十六消费端发送请求过程讲到的十四的,在以前的逻辑会直接在方法中根据配置区分同步异步单向调用。改为关于可以参考源码解析十远程通信层的六。 2.7大揭秘——异步化改造 目标:从源码的角度分析2.7的新特性中对于异步化的改造原理。 前言 dubbo中提供了很多类型的协议,关于协议的系列可以查看下面的文章: du...

    lijinke666 评论0 收藏0
  • dubbo源码解析——cluster

    摘要:简单来说就是应对出错情况采取的策略。由于重试,重试次数过多时,带来时延。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。通常用于通知所有提供者更新缓存或日志等本地资源信息。 我们再来回顾一下官网的对于集群容错的架构设计图showImg(https://segmentfault.com/img/remote/1460000016972729?w=1000&h=502); Clus...

    seal_de 评论0 收藏0
  • dubbo源码解析(四十六)消费端发送请求过程

    摘要:可以参考源码解析二十四远程调用协议的八。十六的该类也是用了适配器模式,该类主要的作用就是增加了心跳功能,可以参考源码解析十远程通信层的四。二十的可以参考源码解析十七远程通信的一。 2.7大揭秘——消费端发送请求过程 目标:从源码的角度分析一个服务方法调用经历怎么样的磨难以后到达服务端。 前言 前一篇文章讲到的是引用服务的过程,引用服务无非就是创建出一个代理。供消费者调用服务的相关方法。...

    fish 评论0 收藏0
  • dubbo源码解析三十八)集群——LoadBalance

    摘要:集群目标介绍中集群的负载均衡,介绍下包的源码。源码分析一该类实现了接口,是负载均衡的抽象类,提供了权重计算的功能。四该类是负载均衡基于一致性的逻辑实现。 集群——LoadBalance 目标:介绍dubbo中集群的负载均衡,介绍dubbo-cluster下loadBalance包的源码。 前言 负载均衡,说的通俗点就是要一碗水端平。在这个时代,公平是很重要的,在网络请求的时候同样是这个...

    不知名网友 评论0 收藏0
  • dubbo源码解析十九)远程调用——开篇

    摘要:远程调用开篇目标介绍之后解读远程调用模块的内容如何编排介绍中的包结构设计以及最外层的的源码解析。十该类就是远程调用的上下文,贯穿着整个调用,例如调用,然后调用。十五该类是系统上下文,仅供内部使用。 远程调用——开篇 目标:介绍之后解读远程调用模块的内容如何编排、介绍dubbo-rpc-api中的包结构设计以及最外层的的源码解析。 前言 最近我面临着一个选择,因为dubbo 2.7.0-...

    jayce 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<