Cool-Rpc: Preface
The project code described in this post is open-sourced on GitHub; everyone is welcome to contribute!
Click here: Cool-RPC
My last post dates back to the end of 2017; thank you all for your continued attention.
This post will walk you through building a simple, efficient, and highly extensible RPC framework from scratch.
You have probably used an RPC framework at some point: Alibaba's Dubbo, Google's gRPC, Facebook's Thrift, and so on.
So what exactly is RPC?
RPC stands for Remote Procedure Call. In plain terms: once a monolithic application has been split into a distributed system, RPC is the mechanism by which the separate systems exchange data.
From the service point of view, RPC involves two roles: service providers and service consumers, with some shared Java interfaces between them, called the open API interfaces.
In other words, the side that hosts the interface implementation classes is the provider, and the side that calls those interfaces is the consumer.
Since we are in a distributed environment, how does the consumer learn the IP and open port of the provider's server?
This is where a component called the registry comes in: the consumer takes a service name, looks up that service's IP and port in the registry, and then sends its request to the resolved address.
As shown in the diagram:
Cool-Rpc Technical Overview
The project communicates at the transport layer (over TCP/IP); the transport code is written with Netty, and a Mina version will be available on GitHub.
Several serialization frameworks are provided; Protostuff is the default, and Java serialization (among others) can be configured instead.
The default registry is ZooKeeper; Redis can be configured instead (any component offering node data storage and change notification will do).
The consumer issues remote calls through Java dynamic proxies.
The class name, method, and parameters of the call are sent to the provider, which uses them to invoke the local implementation class and then sends the result back to the consumer.
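Before looking at the transport code, it helps to pin down the two payload objects that cross the wire. Their source is not shown in this post; the sketch below infers `CoolRequest` and `CoolResponse` from the getters, setters, and `sync` calls used in the handlers later on, so treat the field list as an assumption:

```java
import java.io.Serializable;

// Sketch of the wire DTOs, with fields inferred from how the
// handlers below use them (an assumption, not the actual source).
public class Main {

    static class CoolRequest implements Serializable {
        String requestID;          // correlates a response with its request
        String className;          // open API interface name
        String methodName;
        Object[] parameters;
        Class<?>[] parameterTypes;
    }

    static class CoolResponse implements Serializable {
        String requestID;
        Object result;             // return value on success
        Throwable error;           // non-null when the provider threw

        // copy a decoded response into the instance the caller is holding
        void sync(CoolResponse other) {
            this.requestID = other.requestID;
            this.result = other.result;
            this.error = other.error;
        }
    }

    public static void main(String[] args) {
        CoolResponse held = new CoolResponse();    // instance the caller blocks on
        CoolResponse decoded = new CoolResponse(); // instance produced by the decoder
        decoded.requestID = "42";
        decoded.result = "pong";
        held.sync(decoded);
        System.out.println(held.requestID + ":" + held.result);
    }
}
```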
Enough preamble; on to the real content. I recommend cloning the complete code from GitHub, since this post covers only the key parts.
The registry persists entries keyed by API interface name, with IP:port as the value, for consumers to query.
Taking ZooKeeper as an example:
To keep service registration and discovery flexible, we add a registry-center adapter:
```java
public abstract class ServiceCenterAdapter implements ServiceCenter {

    String host;
    int port = 0;
    String passWord;

    ServiceCenterAdapter() {}

    ServiceCenterAdapter(String host) {
        this.host = host;
    }

    ServiceCenterAdapter(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public String discover(String serviceName) {
        return null;
    }

    @Override
    public void register(String serviceName, String serviceAddress) {}

    @Override
    public void setHost(String host) {
        this.host = host;
    }

    @Override
    public void setPort(int port) {
        this.port = port;
    }

    @Override
    public void setPassWord(String passWord) {
        this.passWord = passWord;
    }

    // get "IP:port"
    @Override
    public String getAddress() {
        if (host == null || "".equals(host) || port == 0) {
            throw new RuntimeException("the zookeeper host or port error");
        }
        return host + ":" + port;
    }
}
```
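The `ServiceCenter` interface that the adapter implements is not shown in this post. Judging from the methods the adapter overrides, it presumably looks like the sketch below (the exact declaration is an assumption); a tiny in-memory implementation is included to show the register/discover contract without a running ZooKeeper:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class Main {

    // Presumed shape of ServiceCenter, inferred from the adapter's overrides.
    interface ServiceCenter {
        void register(String serviceName, String serviceAddress);
        String discover(String serviceName);
        void setHost(String host);
        void setPort(int port);
        void setPassWord(String passWord);
        String getAddress();
    }

    // Minimal in-memory stand-in: handy for local tests without ZooKeeper.
    static class InMemoryServiceCenter implements ServiceCenter {
        private final Map<String, String> table = new ConcurrentHashMap<>();
        private String host;
        private int port;

        public void register(String name, String address) { table.put(name, address); }
        public String discover(String name) { return table.get(name); }
        public void setHost(String host) { this.host = host; }
        public void setPort(int port) { this.port = port; }
        public void setPassWord(String passWord) { /* no auth needed in memory */ }
        public String getAddress() { return host + ":" + port; }
    }

    public static void main(String[] args) {
        ServiceCenter center = new InMemoryServiceCenter();
        center.register("com.demo.HelloService", "127.0.0.1:8000"); // hypothetical names
        System.out.println(center.discover("com.demo.HelloService"));
    }
}
```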
Service registration with ZooKeeper (used by the provider):
In a real project you construct this class, inject the appropriate IP and port, and finally register it as a bean in the IoC container.
```java
public class ZooKeeperServiceRegistry extends ServiceCenterAdapter {

    private static final Logger log = LoggerFactory.getLogger(ZooKeeperServiceRegistry.class);

    private ZkClient zkClient;

    {
        // default ZooKeeper port, unless a constructor already supplied one
        // (instance initializers run after the super constructor)
        if (this.port == 0) {
            this.port = 2181;
        }
        zkClient = new ZkClient(getAddress(), CoolConstant.ZK_SESSION_TIMEOUT, CoolConstant.ZK_CONNECTION_TIMEOUT);
        log.info("connect zookeeper");
    }

    public ZooKeeperServiceRegistry(String zkHost) {
        super(zkHost);
    }

    public ZooKeeperServiceRegistry(String zkHost, int zkPort) {
        super(zkHost, zkPort);
    }

    // register a service: serviceName = interface name, serviceAddress = IP:port
    @Override
    public void register(String serviceName, String serviceAddress) {
        // create the root registry node (persistent)
        String registryPath = CoolConstant.ZK_REGISTRY_PATH;
        if (!zkClient.exists(registryPath)) {
            zkClient.createPersistent(registryPath);
            log.info("create registry node: {}", registryPath);
        }
        // create the service node (persistent)
        String servicePath = registryPath + "/" + serviceName;
        if (!zkClient.exists(servicePath)) {
            zkClient.createPersistent(servicePath);
            log.info("create service node: {}", servicePath);
        }
        // create the address node (ephemeral, sequential)
        String addressPath = servicePath + "/address-";
        String addressNode = zkClient.createEphemeralSequential(addressPath, serviceAddress);
        log.info("create address node: {}", addressNode);
    }
}
```
Service discovery with ZooKeeper (used by the consumer):
As above, configure the appropriate IP and port and register it as a bean in the project's IoC container.
```java
public class ZooKeeperServiceDiscovery extends ServiceCenterAdapter {

    private static final Logger log = LoggerFactory.getLogger(ZooKeeperServiceDiscovery.class);

    {
        // default ZooKeeper port, unless a constructor already supplied one
        if (super.port == 0) {
            super.port = 2181;
        }
    }

    public ZooKeeperServiceDiscovery() {}

    public ZooKeeperServiceDiscovery(String zkHost) {
        super(zkHost);
    }

    public ZooKeeperServiceDiscovery(String zkHost, int zkPort) {
        super(zkHost, zkPort);
    }

    // service discovery: name = API interface name
    @Override
    public String discover(String name) {
        ZkClient zkClient = new ZkClient(getAddress(), CoolConstant.ZK_SESSION_TIMEOUT, CoolConstant.ZK_CONNECTION_TIMEOUT);
        log.debug("connect zookeeper");
        try {
            String servicePath = CoolConstant.ZK_REGISTRY_PATH + "/" + name;
            if (!zkClient.exists(servicePath)) {
                throw new RuntimeException(String.format("can not find any service node on path: %s", servicePath));
            }
            List<String> addressList = zkClient.getChildren(servicePath);
            if (addressList.size() == 0) {
                throw new RuntimeException(String.format("can not find any address node on path: %s", servicePath));
            }
            String address;
            int size = addressList.size();
            if (size == 1) {
                address = addressList.get(0);
                log.debug("get only address node: {}", address);
            } else {
                // simple random load balancing across provider addresses
                address = addressList.get(ThreadLocalRandom.current().nextInt(size));
                log.debug("get random address node: {}", address);
            }
            String addressPath = servicePath + "/" + address;
            return zkClient.readData(addressPath);
        } finally {
            zkClient.close();
        }
    }
}
```
All of the TCP plumbing in this post (codecs and handlers) is written with Netty.
The server-side Netty bootstrap class:
```java
public class CoolRpcServer implements ApplicationContextAware {

    private static Logger log = LoggerFactory.getLogger(CoolRpcServer.class);

    private Channel channel;
    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    private ServerBootstrap bootstrap;
    private HandlerInitializer handlerInitializer;
    private ServiceCenter serviceRegistry;
    private String serviceIP;
    private int port;

    public static Map<String, Object> servicesMap;

    {
        bossGroup = new NioEventLoopGroup(1);
        workerGroup = new NioEventLoopGroup();
        bootstrap = new ServerBootstrap();
        handlerInitializer = new HandlerInitializer();
        servicesMap = new HashMap<>(16);
    }

    public CoolRpcServer(ServiceCenter serviceRegistry, String serviceIP, int port) {
        this.serviceRegistry = serviceRegistry;
        this.serviceIP = serviceIP;
        this.port = port;
    }

    /**
     * init and start the TCP server once the IoC container has booted
     */
    @SuppressWarnings("unchecked")
    public void initServer() throws InterruptedException {
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(handlerInitializer);
        bootstrap.option(ChannelOption.SO_BACKLOG, 128);
        bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
        // max send buffer (256 KB)
        bootstrap.childOption(ChannelOption.SO_SNDBUF, 1024 * 256);
        // max receive buffer (2 MB)
        bootstrap.childOption(ChannelOption.SO_RCVBUF, 1024 * 1024 * 2);
        channel = bootstrap.bind(serviceIP, port).sync().channel();
        if (servicesMap != null && servicesMap.size() > 0) {
            for (String beanName : servicesMap.keySet()) {
                serviceRegistry.register(beanName, serviceIP + ":" + port);
                log.info("register service name = {}", beanName);
            }
        }
        log.info("TCP server started successfully, port: {}", port);
        channel.closeFuture().sync();
    }

    /**
     * stop the TCP server
     */
    public void stopServer() {
        if (channel != null && channel.isActive()) {
            channel.close();
        }
        if (bossGroup != null) {
            bossGroup.shutdownGracefully();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
        log.info("TCP server stopped successfully, port: {}", port);
    }

    /**
     * scan beans annotated with @CoolService
     */
    @Override
    public void setApplicationContext(ApplicationContext ctx) throws BeansException {
        Map<String, Object> beans = ctx.getBeansWithAnnotation(CoolService.class);
        if (beans != null && beans.size() > 0) {
            for (Object bean : beans.values()) {
                String name = bean.getClass().getAnnotation(CoolService.class).value().getName();
                servicesMap.put(name, bean);
            }
        }
    }
}
```
Implementation classes of the open API interfaces must be marked with the @CoolService annotation. When the server-side container starts, it scans every implementation class carrying this annotation and registers it with the registry.
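The @CoolService annotation itself is not listed in this post. From the scanner's call `bean.getClass().getAnnotation(CoolService.class).value().getName()`, its `value()` element must return the open API interface class, so it presumably looks like this sketch (the declaration and the `HelloService` example are assumptions):

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

public class Main {

    // Presumed declaration: runtime-retained, applied to implementation
    // classes, carrying the open API interface as its value.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface CoolService {
        Class<?> value();
    }

    interface HelloService {            // hypothetical open API interface
        String hello(String name);
    }

    @CoolService(HelloService.class)
    static class HelloServiceImpl implements HelloService {
        public String hello(String name) { return "hello " + name; }
    }

    public static void main(String[] args) {
        // the same lookup the server performs while scanning beans:
        // the interface name becomes the key in servicesMap
        String key = HelloServiceImpl.class.getAnnotation(CoolService.class).value().getName();
        System.out.println(key);
    }
}
```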
Server-side handler (Netty handler):
```java
@ChannelHandler.Sharable
public class CoolServerHandler extends ChannelInboundHandlerAdapter {

    private static Logger log = LoggerFactory.getLogger(CoolServerHandler.class);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        CoolResponse response = new CoolResponse();
        CoolRequest request = (CoolRequest) msg;
        try {
            Object result = invoke(request);
            response.setRequestID(request.getRequestID());
            response.setResult(result);
        } catch (Throwable error) {
            response.setError(error);
        }
        // write the response back and close the connection once flushed
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }

    private Object invoke(CoolRequest request) throws Throwable {
        if (request == null) {
            throw new Throwable("cool rpc request not found");
        }
        String className = request.getClassName();
        String methodName = request.getMethodName();
        Object[] parameters = request.getParameters();
        Object service = CoolRpcServer.servicesMap.get(className);
        if (service == null) {
            throw new Throwable("cool rpc service not exist");
        }
        Class<?> serviceClass = service.getClass();
        Class<?>[] parameterTypes = request.getParameterTypes();
        // dispatch via CGLIB FastClass instead of java.lang.reflect
        FastClass fastClass = FastClass.create(serviceClass);
        FastMethod fastMethod = fastClass.getMethod(methodName, parameterTypes);
        return fastMethod.invoke(service, parameters);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("server caught exception", cause);
        ctx.close();
    }
}
```
The request data sent over by the client (class name, method, parameters) is dispatched locally through CGLIB-based reflection.
Once the invocation succeeds, the result is encoded, returned to the client, and the TCP connection is closed.
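For readers who have not used CGLIB: the handler's `invoke()` is equivalent to the plain `java.lang.reflect` version below; `FastClass` simply avoids the per-call `Method` lookup by generating index-based dispatch code, which is why it is the faster choice here. The `HelloService` types are hypothetical stand-ins:

```java
import java.lang.reflect.Method;

public class Main {

    interface HelloService {            // hypothetical open API interface
        String hello(String name);
    }

    public static class HelloServiceImpl implements HelloService {
        public String hello(String name) { return "hello " + name; }
    }

    // Same contract as the handler's invoke(), but with plain reflection
    // instead of CGLIB's FastClass/FastMethod.
    static Object invoke(Object service, String methodName,
                         Class<?>[] parameterTypes, Object[] parameters) throws Exception {
        Method method = service.getClass().getMethod(methodName, parameterTypes);
        return method.invoke(service, parameters);
    }

    public static void main(String[] args) throws Exception {
        // fields a decoded CoolRequest would carry
        Object result = invoke(new HelloServiceImpl(), "hello",
                new Class<?>[]{String.class}, new Object[]{"rpc"});
        System.out.println(result);
    }
}
```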
The consumer holds only the API interfaces, not their implementations, so we use a Java dynamic proxy to supply the method bodies ourselves: the proxied method establishes a TCP connection, has the provider execute the method, and returns the result to the proxy class, creating the illusion that an implementation can be invoked through nothing but an interface.
Step 1: create the proxy object with a Java dynamic proxy.
```java
public class CoolProxy {

    private static Logger log = LoggerFactory.getLogger(CoolProxy.class);

    private ServiceCenter serviceDiscovery;

    public CoolProxy(ServiceCenter serviceDiscovery) {
        this.serviceDiscovery = serviceDiscovery;
    }

    @SuppressWarnings("unchecked")
    public <T> T getInstance(Class<T> cls) {
        return (T) Proxy.newProxyInstance(cls.getClassLoader(), new Class<?>[]{cls},
                (proxy, method, args) -> {
                    CoolRequest request = new CoolRequest();
                    request.setRequestID(UUID.randomUUID().toString());
                    request.setClassName(method.getDeclaringClass().getName());
                    request.setMethodName(method.getName());
                    request.setParameters(args);
                    request.setParameterTypes(method.getParameterTypes());
                    // resolve a provider address ("IP:port") through the registry
                    String[] addr = serviceDiscovery.discover(cls.getName()).split(":", 2);
                    CoolRpcClient client = new CoolRpcClient(addr[0], Integer.parseInt(addr[1]));
                    CoolResponse response = client.send(request);
                    if (response.getError() != null) {
                        throw response.getError();
                    } else {
                        return response.getResult();
                    }
                });
    }
}
```
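To see the proxy pattern in isolation, here is a runnable sketch that uses the same `Proxy.newProxyInstance` shape as `CoolProxy`, with the discovery lookup and TCP round trip replaced by a local answer (the `HelloService` interface and the returned value are assumptions made for the demo):

```java
import java.lang.reflect.Proxy;

public class Main {

    interface HelloService {            // hypothetical open API interface
        String hello(String name);
    }

    // Same pattern as CoolProxy.getInstance, minus the network: in Cool-Rpc
    // the lambda body would build a CoolRequest, discover a provider address,
    // send the request over Netty, and unwrap the CoolResponse.
    @SuppressWarnings("unchecked")
    static <T> T getInstance(Class<T> cls) {
        return (T) Proxy.newProxyInstance(cls.getClassLoader(), new Class<?>[]{cls},
                (proxy, method, args) -> {
                    // stand-in for the remote invocation of method.getName()
                    return "hello " + args[0];
                });
    }

    public static void main(String[] args) {
        // the caller codes against nothing but the interface
        HelloService service = getInstance(HelloService.class);
        System.out.println(service.hello("rpc"));
    }
}
```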
Step 2: perform the remote procedure call inside the proxied method.
The client bootstrap class:
```java
public class CoolRpcClient {

    private static Logger log = LoggerFactory.getLogger(CoolRpcClient.class);

    private CountDownLatch countDownLatch;
    private EventLoopGroup group;
    private Bootstrap bootstrap;
    private CoolResponse response;
    private String serviceIP;
    private int port;

    {
        response = new CoolResponse();
        countDownLatch = new CountDownLatch(1);
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
    }

    public CoolRpcClient(String serviceIP, int port) {
        this.serviceIP = serviceIP;
        this.port = port;
    }

    public CoolResponse send(CoolRequest request) {
        try {
            bootstrap.group(group);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline()
                            .addLast(new CoolRpcDecoder(CoolResponse.class))
                            .addLast(new CoolRpcEncoder(CoolRequest.class))
                            .addLast(new CoolClientHandler(countDownLatch, response));
                }
            });
            bootstrap.option(ChannelOption.TCP_NODELAY, true);
            Channel channel = bootstrap.connect(serviceIP, port).sync().channel();
            channel.writeAndFlush(request).sync();
            // block until the handler has received the full response
            countDownLatch.await();
            channel.closeFuture().sync();
            return response;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        } finally {
            group.shutdownGracefully();
        }
    }
}
```
Client-side handler:
```java
@ChannelHandler.Sharable
public class CoolClientHandler extends ChannelInboundHandlerAdapter {

    private static Logger log = LoggerFactory.getLogger(CoolClientHandler.class);

    private CountDownLatch latch;
    private CoolResponse response;

    public CoolClientHandler(CountDownLatch latch, CoolResponse response) {
        this.latch = latch;
        this.response = response;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // copy the decoded response into the instance the caller is holding
        CoolResponse enResponse = (CoolResponse) msg;
        this.response.sync(enResponse);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        latch.countDown();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("api caught exception", cause);
        ctx.close();
    }
}
```
Finally, a CountDownLatch is used to notify the caller synchronously that the RPC call has completed.
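The synchronization shape can be shown in miniature: the calling thread blocks on `await()` while the Netty I/O thread fills in the shared response and calls `countDown()`, exactly the handoff between `CoolRpcClient.send()` and `CoolClientHandler` above (the `StringBuilder` stands in for the shared `CoolResponse`):

```java
import java.util.concurrent.CountDownLatch;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        StringBuilder response = new StringBuilder(); // stand-in for the shared CoolResponse

        Thread ioThread = new Thread(() -> {
            response.append("pong"); // stand-in for response.sync(enResponse) in channelRead
            latch.countDown();       // stand-in for channelReadComplete
        });
        ioThread.start();

        latch.await();               // caller blocks until the response has arrived;
                                     // countDown() happens-before await() returning,
                                     // so the read below is safe
        System.out.println(response);
    }
}
```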
Closing Words
That wraps up this quick tour of Cool-Rpc. If you have better ideas, please get in touch.
Everyone is warmly invited to help maintain the project: Cool-RPC
When reposting, please credit the original article: https://www.ucloud.cn/yun/76893.html