摘要:下面将开始分析它的源码。仅仅定义了一个最小应有的行为。更好的选择由于该库是为定制而生,故此有一些防御性判断,源码显得略为。
本文首发于泊浮目的专栏:https://segmentfault.com/blog...前言
在ZStack(或者说产品化的IaaS软件)中的任务通常有很长的执行路径,错误可能发生在路径的任意一处。为了保证系统的正确性,需提供一种较为完善的回滚机制——在ZStack中,通过一个工作流引擎,ZStack的每一个步骤都被包裹在独立的工作流中,可以在出错的时候回滚。此外,通过在配置文件中组装工作流的方式,关键的执行路径可以被配置,这使得架构的耦合度进一步降低。
系统解耦合的手段除了之前文章所提到的分层、分割、分布等,还有一个重要手段是异步,业务之间的消息传递不是同步调用,而是将一个业务操作分成多个阶段,每个阶段之间通过共享数据的方式异步执行进行协作。
这即是一种在业务设计原则中——流程可定义原则的具象化。接触过金融行业的同学肯定知道,不同的保险理赔流程是不一样的。而承保流程和理赔流程是分离的,在需要时进行关联,从而可以复用一些理赔流程,并提供一些个性化理赔流程。
演示代码就以创建VM为例,在ZStack中大致可以分以下几个步骤:
org.zstack.compute.vm.VmImageSelectBackupStorageFlow org.zstack.compute.vm.VmAllocateHostFlow org.zstack.compute.vm.VmAllocatePrimaryStorageFlow org.zstack.compute.vm.VmAllocateVolumeFlow org.zstack.compute.vm.VmAllocateNicFlow org.zstack.compute.vm.VmInstantiateResourcePreFlow org.zstack.compute.vm.VmCreateOnHypervisorFlow org.zstack.compute.vm.VmInstantiateResourcePostFlow
可以说是代码即文档了。在这里,ZStack显式声明这些Flow在Spring XML中,这些属性将会被注入到createVmWorkFlowElements中。每一个Flow都被拆成了一个个较小的单元,好处不仅是将业务操作分成了多个阶段易于回滚,还是可以有效复用这些Flow。这也是编程思想中“组合”的体现。
如何使用除了这种配置型声明,还可以在代码中灵活的使用这些FlowChain。在这里,我们将以Case来说明这些FlowChain的用法,避免对ZStack业务逻辑不熟悉的读者看的一头雾水。
一共有两种可用的FlowChain:
SimpleFlowChain
ShareFlowChain
SimpleFlowChain我们先来看一个Case。
@Test public void test() { FlowChain chain = FlowChainBuilder.newShareFlowChain(); chain.then(new ShareFlow() { int a; @Override public void setup() { flow(new NoRollbackFlow() { @Override public void run(FlowTrigger trigger, Map data) { a = 1; increase(); trigger.next(); } }); flow(new NoRollbackFlow() { @Override public void run(FlowTrigger trigger, Map data) { a = 2; increase(); trigger.next(); } }); } }).done(new FlowDoneHandler(null) { @Override public void handle(Map data) { success = true; } }).start(); Assert.assertTrue(success); expect(2); }
我们可以看到,这就是一个工作流。完成一个工作流的时候(回调触发时)执行下一个工作流——由trigger.next触发。不仅如此,还可以添加Rollback属性。
@Test public void test() throws WorkFlowException { final int[] count = {0}; new SimpleFlowChain() .then(new Flow() { @Override public void run(FlowTrigger chain, Map data) { count[0]++; chain.next(); } @Override public void rollback(FlowRollback chain, Map data) { count[0]--; chain.rollback(); } }) .then(new Flow() { @Override public void run(FlowTrigger chain, Map data) { count[0]++; chain.next(); } @Override public void rollback(FlowRollback chain, Map data) { count[0]--; chain.rollback(); } }) .then(new Flow() { @Override public void run(FlowTrigger chain, Map data) { chain.fail(null); } @Override public void rollback(FlowRollback chain, Map data) { count[0]--; chain.rollback(); } }) .start(); Assert.assertEquals(-1, count[0]); }
rollback由FlowTrigger的fail触发。这样我们可以保证在发生一些错误的时候及时回滚,防止我们的系统处于一个有脏数据的中间状态。同时,Map也可以用来在Flow之间传递上下文。
ShareFlowChainpublic class TestShareFlow { int[] count = {0}; boolean success; private void increase() { count[0]++; } private void decrease() { count[0]--; } private void expect(int ret) { Assert.assertEquals(count[0], ret); } @Test public void test() { FlowChain chain = FlowChainBuilder.newShareFlowChain(); chain.then(new ShareFlow() { int a; @Override public void setup() { flow(new NoRollbackFlow() { @Override public void run(FlowTrigger trigger, Map data) { a = 1; increase(); trigger.next(); } }); flow(new NoRollbackFlow() { @Override public void run(FlowTrigger trigger, Map data) { a = 2; increase(); trigger.next(); } }); } }).done(new FlowDoneHandler(null) { @Override public void handle(Map data) { success = true; } }).start(); Assert.assertTrue(success); expect(2); } @Before public void setUp() throws Exception { new BeanConstructor().build(); } }
比起SimpleFlowChain,ShareFlowChain则是一个Inner class,在相同的作用域里,传递数据变得更加的方便了。
它的实现在ZStack中,FlowChain作为核心库,其实现也是非常的简单(可以直接参考SimpleFlowChain和ShareFlowChain),本质就是将任务放入List中,由内部方法进行迭代,在此基础上做了一系列操作。下面将开始分析它的源码。
从接口说起public interface FlowChain { ListgetFlows(); FlowChain insert(Flow flow); FlowChain insert(int pos, Flow flow); FlowChain setFlowMarshaller(FlowMarshaller marshaller); FlowChain then(Flow flow); FlowChain done(FlowDoneHandler handler); FlowChain error(FlowErrorHandler handler); FlowChain Finally(FlowFinallyHandler handler); FlowChain setData(Map data); FlowChain putData(Map.Entry... es); FlowChain setName(String name); void setProcessors(List processors); Map getData(); void start(); FlowChain noRollback(boolean no); FlowChain allowEmptyFlow(); }
接口的名字非常的易懂,那么在这里就不多作解释了。FlowChain仅仅定义了一个Flow最小应有的行为。
//定义了Flow的回滚操作接口 public interface FlowRollback extends AsyncBackup { //回滚操作 void rollback(); //设置跳过回滚操作 void skipRestRollbacks(); }
//定义了触发器的行为接口 public interface FlowTrigger extends AsyncBackup { //触发失败,调用errorHandle void fail(ErrorCode errorCode); //触发下一个flow void next(); //setError后,在下次调用next的时才会调用errorHandle void setError(ErrorCode error); }源码解析 Flow
public interface Flow { void run(FlowTrigger trigger, Map data); void rollback(FlowRollback trigger, Map data); }
Flow的定义其实非常的简单——一组方法。执行和对应的回滚,一般在ZStack中都以匿名内部类的方式传入。
Chain的用法在之前的SimpleFlowChain的case中。我们可以看到一系列的链式调用,大致如下:
new SimpleFlowChain().then(new flow()).then(new flow()).then(new flow()).start();
then本质是往List
public SimpleFlowChain then(Flow flow) { flows.add(flow); return this; }
再来看看start
@Override public void start() { // 检测flow中是否设置了processors。一般用来打trace if (processors != null) { for (FlowChainProcessor p : processors) { p.processFlowChain(this); } } //如果flows为空但是之前在设置中允许为空,那么就直接直接done部分的逻辑。不然就报错 if (flows.isEmpty() && allowEmptyFlow) { callDoneHandler(); return; } if (flows.isEmpty()) { throw new CloudRuntimeException("you must call then() to add flow before calling start() or allowEmptyFlow() to run empty flow chain on purpose"); } //每个flow必须有一个map,用来传递上下文 if (data == null) { data = new HashMap(); } //标记为已经开始 isStart = true; //如果没有名字的话给flow 取一个名字,因为很有可能是匿名使用的flow if (name == null) { name = "anonymous-chain"; } logger.debug(String.format("[FlowChain(%s): %s] starts", id, name)); //打印trace,方便调试 if (logger.isTraceEnabled()) { List names = CollectionUtils.transformToList(flows, new Function () { @Override public String call(Flow arg) { return String.format("%s[%s]", arg.getClass(), getFlowName(arg)); } }); logger.trace(String.format("execution path: %s", StringUtils.join(names, " --> "))); } //生成一个迭代器 it = flows.iterator(); //从it中获取一个不需要跳过的flow开始执行。如果没有获取到,就执行done逻辑 Flow flow = getFirstNotSkippedFlow(); if (flow == null) { // all flows are skipped callDoneHandler(); } else { runFlow(flow); } }
再来看一下runFlow中的代码
private void runFlow(Flow flow) { try { //看报错信息就可以猜到在做什么防御措施了:如果一个transaction在一个flow中没有被关闭而跳到下一个flow时,会抛出异常。这个防御机制来自于一个实习生写的bug,当时被排查出来的时候花了非常大的力气——现象非常的诡异。所以现在被写在了这里。 if (TransactionSynchronizationManager.isActualTransactionActive()) { String flowName = null; String flowClassName = null; if (currentFlow != null) { flowName = getFlowName(currentFlow); flowClassName = currentFlow.getClass().getName(); } throw new CloudRuntimeException(String.format("flow[%s:%s] opened a transaction but forgot closing it", flowClassName, flowName)); } //toRun就是一个当前要run的flow Flow toRun = null; if (flowMarshaller != null) { //flowMarshaller 实际上是一个非常恶心的玩意儿。尤其在一些配置好掉的xml flow突然因为一些条件而改变接下来执行的flow令人很无语...但是也提供了一些灵活性。 toRun = flowMarshaller.marshalTheNextFlow(currentFlow == null ? null : currentFlow.getClass().getName(), flow.getClass().getName(), this, data); if (toRun != null) { logger.debug(String.format("[FlowChain(%s): %s] FlowMarshaller[%s] replaces the next flow[%s] to the flow[%s]", id, name, flowMarshaller.getClass(), flow.getClass(), toRun.getClass())); } } if (toRun == null) { toRun = flow; } if (CoreGlobalProperty.PROFILER_WORKFLOW) { //对flow的监视。比如flow的执行时间等 stopWatch.start(toRun); } currentFlow = toRun; String flowName = getFlowName(currentFlow); String info = String.format("[FlowChain(%s): %s] start executing flow[%s]", id, name, flowName); logger.debug(info); //在flow中还允许定义afterDone afterError afterFinal的行为。稍后将会介绍 collectAfterRunnable(toRun); //终于到了run,这里就是调用者传入的行为来决定run中的逻辑 toRun.run(this, data); //fail的逻辑稍后解析 } catch (OperationFailureException oe) { String errInfo = oe.getErrorCode() != null ? oe.getErrorCode().toString() : ""; logger.warn(errInfo, oe); fail(oe.getErrorCode()); } catch (FlowException fe) { String errInfo = fe.getErrorCode() != null ? fe.getErrorCode().toString() : ""; logger.warn(errInfo, fe); fail(fe.getErrorCode()); } catch (Throwable t) { logger.warn(String.format("[FlowChain(%s): %s] unhandled exception when executing flow[%s], start to rollback", id, name, flow.getClass().getName()), t); fail(errf.throwableToInternalError(t)); } }
fail
@Override public void fail(ErrorCode errorCode) { isFailCalled = true; setErrorCode(errorCode); //放入Stack中,之后Rollback会根据Stack中的flow顺序来 rollBackFlows.push(currentFlow); //rollback会对this.rollBackFlows中flow按照顺序调用rollback rollback(); }FlowTrigger
//定义了触发器的行为接口 public interface FlowTrigger extends AsyncBackup { //触发失败,调用errorHandle void fail(ErrorCode errorCode); //触发下一个flow void next(); //setError后,在下次调用next的时才会调用errorHandle void setError(ErrorCode error); }
之前已经看过fail的代码。接下来来看看next和setError。
@Override public void next() { //如果flow没有run起来的情况下,是不能调用next的 if (!isStart) { throw new CloudRuntimeException( String.format("[FlowChain(%s): %s] you must call start() first, and only call next() in Flow.run()", id, name)); } //当rollback开始的时候也不允许next if (isRollbackStart) { throw new CloudRuntimeException( String.format("[FlowChain(%s): %s] rollback has started, you can"t call next()", id, name)); } //将当前flow的push进rollback用的stack rollBackFlows.push(currentFlow); logger.debug(String.format("[FlowChain(%s): %s] successfully executed flow[%s]", id, name, getFlowName(currentFlow))); //获取下一个flow。在这里才是真正意义上的next Flow flow = getFirstNotSkippedFlow(); if (flow == null) { // no flows, or all flows are skipped if (errorCode == null) { callDoneHandler(); } else { callErrorHandler(false); } } else { runFlow(flow); } }
可以看一下getFirstNotSkippedFlow,本质上是利用了迭代器的特性。
private Flow getFirstNotSkippedFlow() { Flow flow = null; while (it.hasNext()) { flow = it.next(); if (!isSkipFlow(flow)) { break; } } return flow; }
接下来是setError
@Override public void setError(ErrorCode error) { setErrorCode(error); } //往下看 private void setErrorCode(ErrorCode errorCode) { this.errorCode = errorCode; }
根据之前的next逻辑:
if (flow == null) { // no flows, or all flows are skipped if (errorCode == null) { callDoneHandler(); } else { callErrorHandler(false); } } else { runFlow(flow); }
我们可以大致猜想到,如果在next的时候当前error不为空,则调用错误handle。这样在setError后还可以做一些事情。
无论是调用errorHandle还是doneHandle,都会调用finalHandle。finalHandle也允许用户定义这部分的逻辑,使flow更加的灵活。
更好的选择由于该库是为ZStack定制而生,故此有一些防御性判断,源码显得略为verbose。如果有同学对此感兴趣,想将其应用到自己的系统中,笔者推荐使用:jdeferred。
Java Deferred/Promise library similar to JQuery
由于JavaScript 中的代码都是异步调用的。简单说,它的思想是,每一个异步任务返回一个Promise对象,该对象有一个then方法,允许指定回调函数。
在这里列出几个较为简单的示范,或者有兴趣的读者也可以参考这里:
import org.jdeferred.DeferredManager; import org.jdeferred.Promise; import org.jdeferred.impl.DefaultDeferredManager; import org.junit.After; import org.junit.Assert; import org.junit.Test; import java.util.concurrent.TimeUnit; public class deferSimpleTest { private static int var = 0; final DeferredManager dm = new DefaultDeferredManager(); @After public void cleanUp() { var = 0; } @Test public void test() { Promise p1 = dm.when(() -> { var += 1; }).then(result -> { var += 1; }); Promise p2 = dm.when(() -> { var += 1; }).then(result -> { var += 1; }); dm.when(p1, p2).done(Void -> var += 1); Assert.assertEquals(5, var); } @Test public void test2() { final DeferredManager dm = new DefaultDeferredManager(); Promise promise = dm.when(() -> { var += 1; }).then(result -> { var += 1; }); dm.when(promise).done(Void -> var += 1); Assert.assertEquals(3, var); } @Test public void testBadCallback() { Promise promise = dm.when(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } }); dm.when(promise).done(Void -> { var += 1; throw new RuntimeException("this exception is expected"); } ).fail(Void -> { System.out.print("fail!"); var -= 1; }); Assert.assertEquals(0, var); } }
如果你在使用Java8,那么也可以通过CompletableFuture来得到“类似”的支持。
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/70845.html
摘要:因为这个状态下,是交给一个线程在执行的,见源码剖析之核心库鉴赏中的分析。并且允许等行为。上面提到过,允许运行暂停取消等行为。维护和相应的之间的关系。则停止执行并触发之前的所有。 本文首发于泊浮目的专栏:https://segmentfault.com/blog... 前言 在ZStack中,当用户在UI上发起操作时,前端会调用后端的API对实际的资源发起操作请求。但在一个分布式系统中...
摘要:但在实际的二次开发中,这些做法未必能够完全满足需求。在源码剖析之核心库鉴赏一文中,我们了解到是的基础设施之一,同时也允许通过显示声明的方式来声明。同理,一些也可以使用继承进行扩展。 本文首发于泊浮目的专栏:https://segmentfault.com/blog... 前言 在ZStack博文-5.通用插件系统中,官方提出了几个较为经典的扩展方式。但在实际的二次开发中,这些做法未必...
摘要:本文首发于泊浮目的专栏在语言中,有一个关键字叫做其作用是在函数前执行。一般有两种用法在该函数抛出异常时执行。在该函数返回前执行。这里的放入来自系统启动时利用反射所做的一个行为。因此并不会影响使用时的性能。 本文首发于泊浮目的专栏:https://segmentfault.com/blog... 在Go语言中,有一个关键字叫做defer——其作用是在函数return前执行。在ZStac...
摘要:本文首发于泊浮目的专栏在语言中,有一个关键字叫做其作用是在函数前执行。一般有两种用法在该函数抛出异常时执行。在该函数返回前执行。这里的放入来自系统启动时利用反射所做的一个行为。因此并不会影响使用时的性能。 本文首发于泊浮目的专栏:https://segmentfault.com/blog... 在Go语言中,有一个关键字叫做defer——其作用是在函数return前执行。在ZStac...
摘要:每个消息都会被一个线程消费,同时最大并发量为。然后提交一个任务到线程池中,这个任务的内容是从等待队列中取出一个,如果等待队列为空,则删除这个等待队列的。小结本文分析了的久经生产考验的核心组件线程池。 本文首发于泊浮目的专栏:https://segmentfault.com/blog... 前言 在ZStack中,最基本的执行单位不仅仅是一个函数,也可以是一个任务(Task。其本质实现...
阅读 1949·2023-04-26 01:56
阅读 3120·2021-11-18 10:02
阅读 3069·2021-09-09 11:35
阅读 1305·2021-09-03 10:28
阅读 3428·2019-08-29 18:36
阅读 2858·2019-08-29 17:14
阅读 840·2019-08-29 16:10
阅读 1623·2019-08-26 13:45