摘要:每个消息都会被一个线程消费,同时最大并发量为。然后提交一个任务到线程池中,这个任务的内容是从等待队列中取出一个,如果等待队列为空,则删除这个等待队列的。小结本文分析了的久经生产考验的核心组件线程池。
本文首发于泊浮目的专栏:https://segmentfault.com/blog...前言
在ZStack中,最基本的执行单位不仅仅是一个函数,也可以是一个任务(Task。其本质实现了Java的Callable接口)。通过大小合理的线程池调度来并行的消费这些任务,使ZStack这个Iaas软件有条不紊运行在大型的数据中心里。
对线程池不太了解的同学可以先看我的一篇博客:Java多线程笔记(三):线程池演示代码
在这里,将以ZStack中ThreadFacade最常用的方法为例进行演示。
syncSubmit提交同步任务,线程将会等结果完成后才继续下一个任务。
这里先参考ZStack中ApiMediatorImpl ,其中有一段用于API消息调度的逻辑。
@Override public void handleMessage(final Message msg) { thdf.syncSubmit(new SyncTask
每个API消息都会被一个线程消费,同时最大并发量为5(apiWorkerNum=5)。每个线程都会等着API消息的回复,等到回复后便给用户。
chainSubmit提交异步任务,这里的任务执行后将会执行队列中的下一个任务,不会等待结果。
参考VmInstanceBase关于虚拟机启动、重启、暂停相关的代码:
//暂停虚拟机 protected void handle(final APIStopVmInstanceMsg msg) { thdf.chainSubmit(new ChainTask(msg) { @Override public String getName() { return String.format("stop-vm-%s", self.getUuid()); } @Override public String getSyncSignature() { return syncThreadName; } @Override public void run(SyncTaskChain chain) { stopVm(msg, chain); } }); } //重启虚拟机 protected void handle(final APIRebootVmInstanceMsg msg) { thdf.chainSubmit(new ChainTask(msg) { @Override public String getName() { return String.format("reboot-vm-%s", self.getUuid()); } @Override public String getSyncSignature() { return syncThreadName; } @Override public void run(SyncTaskChain chain) { rebootVm(msg, chain); } }); } //启动虚拟机 protected void handle(final APIStartVmInstanceMsg msg) { thdf.chainSubmit(new ChainTask(msg) { @Override public String getName() { return String.format("start-vm-%s", self.getUuid()); } @Override public String getSyncSignature() { return syncThreadName; } @Override public void run(SyncTaskChain chain) { startVm(msg, chain); } }); }通用特性
getSyncSignature则指定了其队列的key,这个任务队列本质一个Map。根据相同的k,将任务作为v按照顺序放入map执行。单从这里的业务逻辑来看,可以有效避免虚拟机的状态混乱。
chainTask的默认并发度为1,这意味着它是同步的。在稍后的源码解析中我们将会看到。它的实现
先从接口ThreadFacade了解一下方法签名:
public interface ThreadFacade extends Component {Future submit(Task task);//提交一个任务 Future syncSubmit(SyncTask task); //提交一个有返回值的任务 Future chainSubmit(ChainTask task); //提交一个没有返回值的任务 Future submitPeriodicTask(PeriodicTask task, long delay); //提交一个周期性任务,将在一定时间后执行 Future submitPeriodicTask(PeriodicTask task); //提交一个周期性任务 Future submitCancelablePeriodicTask(CancelablePeriodicTask task); //提交一个可以取消的周期性任务 Future submitCancelablePeriodicTask(CancelablePeriodicTask task, long delay); //提交一个可以取消的周期性任务,将在一定时间后执行 void registerHook(ThreadAroundHook hook); //注册钩子 void unregisterHook(ThreadAroundHook hook); //取消钩子 ThreadFacadeImpl.TimeoutTaskReceipt submitTimeoutTask(Runnable task, TimeUnit unit, long delay); //提交一个过了一定时间就算超时的任务 void submitTimerTask(TimerTask task, TimeUnit unit, long delay); //提交一个timer任务 }
以及几个方法逻辑实现类DispatchQueueImpl中的几个成员变量。
private static final CLogger logger = Utils.getLogger(DispatchQueueImpl.class); @Autowired ThreadFacade _threadFacade; private final HashMapsyncTasks = new HashMap (); private final HashMap chainTasks = new HashMap (); private static final CLogger _logger = CLoggerImpl.getLogger(DispatchQueueImpl.class); public static final String DUMP_TASK_DEBUG_SINGAL = "DumpTaskQueue";
关键就是syncTasks(同步队列)和chainTasks(异步队列) ,用于存储两种类型的任务队列。
因此当我们提交chainTask时,要注意记得显示的调用next方法,避免后面的任务调度不到。
接着,我们从最常用的几个方法开始看它的代码。
chainSubmit方法从ThreadFacadeImpl作为入口
@Override public FuturechainSubmit(ChainTask task) { return dpq.chainSubmit(task); }
DispatchQueue中的逻辑
//公有方法,即入口之一 @Override public FuturechainSubmit(ChainTask task) { return doChainSyncSubmit(task); }
//内部逻辑 privateFuture doChainSyncSubmit(final ChainTask task) { assert task.getSyncSignature() != null : "How can you submit a chain task without sync signature ???"; DebugUtils.Assert(task.getSyncLevel() >= 1, String.format("getSyncLevel() must return 1 at least ")); synchronized (chainTasks) { final String signature = task.getSyncSignature(); ChainTaskQueueWrapper wrapper = chainTasks.get(signature); if (wrapper == null) { wrapper = new ChainTaskQueueWrapper(); chainTasks.put(signature, wrapper); } ChainFuture cf = new ChainFuture(task); wrapper.addTask(cf); wrapper.startThreadIfNeeded(); return cf; } }
这段逻辑大致为:
断言syncSignature不为空,并且必须并行度必须大于等于1。因为1会被做成队列,由一个线程完成这些任务。而1以上则指定了可以有几个线程来完成同一个signature的任务。
加锁HashMap
接下来就是startThreadIfNeeded。所谓ifNeeded就是指给这个队列的线程数尚有空余。然后提交一个任务到线程池中,这个任务的内容是:从等待队列中取出一个Feture,如果等待队列为空,则删除这个等待队列的Map。
private class ChainTaskQueueWrapper { LinkedList pendingQueue = new LinkedList(); final LinkedList runningQueue = new LinkedList(); AtomicInteger counter = new AtomicInteger(0); int maxThreadNum = -1; String syncSignature; void addTask(ChainFuture task) { pendingQueue.offer(task); if (maxThreadNum == -1) { maxThreadNum = task.getSyncLevel(); } if (syncSignature == null) { syncSignature = task.getSyncSignature(); } } void startThreadIfNeeded() { //如果运行线程数量已经大于等于限制,不start if (counter.get() >= maxThreadNum) { return; } counter.incrementAndGet(); _threadFacade.submit(new TasksyncSubmit方法() { @Override public String getName() { return "sync-chain-thread"; } // start a new thread every time to avoid stack overflow @AsyncThread private void runQueue() { ChainFuture cf; synchronized (chainTasks) { // remove from pending queue and add to running queue later cf = (ChainFuture) pendingQueue.poll(); if (cf == null) { if (counter.decrementAndGet() == 0) { //并且线程只有一个(跑完就没了),则将相关的signature队列移除,避免占用内存 chainTasks.remove(syncSignature); } //如果为空,则没有任务,返回 return; } } synchronized (runningQueue) { // add to running queue runningQueue.offer(cf); } //完成以后将任务挪出运行队列 cf.run(new SyncTaskChain() { @Override public void next() { synchronized (runningQueue) { runningQueue.remove(cf); } runQueue(); } }); } //这个方法将会被线程池调用,作为入口 @Override public Void call() throws Exception { runQueue(); return null; } }); } }
syncSubmit的内部逻辑与我们之前分析的chainSubmit极为相似,只是放入了不同的队列中。
同样,也是从ThreadFacadeImpl作为入口
@Override publicFuture syncSubmit(SyncTask task) { return dpq.syncSubmit(task); }
然后是DispatchQueue中的实现
@Override publicFuture syncSubmit(SyncTask task) { if (task.getSyncLevel() <= 0) { return _threadFacade.submit(task); } else { return doSyncSubmit(task); } }
内部逻辑-私有方法
privatesubmitPeriodicTaskFuture doSyncSubmit(final SyncTask syncTask) { assert syncTask.getSyncSignature() != null : "How can you submit a sync task without sync signature ???"; SyncTaskFuture f; synchronized (syncTasks) { SyncTaskQueueWrapper wrapper = syncTasks.get(syncTask.getSyncSignature()); if (wrapper == null) { wrapper = new SyncTaskQueueWrapper(); //放入syncTasks队列。 syncTasks.put(syncTask.getSyncSignature(), wrapper); } f = new SyncTaskFuture(syncTask); wrapper.addTask(f); wrapper.startThreadIfNeeded(); } return f; }
提交一个定时任务本质上是通过了线程池的scheduleAtFixedRate来实现。这个方法用于对任务进行周期性调度,任务调度的频率是一定的,它以上一个任务开始执行时间为起点,之后的period时间后调度下一次任务。如果任务的执行时间大于调度时间,那么任务就会在上一个任务结束后,立即被调用。
调用这个方法时将会把任务放入定时任务队列。当任务出现异常时,将会取消这个Futrue,并且挪出队列。
public FuturesubmitCancelablePeriodicTasksubmitPeriodicTask(final PeriodicTask task, long delay) { assert task.getInterval() != 0; assert task.getTimeUnit() != null; ScheduledFuture ret = (ScheduledFuture ) _pool.scheduleAtFixedRate(new Runnable() { public void run() { try { task.run(); } catch (Throwable e) { _logger.warn("An unhandled exception happened during executing periodic task: " + task.getName() + ", cancel it", e); final Map > periodicTasks = getPeriodicTasks(); final ScheduledFuture> ft = periodicTasks.get(task); if (ft != null) { ft.cancel(true); periodicTasks.remove(task); } else { _logger.warn("Not found feature for task " + task.getName() + ", the exception happened too soon, will try to cancel the task next time the exception happens"); } } } }, delay, task.getInterval(), task.getTimeUnit()); _periodicTasks.put(task, ret); return ret; }
而submitCancelablePeriodicTask则是会在执行时检测ScheduledFuture是否被要求cancel,如果有要求则取消。
@Override public Future初始化操作submitCancelablePeriodicTask(final CancelablePeriodicTask task, long delay) { ScheduledFuture ret = (ScheduledFuture ) _pool.scheduleAtFixedRate(new Runnable() { private void cancelTask() { ScheduledFuture> ft = cancelablePeriodicTasks.get(task); if (ft != null) { ft.cancel(true); cancelablePeriodicTasks.remove(task); } else { _logger.warn("cannot find feature for task " + task.getName() + ", the exception happened too soon, will try to cancel the task next time the exception happens"); } } public void run() { try { boolean cancel = task.run(); if (cancel) { cancelTask(); } } catch (Throwable e) { _logger.warn("An unhandled exception happened during executing periodic task: " + task.getName() + ", cancel it", e); cancelTask(); } } }, delay, task.getInterval(), task.getTimeUnit()); cancelablePeriodicTasks.put(task, ret); return ret; }
不同与通常的ZStack组件,它虽然实现了Component接口。但是其start中的逻辑并不全面,初始化逻辑是基于spring bean的生命周期来做的。见ThreadFacade。
再让回头看看ThreadFacadeImpl的init与destory操作。
//init 操作 public void init() { //根据全局配置读入线程池最大线程数量 totalThreadNum = ThreadGlobalProperty.MAX_THREAD_NUM; if (totalThreadNum < 10) { _logger.warn(String.format("ThreadFacade.maxThreadNum is configured to %s, which is too small for running zstack. Change it to 10", ThreadGlobalProperty.MAX_THREAD_NUM)); totalThreadNum = 10; } // 构建一个支持延时任务的线程池 _pool = new ScheduledThreadPoolExecutorExt(totalThreadNum, this, this); _logger.debug(String.format("create ThreadFacade with max thread number:%s", totalThreadNum)); //构建一个DispatchQueue dpq = new DispatchQueueImpl(); jmxf.registerBean("ThreadFacade", this); }
//destory public void destroy() { _pool.shutdownNow(); }
看了这里可能大家会有疑问,这种关闭方式未免关于暴力(执行任务的线程会全部被中断)。在此之前,我们曾提到过,它实现了Component接口。这个接口分别有一个start和stop方法,使一个组件的生命周期能够方便的在ZStack中注册相应的钩子。
//stop 方法 @Override public boolean stop() { _pool.shutdown(); timerPool.stop(); return true; }线程工厂
ThreadFacadeImpl同时也实现了ThreadFactory,可以让线程在创建时做一些操作。
@Override public Thread newThread(Runnable arg0) { return new Thread(arg0, "zs-thread-" + String.valueOf(seqNum.getAndIncrement())); }
在这里可以看到ZStack为每一个新的线程赋予了一个名字。
线程池ZStack对JDK中的线程池进行了一定的扩展,对一个任务执行前后都有相应的钩子函数,同时也开放注册钩子。
package org.zstack.core.thread; import org.apache.logging.log4j.ThreadContext; import org.zstack.utils.logging.CLogger; import org.zstack.utils.logging.CLoggerImpl; import java.util.ArrayList; import java.util.List; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; public class ScheduledThreadPoolExecutorExt extends ScheduledThreadPoolExecutor { private static final CLogger _logger =CLoggerImpl.getLogger(ScheduledThreadPoolExecutorExt.class); List_hooks = new ArrayList (8); public ScheduledThreadPoolExecutorExt(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, threadFactory, handler); this.setMaximumPoolSize(corePoolSize); } public void registerHook(ThreadAroundHook hook) { synchronized (_hooks) { _hooks.add(hook); } } public void unregisterHook(ThreadAroundHook hook) { synchronized (_hooks) { _hooks.remove(hook); } } @Override protected void beforeExecute(Thread t, Runnable r) { ThreadContext.clearMap(); ThreadContext.clearStack(); ThreadAroundHook debugHook = null; List tmpHooks; synchronized (_hooks) { tmpHooks = new ArrayList (_hooks); } for (ThreadAroundHook hook : tmpHooks) { debugHook = hook; try { hook.beforeExecute(t, r); } catch (Exception e) { _logger.warn("Unhandle exception happend during executing ThreadAroundHook: " + debugHook.getClass().getCanonicalName(), e); } } } @Override protected void afterExecute(Runnable r, Throwable t) { ThreadContext.clearMap(); ThreadContext.clearStack(); ThreadAroundHook debugHook = null; List tmpHooks; synchronized (_hooks) { tmpHooks = new ArrayList (_hooks); } for (ThreadAroundHook hook : tmpHooks) { debugHook = hook; try { hook.afterExecute(r, t); } catch (Exception e) { _logger.warn("Unhandle exception happend during executing ThreadAroundHook: " + debugHook.getClass().getCanonicalName(), e); } } } }
另外,ScheduledThreadPoolExecutorExt是继承自ScheduledThreadPoolExecutor。本质上是一个任务调度线程池,用的工作队列也是一个延时工作队列。
小结本文分析了ZStack的久经生产考验的核心组件——线程池。通过线程池,使并行编程变得不再那么复杂。
当然,其中也有一些可以改进的地方:
一些加锁的地方(synchronized),可以通过使用并发容器解决。这样可以有效提升吞吐量,节省因为竞争锁而导致的开销。
在提交大量任务的情况下,HashMap会因为扩容而导致性能耗损。可以考虑换一种Map或在不同的策略下使HashMap的初始大小有个较为合理的设置。
队列是无界的。在大量任务请求时,会对内存造成极大的负担。
任务队列无超时逻辑判断。ZStack中的调用绝大多数都是由MQ完成,每一个msg有着对应的超时时间。但是每一个任务却没有超时判定,这意味着一个任务执行时间过长时,后面的任务有可能进入了超时状态,而却没有挪出队列,配合之前提到的无界队列,就是一场潜在的灾难。
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/70783.html
摘要:本文首发于泊浮目的专栏在语言中,有一个关键字叫做其作用是在函数前执行。一般有两种用法在该函数抛出异常时执行。在该函数返回前执行。这里的放入来自系统启动时利用反射所做的一个行为。因此并不会影响使用时的性能。 本文首发于泊浮目的专栏:https://segmentfault.com/blog... 在Go语言中,有一个关键字叫做defer——其作用是在函数return前执行。在ZStac...
摘要:本文首发于泊浮目的专栏在语言中,有一个关键字叫做其作用是在函数前执行。一般有两种用法在该函数抛出异常时执行。在该函数返回前执行。这里的放入来自系统启动时利用反射所做的一个行为。因此并不会影响使用时的性能。 本文首发于泊浮目的专栏:https://segmentfault.com/blog... 在Go语言中,有一个关键字叫做defer——其作用是在函数return前执行。在ZStac...
摘要:因为这个状态下,是交给一个线程在执行的,见源码剖析之核心库鉴赏中的分析。并且允许等行为。上面提到过,允许运行暂停取消等行为。维护和相应的之间的关系。则停止执行并触发之前的所有。 本文首发于泊浮目的专栏:https://segmentfault.com/blog... 前言 在ZStack中,当用户在UI上发起操作时,前端会调用后端的API对实际的资源发起操作请求。但在一个分布式系统中...
摘要:但在实际的二次开发中,这些做法未必能够完全满足需求。在源码剖析之核心库鉴赏一文中,我们了解到是的基础设施之一,同时也允许通过显示声明的方式来声明。同理,一些也可以使用继承进行扩展。 本文首发于泊浮目的专栏:https://segmentfault.com/blog... 前言 在ZStack博文-5.通用插件系统中,官方提出了几个较为经典的扩展方式。但在实际的二次开发中,这些做法未必...
摘要:下面将开始分析它的源码。仅仅定义了一个最小应有的行为。更好的选择由于该库是为定制而生,故此有一些防御性判断,源码显得略为。 本文首发于泊浮目的专栏:https://segmentfault.com/blog... 前言 在ZStack(或者说产品化的IaaS软件)中的任务通常有很长的执行路径,错误可能发生在路径的任意一处。为了保证系统的正确性,需提供一种较为完善的回滚机制——在ZSt...
阅读 1440·2019-08-29 17:14
阅读 1648·2019-08-29 12:12
阅读 728·2019-08-29 11:33
阅读 3263·2019-08-28 18:27
阅读 1443·2019-08-26 10:19
阅读 907·2019-08-23 18:18
阅读 3526·2019-08-23 16:15
阅读 2540·2019-08-23 14:14