摘要:目前支持多种注册中心。本编文章是分析使用作为注册中心,如何整合进行服务注册和订阅服务。
目前dubbo支持多种注册中心:Zookeeper、Redis、Simple、Multicast、Etcd3。
本编文章是分析使用Zookeeper作为注册中心,dubbo如何整合Zookeeper进行服务注册和订阅服务。
首先dubbo将服务注册到Zookeeper后,目录结构如下所示:(注册接口名:com.bob.dubbo.service.CityDubboService)
在consumer和provider服务启动的时候,去把自身URL格式化成字符串,然后注册到zookeeper相应节点下,作为临时节点,断开连接后,节点删除;consumer启动时,不仅会订阅服务,同时也会将自己的URL注册到zookeeper中;
ZookeeperRegistryZookeeperRegistry:dubbo与zookeeper交互主要的类,已下结合源码进行分析,先来看
doSubcribe()
这个方法主要是用于订阅服务,添加监听器,动态监听提供者列表变化:
@Override public void doSubscribe(final URL url, final NotifyListener listener) { try { // 处理所有service层发起的订阅,例如监控中心的订阅 if (Constants.ANY_VALUE.equals(url.getServiceInterface())) { String root = toRootPath(); ConcurrentMaplisteners = zkListeners.get(url); if (listeners == null) { zkListeners.putIfAbsent(url, new ConcurrentHashMap<>()); listeners = zkListeners.get(url); } ChildListener zkListener = listeners.get(listener); if (zkListener == null) { listeners.putIfAbsent(listener, (parentPath, currentChilds) -> { for (String child : currentChilds) { child = URL.decode(child); if (!anyServices.contains(child)) { anyServices.add(child); subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child, Constants.CHECK_KEY, String.valueOf(false)), listener); } } }); zkListener = listeners.get(listener); } zkClient.create(root, false); List services = zkClient.addChildListener(root, zkListener); if (services != null && !services.isEmpty()) { for (String service : services) { service = URL.decode(service); anyServices.add(service); subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service, Constants.CHECK_KEY, String.valueOf(false)), listener); } } // 处理指定service层发起的订阅,例如服务消费者的订阅 } else { List urls = new ArrayList<>(); // 循环分类数组 , router, configurator, provider for (String path : toCategoriesPath(url)) { // 获得 url 对应的监听器集合 ConcurrentMap listeners = zkListeners.get(url); if (listeners == null) {// 不存在,进行创建 zkListeners.putIfAbsent(url, new ConcurrentHashMap<>()); listeners = zkListeners.get(url); } // 获得 ChildListener 对象 ChildListener zkListener = listeners.get(listener); if (zkListener == null) {// 不存在子目录的监听器,进行创建 ChildListener 对象 // 订阅父级目录, 当有子节点发生变化时,触发此回调函数,回调listener中的notify()方法 listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds))); zkListener = listeners.get(listener); } 创建Type节点,此节点为持久节点 zkClient.create(path, false); // 向 Zookeeper ,PATH 节点,发起订阅,返回此节点下的所有子元素 path : /根节点/接口全名/providers, 比如 : /dubbo/com.bob.service.CityService/providers List children = zkClient.addChildListener(path, zkListener); if (children != null) { urls.addAll(toUrlsWithEmpty(url, path, children)); } } // 首次全量数据获取完成时,调用 `#notify(...)` 方法,回调 NotifyListener, 在这一步从连接Provider,实例化Invoker notify(url, listener, urls); } } catch (Throwable e) { throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
register()
ZookeeperRegistry父类FailbackRegistry中的方法,用于将服务注册到zookeeper,具体代码如下:
@Override public void register(URL url) { // 调用父类AbstractRegistry中的register()方法,将url存储到注册集合中 super.register(url); // 如果之前这个url注册失败,则会从注册失败集合中删除 removeFailedRegistered(url); removeFailedUnregistered(url); try { // 像注册中心发送注册请求 doRegister(url); } catch (Exception e) { Throwable t = e; // If the startup detection is opened, the Exception is thrown directly. boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true) && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol()); boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if (skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t); } // 将url存入注册失败集合中,进行重试try() addFailedRegistered(url); } }
doRegister()
ZookeeperRegistry类中的方法
@Override public void doRegister(URL url) { try { // 通过zookeeper客户端向注册中心发送服务注册请求,在zookeeper下创建服务对应的节点 zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
在介绍注册registry()方法的时候,解析到了FailbackRegistry类,接下来咱们来分析一下这个类的作用:
FailbackRegistry这个类是ZookeeperRegistry的父类,通过分析该类的结构,主要是用于服务的注册、订阅、重试,而服务具体的注册、订阅又在ZookeeperRegistry子类进行了实现,现在我们来分析重试这个功能,服务暴露和订阅的配置文件中一般会设置重试这个属性,如下所示:
上面是一个服务暴露的示例,设置了retries属性,表示重试的次数。接下来咱们就以注册重试进行分析(服务订阅是同样的原理):在注册registry()方法中(代码上面已提供),在异常catch{}代码块中有一个addFailedRegistered(url)方法,这个就是将注册失败的url添加到集合中,并创建一个重试的任务FailedRegisteredTask(url, this),代码如下:
private void addFailedRegistered(URL url) { // 先从集合中获取,如果存在,直接返回 FailedRegisteredTask oldOne = failedRegistered.get(url); if (oldOne != null) { return; } // 本地集合不存在,则创建重试定时任务,默认每隔5s执行 FailedRegisteredTask newTask = new FailedRegisteredTask(url, this); oldOne = failedRegistered.putIfAbsent(url, newTask); if (oldOne == null) { // 将定时任务放置在HashedWheelTimer这个处理定时任务的容器,(HashedWheelTimer执行原理,可以自行查找资料,这里就不介绍) retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS); } }
咱们下来看FailedRegisteredTask这个定时任务,有哪些东西,FailedRegisteredTask是AbstractRetryTask的子类,在执行new FailedRegisteredTask(url, this)代码时,其实调用的是父类构造函数,其中retryTimes表示重试的次数,在没有配置的情况下,默认重试三次:
AbstractRetryTask(URL url, FailbackRegistry registry, String taskName) { if (url == null || StringUtils.isBlank(taskName)) { throw new IllegalArgumentException(); } this.url = url; this.registry = registry; this.taskName = taskName; cancel = false; this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD); // 重试次数,默认情况下重试三次 this.retryTimes = url.getParameter(Constants.REGISTRY_RETRY_TIMES_KEY, Constants.DEFAULT_REGISTRY_RETRY_TIMES); }
在AbstractRetryTask类中有一个run()方法,在run()方法会根据XML配置文件中的retries属性值进行比较来进行重试,如果没有达到重试次数,则会调用doRetry(url, registry, timeout),而这个方法又在子类具体实现,这里我以注册FailedRegisteredTask举例:
@Override public void run(Timeout timeout) throws Exception { if (timeout.isCancelled() || timeout.timer().isStop() || isCancel()) { // other thread cancel this timeout or stop the timer. return; } // 重试次数与设置的retries进行比较,超过则不在进行重试 if (times > retryTimes) { // reach the most times of retry. logger.warn("Final failed to execute task " + taskName + ", url: " + url + ", retry " + retryTimes + " times."); return; } if (logger.isInfoEnabled()) { logger.info(taskName + " : " + url); } try { // 调用子类实现,进行重试 doRetry(url, registry, timeout); } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry logger.warn("Failed to execute task " + taskName + ", url: " + url + ", waiting for again, cause:" + t.getMessage(), t); // reput this task when catch exception. reput(timeout, retryPeriod); } }
在子类FailedRegisteredTask中doRetry()方法具体实现:
public final class FailedRegisteredTask extends AbstractRetryTask { private static final String NAME = "retry register"; public FailedRegisteredTask(URL url, FailbackRegistry registry) { super(url, registry, NAME); } @Override protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) { // 调用ZookeeperRegistry类中的doRegister()方法进行注册 registry.doRegister(url); registry.removeFailedRegisteredTask(url); } }
分析到这里,有个疑问:重试任务已经封装了,任务什么时候去执行,怎么执行的?其实在上面咱们就分析到过,就是使用了HashedWheelTimer,这个类是在ZookeeperRegistry类初始化的时候就会去初始化:
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { // 这个地方进行初始化的:初始化父类FailbackRegistry super(url); if (url.isAnyHost()) { throw new IllegalStateException("registry address == null"); } String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT); if (!group.startsWith(Constants.PATH_SEPARATOR)) { group = Constants.PATH_SEPARATOR + group; } this.root = group; zkClient = zookeeperTransporter.connect(url); zkClient.addStateListener(state -> { if (state == StateListener.RECONNECTED) { try { recover(); } catch (Exception e) { logger.error(e.getMessage(), e); } } }); }
public FailbackRegistry(URL url) { super(url); this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD); // 创建HashedWheelTimer对象 retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128); }
然后在addFailedRegistered()方法中有retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS);这样的一条代码,这个就是执行任务的开始点:
@Override public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { if (task == null) { throw new NullPointerException("task"); } if (unit == null) { throw new NullPointerException("unit"); } long pendingTimeoutsCount = pendingTimeouts.incrementAndGet(); if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) { pendingTimeouts.decrementAndGet(); throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending " + "timeouts (" + maxPendingTimeouts + ")"); } // 开启轮询任务 start(); // Add the timeout to the timeout queue which will be processed on the next tick. // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket. long deadline = System.nanoTime() + unit.toNanos(delay) - startTime; // Guard against overflow. if (delay > 0 && deadline < 0) { deadline = Long.MAX_VALUE; } HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline); timeouts.add(timeout); return timeout; }
调用start()方法时,开启一个线程work去轮询存储到HashedWheelTimer容器的任务,然后调用任务中的run()方法,
public void start() { switch (WORKER_STATE_UPDATER.get(this)) { case WORKER_STATE_INIT: if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) { // 开启work线程,执行work线程中的run()方法 workerThread.start(); } break; case WORKER_STATE_STARTED: break; case WORKER_STATE_SHUTDOWN: throw new IllegalStateException("cannot be started once stopped"); default: throw new Error("Invalid WorkerState"); } // Wait until the startTime is initialized by the worker. while (startTime == 0) { try { startTimeInitialized.await(); } catch (InterruptedException ignore) { // Ignore - it will be ready very soon. } } }
@Override public void run() { // Initialize the startTime. startTime = System.nanoTime(); if (startTime == 0) { // We use 0 as an indicator for the uninitialized value here, so make sure it"s not 0 when initialized. startTime = 1; } // Notify the other threads waiting for the initialization at start(). startTimeInitialized.countDown(); do { final long deadline = waitForNextTick(); if (deadline > 0) { int idx = (int) (tick & mask); processCancelledTasks(); HashedWheelBucket bucket = wheel[idx]; transferTimeoutsToBuckets(); // 执行重试任务 bucket.expireTimeouts(deadline); tick++; } } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED); // Fill the unprocessedTimeouts so we can return them from stop() method. for (HashedWheelBucket bucket : wheel) { bucket.clearTimeouts(unprocessedTimeouts); } for (; ; ) { HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { break; } if (!timeout.isCancelled()) { unprocessedTimeouts.add(timeout); } } processCancelledTasks(); }
void expireTimeouts(long deadline) { HashedWheelTimeout timeout = head; // process all timeouts while (timeout != null) { // 轮询获取重试任务 HashedWheelTimeout next = timeout.next; if (timeout.remainingRounds <= 0) { next = remove(timeout); if (timeout.deadline <= deadline) { // 执行重试任务 timeout.expire(); } else { // The timeout was placed into a wrong slot. This should never happen. throw new IllegalStateException(String.format( "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline)); } } else if (timeout.isCancelled()) { next = remove(timeout); } else { timeout.remainingRounds--; } timeout = next; } }
public void expire() { if (!compareAndSetState(ST_INIT, ST_EXPIRED)) { return; } try { // 调用任务中的run()方法,(如:AbstractRetryTask任务中的run()方法,在去调用子类FailedRegisteredTask中的doRetry()方法进行重试注册) task.run(this); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + ".", t); } } }
在上面对于HashedWheelTimer的具体实现原理,并没有进行详细的进行分析,如果想了解的和学习的话,可以自行查找资料。
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/73033.html
摘要:是目前非常流行的分布式服务技术,很多公司都在用。空闲之余,搭了个,分享给大家。本文下载下载是服务的注册中心,下载后进入到安装目录。双击即可启动注册中心服务。 dubbo是目前非常流行的分布式服务技术,很多公司都在用。空闲之余,搭了个helloworld,分享给大家。本文demo下载1.下载 zookeeperzookeeper是服务的注册中心,下载后进入到安装目录G:bakCenter...
摘要:启动容器,加载,运行服务提供者。服务提供者在启动时,在注册中心发布注册自己提供的服务。注册中心返回服务提供者地址列表给消费者,如果有变更,注册中心将基于长连接推送变更数据给消费者。 一 为什么需要 dubbo 很多时候,其实我们使用这个技术的时候,可能都是因为项目需要,所以,我们就用了,但是,至于为什么我们需要用到这个技术,可能自身并不是很了解的,但是,其实了解技术的来由及背景知识,对...
Github 地址:https://github.com/Snailclimb/springboot-integration-examples ,欢迎各位 Star。 目录: 使用 SpringBoot+Dubbo 搭建一个简单分布式服务 实战之前,先来看几个重要的概念 什么是分布式? 什么是 Duboo? Dubbo 架构 什么是 RPC? 为什么要用 Dubbo? 开始实战 1 ...
摘要:构建服务接口创建一个简单的项目,并在下面定义一个抽象接口,比如构建服务接口提供方第一步创建一个项目,在中引入第一步中构建的包以及对和的依赖,比如第一步中构建的包这里需要注意两点必须包含包,不然启动会报错。 很早以前,在刚开始搞Spring Cloud基础教程的时候,写过这样一篇文章:《微服务架构的基础框架选择:Spring Cloud还是Dubbo?》,可能不少读者也都看过。之后也就一...
摘要:在微服务架构中,注册中心是核心的基础服务之一。在微服务架构流行之前,注册中心就已经开始出现在分布式架构的系统中。服务提供者注册到注册中心,服务消费者到注册中心订阅,同时,注册中心中的变更也会通知服务消费者。 在微服务架构中,注册中心是核心的基础服务之一。在微服务架构流行之前,注册中心就已经开始出现在分布式架构的系统中。Dubbo是一个在国内比较流行的分布式框架,被大量的中小型互联网公司...
阅读 2518·2019-08-30 15:53
阅读 2850·2019-08-29 16:20
阅读 1042·2019-08-29 15:10
阅读 1001·2019-08-26 10:58
阅读 2166·2019-08-26 10:49
阅读 601·2019-08-26 10:21
阅读 681·2019-08-23 18:30
阅读 1616·2019-08-23 15:58