摘要:源码分析一该类继承了类,是协议实现的核心。属性默认端口号不支持协议的服务暴露,抛出异常可以看到不支持服务暴露。后记该部分相关的源码解析地址该文章讲解了远程调用中关于协议实现的部分,逻辑比较简单。
远程调用——redis协议
目标:介绍redis协议的设计和实现,介绍dubbo-rpc-redis的源码。前言
dubbo支持的redis协议是基于Redis的,Redis 是一个高效的 KV 存储服务器,跟memcached协议实现差不多,在dubbo中也没有涉及到关于redis协议的服务暴露,只有服务引用,因为在访问服务器时,Redis客户端可以在服务器上存储也可以获取。
源码分析 (一)RedisProtocol该类继承了AbstractProtocol类,是redis协议实现的核心。
1.属性/** * 默认端口号 */ public static final int DEFAULT_PORT = 6379;2.export
@Override publicExporter export(final Invoker invoker) throws RpcException { // 不支持redis协议的服务暴露,抛出异常 throw new UnsupportedOperationException("Unsupported export redis service. url: " + invoker.getUrl()); }
可以看到不支持服务暴露。
3.refer@Override publicInvoker refer(final Class type, final URL url) throws RpcException { try { // 实例化对象池 GenericObjectPoolConfig config = new GenericObjectPoolConfig(); // 如果 testOnBorrow 被设置,pool 会在 borrowObject 返回对象之前使用 PoolableObjectFactory的 validateObject 来验证这个对象是否有效 // 要是对象没通过验证,这个对象会被丢弃,然后重新选择一个新的对象。 config.setTestOnBorrow(url.getParameter("test.on.borrow", true)); // 如果 testOnReturn 被设置, pool 会在 returnObject 的时候通过 PoolableObjectFactory 的validateObject 方法验证对象 // 如果对象没通过验证,对象会被丢弃,不会被放到池中。 config.setTestOnReturn(url.getParameter("test.on.return", false)); // 指定空闲对象是否应该使用 PoolableObjectFactory 的 validateObject 校验,如果校验失败,这个对象会从对象池中被清除。 // 这个设置仅在 timeBetweenEvictionRunsMillis 被设置成正值( >0) 的时候才会生效。 config.setTestWhileIdle(url.getParameter("test.while.idle", false)); if (url.getParameter("max.idle", 0) > 0) // 控制一个pool最多有多少个状态为空闲的jedis实例。 config.setMaxIdle(url.getParameter("max.idle", 0)); if (url.getParameter("min.idle", 0) > 0) // 控制一个pool最少有多少个状态为空闲的jedis实例。 config.setMinIdle(url.getParameter("min.idle", 0)); if (url.getParameter("max.active", 0) > 0) // 控制一个pool最多有多少个jedis实例。 config.setMaxTotal(url.getParameter("max.active", 0)); if (url.getParameter("max.total", 0) > 0) config.setMaxTotal(url.getParameter("max.total", 0)); if (url.getParameter("max.wait", 0) > 0) //表示当引入一个jedis实例时,最大的等待时间,如果超过等待时间,则直接抛出JedisConnectionException; config.setMaxWaitMillis(url.getParameter("max.wait", 0)); if (url.getParameter("num.tests.per.eviction.run", 0) > 0) // 设置驱逐线程每次检测对象的数量。这个设置仅在 timeBetweenEvictionRunsMillis 被设置成正值( >0)的时候才会生效。 config.setNumTestsPerEvictionRun(url.getParameter("num.tests.per.eviction.run", 0)); if (url.getParameter("time.between.eviction.runs.millis", 0) > 0) // 指定驱逐线程的休眠时间。如果这个值不是正数( >0),不会有驱逐线程运行。 config.setTimeBetweenEvictionRunsMillis(url.getParameter("time.between.eviction.runs.millis", 0)); if (url.getParameter("min.evictable.idle.time.millis", 0) > 0) // 指定最小的空闲驱逐的时间间隔(空闲超过指定的时间的对象,会被清除掉)。 // 这个设置仅在 timeBetweenEvictionRunsMillis 被设置成正值( >0)的时候才会生效。 config.setMinEvictableIdleTimeMillis(url.getParameter("min.evictable.idle.time.millis", 0)); // 创建redis连接池 final JedisPool jedisPool = new JedisPool(config, url.getHost(), url.getPort(DEFAULT_PORT), url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT), StringUtils.isBlank(url.getPassword()) ? null : url.getPassword(), url.getParameter("db.index", 0)); // 获得值的过期时间 final int expiry = url.getParameter("expiry", 0); // 获得get命令 final String get = url.getParameter("get", "get"); // 获得set命令 final String set = url.getParameter("set", Map.class.equals(type) ? "put" : "set"); // 获得delete命令 final String delete = url.getParameter("delete", Map.class.equals(type) ? "remove" : "delete"); return new AbstractInvoker (type, url) { @Override protected Result doInvoke(Invocation invocation) throws Throwable { Jedis resource = null; try { resource = jedisPool.getResource(); // 如果是get命令 if (get.equals(invocation.getMethodName())) { // get 命令必须只有一个参数 if (invocation.getArguments().length != 1) { throw new IllegalArgumentException("The redis get method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url); } // 获得值 byte[] value = resource.get(String.valueOf(invocation.getArguments()[0]).getBytes()); if (value == null) { return new RpcResult(); } // 反序列化 ObjectInput oin = getSerialization(url).deserialize(url, new ByteArrayInputStream(value)); return new RpcResult(oin.readObject()); } else if (set.equals(invocation.getMethodName())) { // 如果是set命令,参数长度必须是2 if (invocation.getArguments().length != 2) { throw new IllegalArgumentException("The redis set method arguments mismatch, must be two arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url); } // byte[] key = String.valueOf(invocation.getArguments()[0]).getBytes(); ByteArrayOutputStream output = new ByteArrayOutputStream(); // 对需要存入对值进行序列化 ObjectOutput value = getSerialization(url).serialize(url, output); value.writeObject(invocation.getArguments()[1]); // 存入值 resource.set(key, output.toByteArray()); // 设置该key过期时间,不能大于1000s if (expiry > 1000) { resource.expire(key, expiry / 1000); } return new RpcResult(); } else if (delete.equals(invocation.getMethodName())) { // 如果是删除命令,则参数长度必须是1 if (invocation.getArguments().length != 1) { throw new IllegalArgumentException("The redis delete method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url); } // 删除该值 resource.del(String.valueOf(invocation.getArguments()[0]).getBytes()); return new RpcResult(); } else { // 否则抛出该操作不支持的异常 throw new UnsupportedOperationException("Unsupported method " + invocation.getMethodName() + " in redis service."); } } catch (Throwable t) { RpcException re = new RpcException("Failed to invoke redis service method. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url + ", cause: " + t.getMessage(), t); if (t instanceof TimeoutException || t instanceof SocketTimeoutException) { // 抛出超时异常 re.setCode(RpcException.TIMEOUT_EXCEPTION); } else if (t instanceof JedisConnectionException || t instanceof IOException) { // 抛出网络异常 re.setCode(RpcException.NETWORK_EXCEPTION); } else if (t instanceof JedisDataException) { // 抛出序列化异常 re.setCode(RpcException.SERIALIZATION_EXCEPTION); } throw re; } finally { if (resource != null) { try { jedisPool.returnResource(resource); } catch (Throwable t) { logger.warn("returnResource error: " + t.getMessage(), t); } } } } @Override public void destroy() { super.destroy(); try { // 关闭连接池 jedisPool.destroy(); } catch (Throwable e) { logger.warn(e.getMessage(), e); } } }; } catch (Throwable t) { throw new RpcException("Failed to refer redis service. interface: " + type.getName() + ", url: " + url + ", cause: " + t.getMessage(), t); } }
可以看到首先是对连接池的配置赋值,然后创建连接池后,根据redis的get、set、delete命令来进行相关操作。
后记该部分相关的源码解析地址:https://github.com/CrazyHZM/i...
该文章讲解了远程调用中关于redis协议实现的部分,逻辑比较简单。接下来我将开始对rpc模块关于rest协议部分进行讲解。
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/73237.html
摘要:大揭秘异步化改造目标从源码的角度分析的新特性中对于异步化的改造原理。看源码解析四十六消费端发送请求过程讲到的十四的,在以前的逻辑会直接在方法中根据配置区分同步异步单向调用。改为关于可以参考源码解析十远程通信层的六。 2.7大揭秘——异步化改造 目标:从源码的角度分析2.7的新特性中对于异步化的改造原理。 前言 dubbo中提供了很多类型的协议,关于协议的系列可以查看下面的文章: du...
摘要:可以参考源码解析二十四远程调用协议的八。十六的该类也是用了适配器模式,该类主要的作用就是增加了心跳功能,可以参考源码解析十远程通信层的四。二十的可以参考源码解析十七远程通信的一。 2.7大揭秘——消费端发送请求过程 目标:从源码的角度分析一个服务方法调用经历怎么样的磨难以后到达服务端。 前言 前一篇文章讲到的是引用服务的过程,引用服务无非就是创建出一个代理。供消费者调用服务的相关方法。...
摘要:源码分析一该类继承,是协议实现的核心。属性默认端口号不支持服务暴露可以看到,服务暴露方法直接抛出异常。后记该部分相关的源码解析地址该文章讲解了远程调用中关于协议实现的部分,逻辑比较简单。 远程调用——memcached协议 目标:介绍memcached协议的设计和实现,介绍dubbo-rpc-memcached的源码。 前言 dubbo实现memcached协议是基于Memcached...
摘要:而存在的意义就是保证请求或响应对象可在线程池中被解码,解码完成后,就会分发到的。 2.7大揭秘——服务端处理请求过程 目标:从源码的角度分析服务端接收到请求后的一系列操作,最终把客户端需要的值返回。 前言 上一篇讲到了消费端发送请求的过程,该篇就要将服务端处理请求的过程。也就是当服务端收到请求数据包后的一系列处理以及如何返回最终结果。我们也知道消费端在发送请求的时候已经做了编码,所以我...
摘要:远程调用协议目标介绍远程调用中跟协议相关的设计和实现,介绍的源码。二该类继承了,是协议中独有的服务暴露者。八该类也是对的装饰,其中增强了调用次数多功能。 远程调用——dubbo协议 目标:介绍远程调用中跟dubbo协议相关的设计和实现,介绍dubbo-rpc-dubbo的源码。 前言 Dubbo 缺省协议采用单一长连接和 NIO 异步通讯,适合于小数据量大并发的服务调用,以及服务消费者...
阅读 706·2021-11-15 11:37
阅读 3324·2021-10-27 14:14
阅读 6115·2021-09-13 10:30
阅读 2969·2021-09-04 16:48
阅读 1939·2021-08-18 10:22
阅读 2136·2019-08-30 14:19
阅读 738·2019-08-30 10:54
阅读 1755·2019-08-29 18:40