摘要:项目版本源码在上一博文中跟大家讲了的实现思路思路毕竟只是思路那么这篇就带着源码给大家讲解下实现过程中的各个具体问题读懂本篇需要的基本知识若尚未清晰请自行了解后再阅读本文动态代理框架的基本使用的基本配置最终项目的使用如下调用端代码及配置测试类
项目1.0版本源码
https://github.com/wephone/Me...
在上一博文中 跟大家讲了RPC的实现思路 思路毕竟只是思路 那么这篇就带着源码给大家讲解下实现过程中的各个具体问题
读懂本篇需要的基本知识 若尚未清晰请自行了解后再阅读本文java动态代理
netty框架的基本使用
spring的基本配置
最终项目的使用如下/** *调用端代码及spring配置 */ @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations={"file:src/test/java/rpcTest/ClientContext.xml"}) public class Client { @Test public void start(){ Service service= (Service) RPC.call(Service.class); System.out.println("测试Integer,Double类型传参与返回String对象:"+service.stringMethodIntegerArgsTest(233,666.66)); //输出string233666.66 } } /** *Service抽象及其实现 *调用与实现端共同依赖Service */ public interface Service { String stringMethodIntegerArgsTest(Integer a,Double b); } /** * ServiceImpl实现端对接口的具体实现 */ public class ServiceImpl implements Service { @Override public String stringMethodIntegerArgsTest(Integer a, Double b) { return "String"+a+b; } }
1.0版本分3个包
Client 调用端
Server 实现端
Core 核心方法
首先看这句代码调用端只需如此调用
定义接口 传入接口类类型 后面调用的接口内的方法 全部是由实现端实现
Service service= (Service) RPC.call(Service.class);
这句的作用其实就是生成调用端的动态代理
/** * 暴露调用端使用的静态方法 为抽象接口生成动态代理对象 * TODO 考虑后面优化不在使用时仍需强转 * @param cls 抽象接口的类类型 * @return 接口生成的动态代理对象 */ public static Object call(Class cls){ RPCProxyHandler handler=new RPCProxyHandler(); Object proxyObj=Proxy.newProxyInstance(cls.getClassLoader(),new Class>[]{cls},handler); return proxyObj; }
RPCProxyHandler为动态代理的方法被调用后的回调方法 每个方法被调用时都会执行这个invoke
/** * 代理抽象接口调用的方法 * 发送方法信息给服务端 加锁等待服务端返回 * @param proxy * @param method * @param args * @return * @throws Throwable */ @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { RPCRequest request=new RPCRequest(); request.setRequestID(buildRequestID(method.getName())); request.setClassName(method.getDeclaringClass().getName());//返回表示声明由此 Method 对象表示的方法的类或接口的Class对象 request.setMethodName(method.getName()); // request.setParameterTypes(method.getParameterTypes());//返回形参类型 request.setParameters(args);//输入的实参 RPCRequestNet.requestLockMap.put(request.getRequestID(),request); RPCRequestNet.connect().send(request); //调用用结束后移除对应的condition映射关系 RPCRequestNet.requestLockMap.remove(request.getRequestID()); return request.getResult();//目标方法的返回结果 }
也就是收集对应调用的接口的信息 然后send给实现端
那么这个requestLockMap又是作何作用的呢
由于我们的网络调用都是异步的
但是RPC调用都要做到同步 等待这个远程调用方法完全返回后再继续执行
所以将每个请求的request对象作为对象锁 每个请求发送后加锁 等到网络异步调用返回后再释放所
生成每个请求的ID 这里我用随机数加时间戳
将请求ID和请求对象维护在静态全局的一个map中 实现端通过ID来对应是哪个请求
异步调用返回后 通过ID notify唤醒对应请求对象的线程
netty异步返回的调用 释放对象锁
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String responseJson= (String) msg; RPCResponse response= (RPCResponse) RPC.responseDecode(responseJson); synchronized (RPCRequestNet.requestLockMap.get(response.getRequestID())) { //唤醒在该对象锁上wait的线程 RPCRequest request= (RPCRequest) RPCRequestNet.requestLockMap.get(response.getRequestID()); request.setResult(response.getResult()); request.notifyAll(); } }
接下来是RPCRequestNet.connect().send(request);方法
connect方法其实是单例模式返回RPCRequestNet实例
RPCRequestNet构造方法是使用netty对实现端进行TCP链接
send方法如下
try { //判断连接是否已完成 只在连接启动时会产生阻塞 if (RPCRequestHandler.channelCtx==null){ connectlock.lock(); //挂起等待连接成功 System.out.println("正在等待连接实现端"); connectCondition.await(); connectlock.unlock(); } //编解码对象为json 发送请求 String requestJson= null; try { requestJson = RPC.requestEncode(request); } catch (JsonProcessingException e) { e.printStackTrace(); } ByteBuf requestBuf= Unpooled.copiedBuffer(requestJson.getBytes()); RPCRequestHandler.channelCtx.writeAndFlush(requestBuf); System.out.println("调用"+request.getRequestID()+"已发送"); //挂起等待实现端处理完毕返回 TODO 后续配置超时时间 synchronized (request) { //放弃对象锁 并阻塞等待notify request.wait(); } System.out.println("调用"+request.getRequestID()+"接收完毕"); } catch (InterruptedException e) { e.printStackTrace(); }
condition和lock同样是为了同步等待异步IO返回用的
send方法基本是编解码json后发送给实现端
/** *实现端代码及spring配置 */ @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations={"file:src/test/java/rpcTest/ServerContext.xml"}) public class Server { @Test public void start(){ //启动spring后才可启动 防止容器尚未加载完毕 RPC.start(); } }
出了配置spring之外 实现端就一句 RPC.start()
其实就是启动netty服务器
服务端的处理客户端信息回调如下
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException { String requestJson= (String) msg; System.out.println("receive request:"+requestJson); RPCRequest request= RPC.requestDeocde(requestJson); Object result=InvokeServiceUtil.invoke(request); //netty的write方法并没有直接写入通道(为避免多次唤醒多路复用选择器) //而是把待发送的消息放到缓冲数组中,flush方法再全部写到通道中 // ctx.write(resp); //记得加分隔符 不然客户端一直不会处理 RPCResponse response=new RPCResponse(); response.setRequestID(request.getRequestID()); response.setResult(result); String respStr=RPC.responseEncode(response); ByteBuf responseBuf= Unpooled.copiedBuffer(respStr.getBytes()); ctx.writeAndFlush(responseBuf); }
主要是编解码json 反射对应的方法 我们看看反射的工具类
/** * 反射调用相应实现类并结果 * @param request * @return */ public static Object invoke(RPCRequest request){ Object result=null;//内部变量必须赋值 全局变量才不用 //实现类名 String implClassName= RPC.getServerConfig().getServerImplMap().get(request.getClassName()); try { Class implClass=Class.forName(implClassName); Object[] parameters=request.getParameters(); int parameterNums=request.getParameters().length; Class[] parameterTypes=new Class[parameterNums]; for (int i = 0; i解析Parameters getClass获取他们的类类型 反射调用对应的方法
这里需要注意一个点本文最初采用Gson处理json 但gson默认会把int类型转为double类型 例如2变为2.0 不适用本场景 我也不想去专门适配
所以换用了jackson
常见json处理框架 反序列化为对象时 int,long等基本类型都会变成他们的包装类Integer Long
所以本例程中 远程调度接口方法的形参不可以使用int等基本类型
否则method.invoke(implObj,parameters);会找不到对应的方法报错
因为parameters已经是包装类了 而method还是int这些基本类 所以找不到对应方法
最后是借助spring配置基础配置
我写了两个类 ServerConfig ClientConfig 作为调用端和服务端的配置
只需在spring中配置这两个bean 并启动IOC容器即可调用端
实现端
最后有个小问题我们的框架是作为一个依赖包引入的 我们不可能在我们的框架中读取对应的spring xml
这样完全是去了框架的灵活性
那我们怎么在运行过程中获得我们所处于的IOC容器 已获得我们的正确配置信息呢
答案是spring提供的ApplicationContextAware接口/** * Created by wephone on 17-12-26. */ public class ClientConfig implements ApplicationContextAware { private String host; private int port; //调用超时时间 private long overtime; public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getPort() { return port; } public void setPort(int port) { this.port = port; } public long getOvertime() { return overtime; } public void setOvertime(long overtime) { this.overtime = overtime; } /** * 加载Spring配置文件时,如果Spring配置文件中所定义的Bean类 * 如果该类实现了ApplicationContextAware接口 * 那么在加载Spring配置文件时,会自动调用ApplicationContextAware接口中的 * @param applicationContext * @throws BeansException */ @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { RPC.clientContext=applicationContext; } }这样我们在RPC类内部就维护了一个静态IOC容器的context
只需如此获取配置
RPC.getServerConfig().getPort()public static ServerConfig getServerConfig(){ return serverContext.getBean(ServerConfig.class); }就这样 这个RPC框架的核心部分 已经讲述完毕了本例程仅为1.0版本
后续博客中 会加入异常处理 zookeeper支持 负载均衡策略等
博客:zookeeper支持
欢迎持续关注 欢迎star 提issue
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/68280.html
摘要:每个都可以通过其路径唯一标识,同时每个节点还可以存储少量数据。监听机制,监听某个当该发生变化时,会回调该,但是这个是一次性的,下次需要监听时还得再注册一次。 前面的文章中 我用netty实现了一个简单的一对一的RPC 11个类实现简单java rpc 接下来的文章中 我将使用zookeeper作为rpc调用的分布式注册中心 从而实现多对多(多个调用者,多个提供者)的rpc调用,负载均...
摘要:等之所以支持跨语言,是因为他们自己定义了一套结构化数据存储格式,如的,用于编解码对象,作为各个语言通信的中间协议。 前段时间觉得自己一直用别人的框架,站在巨人的肩膀上,也该自己造造轮子了 一时兴起 就着手写起了RPC框架 这里写了系列博客拿给大家分享下 这篇是开篇的思路篇 项目最终的代码放在了我的github上https://github.com/wephone/Me... 欢迎sta...
摘要:支持相关规范和标准,包括同上。支持多种传输协议和协议绑定数据绑定。构建端还有其服务实现,接口使用注解,标明是一个远程服务接口。然后编写一个的启动程序,并运行,我想你会成功的因为我看到了下图是一种跨平台的技术协议。 本博客 猫叔的博客,转载请申明出处 学习系列 RPC框架是啥? RPC框架是啥之Java自带RPC实现,RMI框架入门 Apache CXF一款WebService RP...
阅读 1532·2023-04-26 01:54
阅读 1563·2021-09-30 09:55
阅读 2589·2021-09-22 16:05
阅读 1721·2021-07-25 21:37
阅读 2587·2019-08-29 18:45
阅读 1862·2019-08-29 16:44
阅读 1852·2019-08-29 12:34
阅读 1316·2019-08-23 14:02