摘要:转载请注明出处翻译下面的代码展示了版的查看源码的同等实现如下可以通过调用方法实现同步执行示例如下测试如下不提供同步执行方法但是如果确定其只会产生一个值那么也可以用如下方式实现如果实际上产生了多个值上述的代码将会抛出可以通过调用方法实现异步
转载请注明出处: 翻译:Hystrix - How To Use
Hello World!下面的代码展示了HystrixCommand版的Hello World:
public class CommandHelloWorld extends HystrixCommand{ private final String name; public CommandHelloWorld(String name) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.name = name; } @Override protected String run() { // a real example would do work like a network call here return "Hello " + name + "!"; } }
查看源码
HystrixObservableCommand的同等实现如下:
public class CommandHelloWorld extends HystrixObservableCommandSynchronous Execution{ private final String name; public CommandHelloWorld(String name) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.name = name; } @Override protected Observable construct() { return Observable.create(new Observable.OnSubscribe () { @Override public void call(Subscriber super String> observer) { try { if (!observer.isUnsubscribed()) { // a real example would do work like a network call here observer.onNext("Hello"); observer.onNext(name + "!"); observer.onCompleted(); } } catch (Exception e) { observer.onError(e); } } } ).subscribeOn(Schedulers.io()); } }
可以通过调用HystrixCommand.execute()方法实现同步执行, 示例如下:
String s = new CommandHelloWorld("World").execute();
测试如下:
@Test public void testSynchronous() { assertEquals("Hello World!", new CommandHelloWorld("World").execute()); assertEquals("Hello Bob!", new CommandHelloWorld("Bob").execute()); }
HystrixObservableCommand不提供同步执行方法, 但是如果确定其只会产生一个值, 那么也可以用如下方式实现:
HystrixObservableCommand.observe().observe().toBlocking().toFuture().get()
HystrixObservableCommand.toObservable().observe().toBlocking().toFuture().get()
如果实际上产生了多个值, 上述的代码将会抛出java.lang.IllegalArgumentException: Sequence contains too many elements.
Asynchronous Execution可以通过调用HystrixCommand.queue()方法实现异步执行, 示例如下:
Futurefs = new CommandHelloWorld("World").queue();
此时可以通过Future.get()方法获取command执行结果:
String s = fs.get();
测试代码如下:
@Test public void testAsynchronous1() throws Exception { assertEquals("Hello World!", new CommandHelloWorld("World").queue().get()); assertEquals("Hello Bob!", new CommandHelloWorld("Bob").queue().get()); } @Test public void testAsynchronous2() throws Exception { FuturefWorld = new CommandHelloWorld("World").queue(); Future fBob = new CommandHelloWorld("Bob").queue(); assertEquals("Hello World!", fWorld.get()); assertEquals("Hello Bob!", fBob.get()); }
下面的两种实现是等价的:
String s1 = new CommandHelloWorld("World").execute(); String s2 = new CommandHelloWorld("World").queue().get();
HystrixObservableCommand不提供queue方法, 但是如果确定其只会产生一个值, 那么也可以用如下方式实现:
HystrixObservableCommand.observe().observe().toBlocking().toFuture()
HystrixObservableCommand.toObservable().observe().toBlocking().toFuture()
如果实际上产生了多个值, 上述的代码将会抛出java.lang.IllegalArgumentException: Sequence contains too many elements.
Reactive Execution你也可以将HystrixCommand当做一个可观察对象(Observable)来观察(Observe)其产生的结果, 可以使用以下任意一个方法实现:
observe(): 一旦调用该方法, 请求将立即开始执行, 其利用ReplaySubject特性可以保证不会丢失任何command产生的结果, 即使结果在你订阅之前产生的也不会丢失.
toObservable(): 调用该方法后不会立即执行请求, 而是当有订阅者订阅时才会执行.
Observableho = new CommandHelloWorld("World").observe(); // or Observable co = new CommandHelloWorld("World").toObservable();
然后你可以通过订阅到这个Observable来取得command产生的结果:
ho.subscribe(new Action1() { @Override public void call(String s) { // value emitted here } });
测试如下:
@Test public void testObservable() throws Exception { ObservablefWorld = new CommandHelloWorld("World").observe(); Observable fBob = new CommandHelloWorld("Bob").observe(); // blocking assertEquals("Hello World!", fWorld.toBlockingObservable().single()); assertEquals("Hello Bob!", fBob.toBlockingObservable().single()); // non-blocking // - this is a verbose anonymous inner-class approach and doesn"t do assertions fWorld.subscribe(new Observer () { @Override public void onCompleted() { // nothing needed here } @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onNext(String v) { System.out.println("onNext: " + v); } }); // non-blocking // - also verbose anonymous inner-class // - ignore errors and onCompleted signal fBob.subscribe(new Action1 () { @Override public void call(String v) { System.out.println("onNext: " + v); } }); }
使用Java 8的Lambda表达式可以使代码更简洁:
fWorld.subscribe((v) -> { System.out.println("onNext: " + v); }) // - or while also including error handling fWorld.subscribe((v) -> { System.out.println("onNext: " + v); }, (exception) -> { exception.printStackTrace(); })
关于Observable的信息可以在这里查阅
Reactive Commands相比将HystrixCommand使用上述方法转换成一个Observable, 你也可以选择创建一个HystrixObservableCommand对象. HystrixObservableCommand包装的Observable允许产生多个结果(译者注: Subscriber.onNext可以调用多次), 而HystrixCommand即使转换成了Observable也只能产生一个结果.
使用HystrixObservableCommnad时, 你需要重载construct方法来实现你的业务逻辑, 而不是重载run方法, contruct方法将会返回你需要包装的Observable.
使用下面任意一个方法可以从HystrixObservableCommand中获取Observable对象:
observe(): 一旦调用该方法, 请求将立即开始执行, 其利用ReplaySubject特性可以保证不会丢失任何command产生的结果, 即使结果在你订阅之前产生的也不会丢失.
toObservable(): 调用该方法后不会立即执行请求, 而是当有订阅者订阅时才会执行.
Fallback大多数情况下, 我们都希望command在执行失败时能够有一个候选方法来处理, 如: 返回一个默认值或执行其他失败处理逻辑, 除了以下几个情况:
执行写操作的command: 当command的目标是执行写操作而不是读操作, 那么通常需要将写操作失败的错误交给调用者处理.
批处理系统/离线计算: 如果command的目标是做一些离线计算、生成报表、填充缓存等, 那么同样应该将失败交给调用者处理.
无论command是否实现了getFallback()方法, command执行失败时, Hystrix的状态和断路器(circuit-breaker)的状态/指标都会进行更新.
HystrixCommand可以通过实现getFallback()方法来实现降级处理, run()方法异常、执行超时、线程池或信号量已满拒绝提供服务、断路器短路时, 都会调用getFallback():
public class CommandHelloFailure extends HystrixCommand{ private final String name; public CommandHelloFailure(String name) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.name = name; } @Override protected String run() { throw new RuntimeException("this command always fails"); } @Override protected String getFallback() { return "Hello Failure " + name + "!"; } }
查看源码
这个命令的run()方法总是会执行失败, 但是调用者总是能收到getFallback()方法返回的值, 而不是收到一个异常:
@Test public void testSynchronous() { assertEquals("Hello Failure World!", new CommandHelloFailure("World").execute()); assertEquals("Hello Failure Bob!", new CommandHelloFailure("Bob").execute()); }
HystrixObservableCommand可以通过重载resumeWithFallback方法实现原Observable执行失败时返回回另一个Observable, 需要注意的是, 原Observable有可能在发出多个结果之后才出现错误, 因此在fallback实现的逻辑中不应该假设订阅者只会收到失败逻辑中发出的结果.
Hystrix内部使用了RxJava的onErrorResumeNext操作符来实现Observable之间的无缝转移.
Error Propagation除HystrixBadRequestException异常外, run方法中抛出的所有异常都会被认为是执行失败且会触发getFallback()方法和断路器的逻辑.
你可以在HystrixBadRequestException中包装想要抛出的异常, 然后通过getCause()方法获取. HystrixBadRequestException使用在不应该被错误指标(failure metrics)统计和不应该触发getFallback()方法的场景, 例如报告参数不合法或者非系统异常等.
对于HystrixObservableCommand, 不可恢复的错误都会在通过onError方法通知, 并通过获取用户实现的resumeWithFallback()方法返回的Observable来完成回退机制.
执行异常类型Failure Type | Exception class | Exception.cause |
---|---|---|
FAILURE | HystrixRuntimeException | underlying exception(user-controlled) |
TIMEOUT | HystrixRuntimeException | j.u.c.TimeoutException |
SHORT_CIRCUITED | HystrixRuntimeException | j.l.RuntimeException |
THREAD_POOL_REJECTED | HystrixRuntimeException | j.u.c.RejectedExecutionException |
SEMAPHORE_REJECTED | HystrixRuntimeException | j.l.RuntimeException |
BAD_REQUEST | HystrixBadRequestException | underlying exception(user-controller) |
默认的command name是从类名中派生的:
getClass().getSimpleName()
可以通过HystrixCommand或HystrixObservableCommand的构造器来指定command name:
public CommandHelloWorld(String name) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) .andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld"))); this.name = name; }
可以通过如下方式来重用Setter:
private static final Setter cachedSetter = Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) .andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld")); public CommandHelloWorld(String name) { super(cachedSetter); this.name = name; }
HystrixCommandKey是一个接口, 因此可以将其实现为一个枚举或者常规的类, 但是它已经内置了一个Factory类来构建帮助构建内部实例, 使用方式如下:
HystrixCommandKey.Factory.asKey("Hello World");Command Group
Hystrix使用command group来为分组, 分组信息主要用于报告、警报、仪表盘上显示, 或者是标识团队/库的拥有者.
默认情况下, 除非已经用这个名字定义了一个信号量, 否则 Hystrix将使用这个名称来定义command的线程池.
HystrixCommandGroupKey是一个接口, 因此可以将其实现为一个枚举或者常规的类, 但是它已经内置了一个Factory类来构建帮助构建内部实例, 使用方式如下:
HystrixCommandGroupKey.Factory.asKey("Example Group")Command Thread-pool
thread-pool key主要用于在监控、指标发布、缓存等类似场景中标识一个HystrixThreadPool, 一个HystrixCommand于其构造函数中传入的HystrixThreadPoolKey指定的HystrixThreadPool相关联, 如果未指定的话, 则使用HystrixCommandGroupKey来获取/创建HystrixThreadPool.
可以通过HystrixCommand或HystrixObservableCommand的构造器来指定其值:
public CommandHelloWorld(String name) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) .andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld")) .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("HelloWorldPool"))); this.name = name; }
HystrixCommandThreadPoolKey是一个接口, 因此可以将其实现为一个枚举或者常规的类, 但是它已经内置了一个Factory类来构建帮助构建内部实例, 使用方式如下:
HystrixThreadPoolKey.Factory.asKey("Hello World Pool")
使用HystrixThreadPoolKey而不是使用不同的HystrixCommandGroupKey的原因是: 可能会有多条command在逻辑功能上属于同一个组(group), 但是其中的某些command需要和其他command隔离开, 例如:
两条用于访问视频元数据的command
两条command的group name都是VideoMetadata
command A与资源#1互斥
command B与资源#2互斥
如果command A由于延迟等原因导致其所在的线程池资源耗尽, 不应该影响command B对#2的执行, 因为他们访问的是不同的后端资源.
因此, 从逻辑上来说, 我们希望这两条command应该被分到同一个分组, 但是我们同样系统将这两条命令的执行隔离开来, 因此我们使用HystrixThreadPoolKey将其分配到不同的线程池.
Request Cache可以通过实现HystrixCommand或HystrixObservableCommand的getCacheKey()方法开启用对请求的缓存功能:
public class CommandUsingRequestCache extends HystrixCommand{ private final int value; protected CommandUsingRequestCache(int value) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.value = value; } @Override protected Boolean run() { return value == 0 || value % 2 == 0; } @Override protected String getCacheKey() { return String.valueOf(value); } }
由于该功能依赖于请求的上下文信息, 因此我们必须初始化一个HystrixRequestContext, 使用方式如下:
@Test public void testWithoutCacheHits() { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { assertTrue(new CommandUsingRequestCache(2).execute()); assertFalse(new CommandUsingRequestCache(1).execute()); assertTrue(new CommandUsingRequestCache(0).execute()); assertTrue(new CommandUsingRequestCache(58672).execute()); } finally { context.shutdown(); } }
通常情况下, 上下文信息(HystrixRequestContext)应该在持有用户请求的ServletFilter或者其他拥有生命周期管理功能的类来初始化和关闭.
下面的例子展示了command如何从缓存中获取数据, 以及如何查询一个数据是否是从缓存中获取到的:
@Test public void testWithCacheHits() { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { CommandUsingRequestCache command2a = new CommandUsingRequestCache(2); CommandUsingRequestCache command2b = new CommandUsingRequestCache(2); assertTrue(command2a.execute()); // this is the first time we"ve executed this command with // the value of "2" so it should not be from cache assertFalse(command2a.isResponseFromCache()); assertTrue(command2b.execute()); // this is the second time we"ve executed this command with // the same value so it should return from cache assertTrue(command2b.isResponseFromCache()); } finally { context.shutdown(); } // start a new request context context = HystrixRequestContext.initializeContext(); try { CommandUsingRequestCache command3b = new CommandUsingRequestCache(2); assertTrue(command3b.execute()); // this is a new request context so this // should not come from cache assertFalse(command3b.isResponseFromCache()); } finally { context.shutdown(); } }Request Collapsing
请求合并可以用于将多条请求绑定到一起, 由同一个HystrixCommand实例执行.
collapser可以通过batch size和batch创建以来的耗时来自动将请求合并执行.
Hystrix支持两个请求合并方式: 请求级的合并和全局级的合并. 默认是请求范围的合并, 可以在构造collapser时指定值.
请求级(request-scoped)的collapser只会合并每一个HystrixRequestContext中的请求, 而全局级(globally-scoped)的collapser则可以跨HystrixRequestContext合并请求. 因此, 如果你下游的依赖者无法再一个command中处理多个HystrixRequestContext的话, 那么你应该使用请求级的合并.
在Netflix, 我们只会使用请求级的合并, 因为我们当前所有的系统都是基于一个command对应一个HystrixRequestContext的设想下构建的. 因此, 当一个command使用不同的参数在一个请求中并发执行时, 合并是有效的.
下面的代码展示了如何实现请求级的HystrixCollapser:
public class CommandCollapserGetValueForKey extends HystrixCollapser, String, Integer> { private final Integer key; public CommandCollapserGetValueForKey(Integer key) { this.key = key; } @Override public Integer getRequestArgument() { return key; } @Override protected HystrixCommand
> createCommand(final Collection
> requests) { return new BatchCommand(requests); } @Override protected void mapResponseToRequests(List batchResponse, Collection > requests) { int count = 0; for (CollapsedRequest request : requests) { request.setResponse(batchResponse.get(count++)); } } private static final class BatchCommand extends HystrixCommand > { private final Collection
> requests; private BatchCommand(Collection > requests) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueForKey"))); this.requests = requests; } @Override protected List run() { ArrayList response = new ArrayList (); for (CollapsedRequest request : requests) { // artificial response for each argument received in the batch response.add("ValueForKey: " + request.getArgument()); } return response; } } }
下面的代码展示了如果使用collapser自动合并4个CommandCollapserGetValueForKey到一个HystrixCommand中执行:
@Test public void testCollapser() throws Exception { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { FutureRequest Context Setupf1 = new CommandCollapserGetValueForKey(1).queue(); Future f2 = new CommandCollapserGetValueForKey(2).queue(); Future f3 = new CommandCollapserGetValueForKey(3).queue(); Future f4 = new CommandCollapserGetValueForKey(4).queue(); assertEquals("ValueForKey: 1", f1.get()); assertEquals("ValueForKey: 2", f2.get()); assertEquals("ValueForKey: 3", f3.get()); assertEquals("ValueForKey: 4", f4.get()); // assert that the batch command "GetValueForKey" was in fact // executed and that it executed only once assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); HystrixCommand> command = HystrixRequestLog.getCurrentRequest().getExecutedCommands().toArray(new HystrixCommand>[1])[0]; // assert the command is the one we"re expecting assertEquals("GetValueForKey", command.getCommandKey().name()); // confirm that it was a COLLAPSED command execution assertTrue(command.getExecutionEvents().contains(HystrixEventType.COLLAPSED)); // and that it was successful assertTrue(command.getExecutionEvents().contains(HystrixEventType.SUCCESS)); } finally { context.shutdown(); } }
使用请求级的特性时(如: 请求缓存、请求合并、请求日志)你必须管理HystrixRequestContext的生命周期(或者实现HystrixConcurrencyStategy).
这意味着你必须在请求之前执行如下代码:
HystrixRequestContext context = HystrixRequestContext.initializeContext();
并在请求结束后执行如下代码:
context.shutdown();
在标准的Java web应用中, 你可以使用Setvlet Filter实现的如下的过滤器来管理:
public class HystrixRequestContextServletFilter implements Filter { public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { chain.doFilter(request, response); } finally { context.shutdown(); } } }
可以在web.xml中加入如下代码实现对所有的请求都使用该过滤器:
Common PatternsHystrixRequestContextServletFilter HystrixRequestContextServletFilter com.netflix.hystrix.contrib.requestservlet.HystrixRequestContextServletFilter HystrixRequestContextServletFilter /*
以下是HystrixCommand和HystrixObservableCommand的一般用法和使用模式.
Fail Fast最基本的使用是执行一条只做一件事情且没有实现回退方法的command, 这样的command在发生任何错误时都会抛出异常:
public class CommandThatFailsFast extends HystrixCommand{ private final boolean throwException; public CommandThatFailsFast(boolean throwException) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.throwException = throwException; } @Override protected String run() { if (throwException) { throw new RuntimeException("failure from CommandThatFailsFast"); } else { return "success"; } }
下面的代码演示了上述行为:
@Test public void testSuccess() { assertEquals("success", new CommandThatFailsFast(false).execute()); } @Test public void testFailure() { try { new CommandThatFailsFast(true).execute(); fail("we should have thrown an exception"); } catch (HystrixRuntimeException e) { assertEquals("failure from CommandThatFailsFast", e.getCause().getMessage()); e.printStackTrace(); } }
HystrixObservableCommand需要重载resumeWithFallback()方法来实现同样的行为:
@Override protected ObservableFail SilentresumeWithFallback() { if (throwException) { return Observable.error(new Throwable("failure from CommandThatFailsFast")); } else { return Observable.just("success"); } }
静默失败等同于返回一个空的响应或者移除功能. 可以是返回null、空Map、空List, 或者其他类似的响应.
可以通过实现HystrixCommand.getFallback()方法实现该功能:
public class CommandThatFailsSilently extends HystrixCommand{ private final boolean throwException; public CommandThatFailsSilently(boolean throwException) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.throwException = throwException; } @Override protected String run() { if (throwException) { throw new RuntimeException("failure from CommandThatFailsFast"); } else { return "success"; } } @Override protected String getFallback() { return null; } }
@Test public void testSuccess() { assertEquals("success", new CommandThatFailsSilently(false).execute()); } @Test public void testFailure() { try { assertEquals(null, new CommandThatFailsSilently(true).execute()); } catch (HystrixRuntimeException e) { fail("we should not get an exception as we fail silently with a fallback"); } }
或者返回一个空List的实现如下:
@Override protected ListgetFallback() { return Collections.emptyList(); }
HystrixObservableCommand可以通过重载resumeWithFallback()方法实现同样的行为:
@Override protected ObservableFallback: StaticresumeWithFallback() { return Observable.empty(); }
Fallback可以返回代码里设定的默认值, 这种方式可以通过默认行为来有效避免于静默失败带来影响.
例如, 如果一个应返回true/false的用户认证的command执行失败了, 那么其默认行为可以如下:
@Override protected Boolean getFallback() { return true; }
对于HystrixObservableCommand可以通过重载resumeWithFallback()方法实现同样的行为:
@Override protected ObservableFallback: StubbedresumeWithFallback() { return Observable.just( true ); }
当command返回的是一个包含多个字段的复合对象, 且该对象的一部分字段值可以通过其他请求状态获得, 另一部分状态可以通过设置默认值获得时, 你通常需要使用存根(stubbed)模式.
你可能可以从存根值(stubbed values)中得到适当的值的情况如下:
cookies
请求参数和请求头
当前失败请求的前一个服务请求的响应
在fallback代码块内可以静态地获取请求范围内的存根(stubbed)值, 但是通常我们更推荐在构建command实例时注入这些值, 就像下面实例的代码中的countryCodeFromGeoLookup一样:
public class CommandWithStubbedFallback extends HystrixCommand{ private final int customerId; private final String countryCodeFromGeoLookup; /** * @param customerId * The customerID to retrieve UserAccount for * @param countryCodeFromGeoLookup * The default country code from the HTTP request geo code lookup used for fallback. */ protected CommandWithStubbedFallback(int customerId, String countryCodeFromGeoLookup) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.customerId = customerId; this.countryCodeFromGeoLookup = countryCodeFromGeoLookup; } @Override protected UserAccount run() { // fetch UserAccount from remote service // return UserAccountClient.getAccount(customerId); throw new RuntimeException("forcing failure for example"); } @Override protected UserAccount getFallback() { /** * Return stubbed fallback with some static defaults, placeholders, * and an injected value "countryCodeFromGeoLookup" that we"ll use * instead of what we would have retrieved from the remote service. */ return new UserAccount(customerId, "Unknown Name", countryCodeFromGeoLookup, true, true, false); } public static class UserAccount { private final int customerId; private final String name; private final String countryCode; private final boolean isFeatureXPermitted; private final boolean isFeatureYPermitted; private final boolean isFeatureZPermitted; UserAccount(int customerId, String name, String countryCode, boolean isFeatureXPermitted, boolean isFeatureYPermitted, boolean isFeatureZPermitted) { this.customerId = customerId; this.name = name; this.countryCode = countryCode; this.isFeatureXPermitted = isFeatureXPermitted; this.isFeatureYPermitted = isFeatureYPermitted; this.isFeatureZPermitted = isFeatureZPermitted; } } }
下面的代码演示了上述行为:
@Test public void test() { CommandWithStubbedFallback command = new CommandWithStubbedFallback(1234, "ca"); UserAccount account = command.execute(); assertTrue(command.isFailedExecution()); assertTrue(command.isResponseFromFallback()); assertEquals(1234, account.customerId); assertEquals("ca", account.countryCode); assertEquals(true, account.isFeatureXPermitted); assertEquals(true, account.isFeatureYPermitted); assertEquals(false, account.isFeatureZPermitted); }
对于HystrixObservableCommand可以通过重载resumeWithFallback()方法实现同样的行为:
@Override protected ObservableresumeWithFallback() { return Observable.just( new UserAccount(customerId, "Unknown Name", countryCodeFromGeoLookup, true, true, false) ); }
如果你想要从Observable中发出多个值, 那么当失败发生时, 原本的Observable可能已经发出的一部分值, 此时你或许更希望能够只从fallback逻辑中发出另一部分未被发出的值, 下面的例子就展示了如何实现这一个目的: 它通过追踪原Observable发出的最后一个值来实现fallback逻辑中的Observable应该从什么地方继续发出存根值(stubbed value) :
@Override protected ObservableFallback: Cache via Networkconstruct() { return Observable.just(1, 2, 3) .concatWith(Observable. error(new RuntimeException("forced error"))) .doOnNext(new Action1 () { @Override public void call(Integer t1) { lastSeen = t1; } }) .subscribeOn(Schedulers.computation()); } @Override protected Observable resumeWithFallback() { if (lastSeen < 4) { return Observable.range(lastSeen + 1, 4 - lastSeen); } else { return Observable.empty(); } }
有时后端的服务异常也会引起command执行失败, 此时我们也可以从缓存中(如: memcached)取得相关的数据.
由于在fallback的逻辑代码中访问网络可能会再次失败, 因此必须构建新的HystrixCommand或HystrixObservableCommand来执行:
很重要的一点是执行fallback逻辑的command需要在一个不同的线程池中执行, 否则如果原command的延迟变高且其所在线程池已经满了的话, 执行fallback逻辑的command将无法在同一个线程池中执行.
下面的代码展示了CommandWithFallbackViaNetwork如何在getFallback()方法中执行FallbackViaNetwork.
注意, FallbackViaNetwork同样也具有回退机制, 这里通过返回null来实现fail silent.
FallbackViaNetwork默认会从HystrixCommandGroupKey中继承线程池的配置RemoteServiceX, 因此需要在其构造器中注入HystrixThreadPoolKey.Factory.asKey("RemoteServiceXFallback")来使其在不同的线程池中执行.
这样, CommandWithFallbackViaNetwork会在名为RemoteServiceX的线程池中执行, 而FallbackViaNetwork会在名为RemoteServiceXFallback的线程池中执行.
public class CommandWithFallbackViaNetwork extends HystrixCommandPrimary + Secondary with Fallback{ private final int id; protected CommandWithFallbackViaNetwork(int id) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RemoteServiceX")) .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueCommand"))); this.id = id; } @Override protected String run() { // RemoteServiceXClient.getValue(id); throw new RuntimeException("force failure for example"); } @Override protected String getFallback() { return new FallbackViaNetwork(id).execute(); } private static class FallbackViaNetwork extends HystrixCommand { private final int id; public FallbackViaNetwork(int id) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RemoteServiceX")) .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueFallbackCommand")) // use a different threadpool for the fallback command // so saturating the RemoteServiceX pool won"t prevent // fallbacks from executing .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("RemoteServiceXFallback"))); this.id = id; } @Override protected String run() { MemCacheClient.getValue(id); } @Override protected String getFallback() { // the fallback also failed // so this fallback-of-a-fallback will // fail silently and return null return null; } } }
有些系统可能具有是以双系统模式搭建的 — 主从模式或主备模式.
有时从系统或备用系统会被认为是失败状态的一种, 仅在执行fallback逻辑是才使用它;这种场景和Cache via Network一节中描述的场景是一样的.
然而, 如果切换到从系统是一个很正常时, 例如发布新代码时(这是有状态的系统发布代码的一种方式), 此时每当切换到从系统使用时, 主系统都是处于不可用状态,断路器将会打开且发出警报.
这并不是我们期望发生的事, 这种狼来了式的警报可能会导致真正发生问题的时候我们却把它当成正常的误报而忽略了.
因此, 我们可以通过在其前面放置一个门面HystrixCommand(见下文), 将主/从系统的切换视为正常的、健康的状态.
主从HystrixCommand都是需要访问网络且实现了特定的业务逻辑, 因此其实现上应该是线程隔离的. 它们可能具有显著的性能差距(通常从系统是一个静态缓存), 因此将两个command隔离的另一个好处是可以针对性地调优.
你不需要将这两个command都公开发布, 只需要将它们隐藏在另一个由信号量隔离的HystrixCommand中(称之为门面HystrixCommand), 在这个command中去实现主系统还是从系统的调用选择. 只有当主从系统都失败了, 才会去执行这个门面command的fallback逻辑.
门面HystrixCommand可以使用信号量隔离的, 因为其业务逻辑仅仅是调用另外两个线程隔离的HystrixCommand, 它不涉及任何的网络访问、重试等容易出错的事, 因此没必要将这部分代码放到其他线程去执行.
public class CommandFacadeWithPrimarySecondary extends HystrixCommandClient Doesn"t Perform Network Access{ private final static DynamicBooleanProperty usePrimary = DynamicPropertyFactory.getInstance().getBooleanProperty("primarySecondary.usePrimary", true); private final int id; public CommandFacadeWithPrimarySecondary(int id) { super(Setter .withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX")) .andCommandKey(HystrixCommandKey.Factory.asKey("PrimarySecondaryCommand")) .andCommandPropertiesDefaults( // we want to default to semaphore-isolation since this wraps // 2 others commands that are already thread isolated HystrixCommandProperties.Setter() .withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE))); this.id = id; } @Override protected String run() { if (usePrimary.get()) { return new PrimaryCommand(id).execute(); } else { return new SecondaryCommand(id).execute(); } } @Override protected String getFallback() { return "static-fallback-" + id; } @Override protected String getCacheKey() { return String.valueOf(id); } private static class PrimaryCommand extends HystrixCommand { private final int id; private PrimaryCommand(int id) { super(Setter .withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX")) .andCommandKey(HystrixCommandKey.Factory.asKey("PrimaryCommand")) .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("PrimaryCommand")) .andCommandPropertiesDefaults( // we default to a 600ms timeout for primary HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(600))); this.id = id; } @Override protected String run() { // perform expensive "primary" service call return "responseFromPrimary-" + id; } } private static class SecondaryCommand extends HystrixCommand { private final int id; private SecondaryCommand(int id) { super(Setter .withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX")) .andCommandKey(HystrixCommandKey.Factory.asKey("SecondaryCommand")) .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("SecondaryCommand")) .andCommandPropertiesDefaults( // we default to a 100ms timeout for secondary HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(100))); this.id = id; } @Override protected String run() { // perform fast "secondary" service call return "responseFromSecondary-" + id; } } public static class UnitTest { @Test public void testPrimary() { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary", true); assertEquals("responseFromPrimary-20", new CommandFacadeWithPrimarySecondary(20).execute()); } finally { context.shutdown(); ConfigurationManager.getConfigInstance().clear(); } } @Test public void testSecondary() { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary", false); assertEquals("responseFromSecondary-20", new CommandFacadeWithPrimarySecondary(20).execute()); } finally { context.shutdown(); ConfigurationManager.getConfigInstance().clear(); } } } }
当你使用HystrixCommand实现的业务逻辑不涉及到网络访问、对延迟敏感且无法接受多线程带来的开销时, 你需要设置executionIsolationStrategy)属性的值为ExecutionIsolationStrategy.SEMAPHORE, 此时Hystrix会使用信号量隔离代替线程隔离.
下面的代码展示了如何为command设置该属性(也可以在运行时动态改变这个属性的值):
public class CommandUsingSemaphoreIsolation extends HystrixCommandGet-Set-Get with Request Cache Invalidation{ private final int id; public CommandUsingSemaphoreIsolation(int id) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) // since we"re doing an in-memory cache lookup we choose SEMAPHORE isolation .andCommandPropertiesDefaults(HystrixCommandProperties.Setter() .withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE))); this.id = id; } @Override protected String run() { // a real implementation would retrieve data from in memory data structure return "ValueFromHashMap_" + id; } }
Get-Set-Get是指: Get请求的结果被缓存下来后, 另一个command对同一个资源发出了Set请求, 此时由Get请求缓存的结果应该失效, 避免随后的Get请求获取到过时的缓存结果, 此时可以通过调用HystrixRequestCache.clear())方法来使缓存失效.
public class CommandUsingRequestCacheInvalidation { /* represents a remote data store */ private static volatile String prefixStoredOnRemoteDataStore = "ValueBeforeSet_"; public static class GetterCommand extends HystrixCommand{ private static final HystrixCommandKey GETTER_KEY = HystrixCommandKey.Factory.asKey("GetterCommand"); private final int id; public GetterCommand(int id) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("GetSetGet")) .andCommandKey(GETTER_KEY)); this.id = id; } @Override protected String run() { return prefixStoredOnRemoteDataStore + id; } @Override protected String getCacheKey() { return String.valueOf(id); } /** * Allow the cache to be flushed for this object. * * @param id * argument that would normally be passed to the command */ public static void flushCache(int id) { HystrixRequestCache.getInstance(GETTER_KEY, HystrixConcurrencyStrategyDefault.getInstance()).clear(String.valueOf(id)); } } public static class SetterCommand extends HystrixCommand { private final int id; private final String prefix; public SetterCommand(int id, String prefix) { super(HystrixCommandGroupKey.Factory.asKey("GetSetGet")); this.id = id; this.prefix = prefix; } @Override protected Void run() { // persist the value against the datastore prefixStoredOnRemoteDataStore = prefix; // flush the cache GetterCommand.flushCache(id); // no return value return null; } } }
@Test public void getGetSetGet() { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { assertEquals("ValueBeforeSet_1", new GetterCommand(1).execute()); GetterCommand commandAgainstCache = new GetterCommand(1); assertEquals("ValueBeforeSet_1", commandAgainstCache.execute()); // confirm it executed against cache the second time assertTrue(commandAgainstCache.isResponseFromCache()); // set the new value new SetterCommand(1, "ValueAfterSet_").execute(); // fetch it again GetterCommand commandAfterSet = new GetterCommand(1); // the getter should return with the new prefix, not the value from cache assertFalse(commandAfterSet.isResponseFromCache()); assertEquals("ValueAfterSet_1", commandAfterSet.execute()); } finally { context.shutdown(); } } }Migrating a Library to Hystrix
如果你要迁移一个已有的客户端库到Hystrix, 你应该将所有的服务方法(service methods)替换成HystrixCommand.
服务方法(service methods)转而调用HystrixCommand且不在包含任何额外的业务逻辑.
因此, 在迁移之前, 一个服务库可能是这样的:
迁移完成之后, 服务库的用户要能直接访问到HystrixCommand, 或者通过服务门面(service facade)的代理间接访问到HystrixCommand.
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/73416.html
摘要:使用线程池的好处通过线程在自己的线程池中隔离的好处是该应用程序完全可以不受失控的客户端库的威胁。简而言之,由线程池提供的隔离功能可以使客户端库和子系统性能特性的不断变化和动态组合得到优雅的处理,而不会造成中断。 工作流程图 下面的流程图展示了当使用Hystrix的依赖请求,Hystrix是如何工作的。showImg(https://segmentfault.com/img/bV0...
摘要:断路器本身是一种开关装置,用于在电路上保护线路过载,当线路中有电器发生短路时,断路器能够及时的切断故障电路,防止发生过载发热甚至起火等严重后果。具备拥有回退机制和断路器功能的线程和信号隔离,请求缓存和请求打包,以及监控和配置等功能。 转载请注明出处 http://www.paraller.com 代码机制:熔断 & Fallback & 资源隔离 熔断 概念: 在微服务架构中,我们将系...
摘要:脚本位置依赖内采样率,默认即如需测试时每次都看到则修改为,但对性能有影响,注意上线时修改为合理值运行查询参考规范推荐推荐谷歌的大规模分布式跟踪系统分布式服务的 zipkin-server pom io.zipkin zipkin-ui 1.39.3 or...
摘要:断路器原理断路器在和执行过程中起到至关重要的作用。其中通过来定义,每一个命令都需要有一个来标识,同时根据这个可以找到对应的断路器实例。一个啥都不做的断路器,它允许所有请求通过,并且断路器始终处于闭合状态断路器的另一个实现类。 断路器原理 断路器在HystrixCommand和HystrixObservableCommand执行过程中起到至关重要的作用。查看一下核心组件HystrixCi...
阅读 1931·2023-04-26 01:59
阅读 3247·2021-10-11 11:07
阅读 3261·2021-09-22 15:43
阅读 3341·2021-09-02 15:21
阅读 2523·2021-09-01 10:49
阅读 874·2019-08-29 15:15
阅读 3072·2019-08-29 13:59
阅读 2808·2019-08-26 13:36