Hystrix入门

Hystrix入门

假设有一个应用程序,有着如下的调用关系:

单体调用

假设在某个时刻,数据库因为某些原因不可用了,基础服务就会得到“数据库无法访问”的信息,炳辉将次信息告知服务A。在出现问题时,用户不断地请求服务A模块,而服务A模块则继续请求基础服务模块,基础服务模块仍然不停地连接有问题的数据库直到超时,大量用户的请求(包括重试的请求)持续发送过来,整个应用不堪重负。

更有甚者,因为数据库的长时间响应或者无法响应,可能导致整个机房的网络阻塞,影响到同机房的所有其他服务。

这个时候,如果服务A在调用基础模块的时候将基础模块隔离开来,短时间内不再调用基础模块,并且快速响应用户的请求,就可以保证服务A自身乃至整个集群的稳定性。

单体调用失败熔断

Hystrix是Netflix下的一个Java库,Spring Cloud将Hystrix整合到Netflix项目中,Hystrix通过添加延迟阈值以及容错的逻辑,来帮助我们控制分布式系统间组件的交互。Hystrix通过隔离服务间的访问点,停止它们之间的级联故障。提供可回退操作来实现容错。

Hystrix主要实现以下功能:

  • 当所依赖的网络服务发生延迟或者失败时,对访问的客户端程序进行保护
  • 在分布式系统中,停止级联故障
  • 网络服务恢复正常后,可以快速恢复客户端的访问能力
  • 调用失败时执行服务回退
  • 可支持实时监控、报警和其他操作

第一个Hystrix程序

Hystrix Server端

新建一个Server端项目,提供两个REST服务。一个正常返回,一个“伪装”成故障调用,休眠10S再返回。

pom的依赖如下:

1
2
3
4
5
6
7
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>
</dependencies>

提供REST服务的Controller如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
@RestController
public class MyController {
@GetMapping("/normalHello")
public String normalHello(HttpServletRequest request) {
return "Hello world";
}

@GetMapping("/errorHello")
public String errorHello(HttpServletRequest request) throws Exception {
Thread.sleep(10000);
return "Error Hello world.";
}
}

启动类如下:

1
2
3
4
5
6
@SpringBootApplication
public class ServerApplication {
public static void main(String[] args) {
new SpringApplication(ServerApplication.class).run(args);
}
}

Hystrix Client端

Client的依赖如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<dependencies>
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-core</artifactId>
<version>1.5.12</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.12</version>
</dependency>
</dependencies>

新建一个命令类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class HelloCommand extends HystrixCommand<String> {

private String url;

CloseableHttpClient httpClient;

public HelloCommand(String url) {
// 调用父类的构造器,设置命令组的key,默认用来做线程池的key
super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
// 创建HttpClient客户端
this.httpClient = HttpClients.createDefault();
this.url = url;
}

protected String run() throws Exception {
try {
HttpGet httpGet = new HttpGet(url);
// 得到服务响应
HttpResponse response = httpClient.execute(httpGet);
return EntityUtils.toString(response.getEntity());
} catch (Exception e) {
e.printStackTrace();
}
return "";
}

@Override
protected String getFallback() {
System.out.println("执行HelloCommand的回退方法.");
return "error";
}
}

实现它的run方法,发出一个HTTP请求。重写getFallback方法,作为调用失败,回退逻辑。

然后在写两个调用类,分别调用Server提供的normalHello和errorHello接口

1
2
3
4
5
6
7
8
9
public class HelloMain {
public static void main(String[] args) {
String normalUrl = "http://localhost:8080/normalHello";
HelloCommand command = new HelloCommand(normalUrl);
String result = command.execute();
System.out.println("服务正常请求结果:" + result);
}
}

1
2
3
4
5
6
7
8
public class HelloErrorMain {
public static void main(String[] args) {
String normalUrl = "http://localhost:8080/errorHello";
HelloCommand command = new HelloCommand(normalUrl);
String result = command.execute();
System.out.println("请求异常的服务结果:" + result);
}
}

调用normalHello

Hystrix默认的超时时间是1S,所以,调用errorHello接口超时后会走getFallback方法

调用errorHello

Hystrix的运作流程

Hystrix的执行流程

简单整理下Hystrix的运作流程

  1. 在命令开始执行时,做一些准备操作
  2. 判断是否打开了缓存,如果打开缓存,直接查找缓存并返回结果
  3. 判断断路器是否打开,如果打开了表示链路不可用,直接执行回退方法
  4. 判断线程池、信号量(计数器)等条件,例如像线程池超负荷,则执行回退方法,否则,就去执行命令的内容
  5. 执行命令,计算是否要对断路器进行处理,执行完成后如满足一定条件,则需要开启断路器。如果执行成功,则返回结果,反之则执行回退。

Hystrix的使用

命令执行

一个命令对象可以使用以下方法来执行命令:

  • toObservable:返回一个最原始的可观察的实例(Observable),Observable是R小Java的类,使用该对象可以观察命令的执行过程,并且将执行信息传递给订阅者
  • observe:调用toObservable方法,获得一个原始的Observable实例后,使用ReplaySubject作为原始Observable的订阅者
  • queue:通过toObservable方法获取原始的Observable实例,在调用Observable的toBlocking方法得到一个BlockingObservable实例,最后调用BlockingObservable的toFuture方法返回Future实例,调用Future的get方法得到执行结果。
  • execute:调用queue的get方法返回命令的执行结果,该方法同步执行。

上面4个方法,除了execute外,都为异步执行。observe和toObservable方法的区别在于,toObservable方法被调用后,命令不会立即执行,只有当返回的Observable实例被订阅后,才会真正执行命令。而在observe方法的实现中,会调用toObservable得到Observable实例,再对其进行订阅,因此调用observe方法后会立即执行命令(异步)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class RunTest {

public static void main(String[] args) throws Exception {
// 使用execute方法
RunCommand c1 = new RunCommand("使用execute方法执行命令");
c1.execute();

// 使用queue方法
RunCommand c2 = new RunCommand("使用queue方法执行命令");
c2.queue();

// 使用observe方法
RunCommand c3 = new RunCommand("使用observe方法执行命令");
c3.observe();

// 使用toObservable方法
RunCommand c4 = new RunCommand("使用toObservable方法执行命令");
Observable<String> ob = c4.toObservable();
// 进行订阅,此时会执行命令
ob.subscribe(new Observer<String>() {
public void onCompleted() {
System.out.println(" 命令执行完成");
}

public void onError(Throwable throwable) {

}

public void onNext(String s) {
System.out.println(" 命令执行结果:" + s);
}
});
Thread.sleep(100);
}

static class RunCommand extends HystrixCommand<String> {
String msg;

public RunCommand(String msg) {
super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
this.msg = msg;
}
protected String run() throws Exception {
System.out.println(msg);

return "success";
}
}
}


属性配置

使用Hystrix时,可以为命令设置属性。将超时时间由默认的1S改为500ms。但只对当前命令有效。

1
2
3
4
5
public HelloCommand(boolean isTimeout) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")).
andCommandPropertiesDefaults(HystrixCommandProperties.Setter().
withExecutionTimeoutInMilliseconds(500)));
}

如果向全局生效,可以使用以下代码片段:

1
2
3
ConfigurationManager.getConfigInstance().
setProperty("hystrix.command.default.execution.isolation.thread.timoutInMilliseconds", 500);

除了超时配置外,还需要了解下命令的相关名称,可以为命令设置以下名称:

  • 命令组名称(GroupKey):必须提供命令组名称,默认情况下,全局维护的线程池Map以该值作为Key,该Map的value为执行命令的线程池
  • 命令名称(CommandKey):可选参数
  • 线程池名称(ThreadPoolKey):指定了线程的key后,全局维护的线程池Map将以该值作为key

以下的代码片段分别设置上面的三个key

1
2
3
4
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("group-key")).
andCommandKey(HystrixCommandKey.Factory.asKey("command-key")).
andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ThreadPoolKey")));

断路器开启

断路器一旦开启,就会直接调用回退方法,不再执行命令,而且也不会更新链路的健康状况。断路器的开启需要满足两个条件:

  • 整个链路达到一定阈值,默认情况下,10秒内产生超过20次请求,则符合第一个条件
  • 满足第一个条件的情况下,如果请求的错误百分比大于阈值,则会打开断路器默认为50%
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import com.netflix.config.ConfigurationManager;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandProperties;

public class OpenTest {
public static void main(String[] args) {
ConfigurationManager.getConfigInstance().setProperty("hystrix.command.default.metrics.rollingStats.timeInMilliseconds", 10000);
ConfigurationManager.getConfigInstance().setProperty("hystrix.command.default.circuitBreaker.requestVolumeThreshold", 10);
ConfigurationManager.getConfigInstance().setProperty("hystrix.command.default.circuitBreaker.errorThresholdPercentage", 50);

for (int i = 0;i < 15;i++) {
MyCommand c = new MyCommand();
c.execute();
if (c.isCircuitBreakerOpen()) {
System.out.println("断路器被打开,执行第" + (i + 1) + "个命令");
}
}
}

static class MyCommand extends HystrixCommand<String> {
public MyCommand() {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")).
andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(500)));
}

protected String run() throws Exception {
// 模拟处理超时
Thread.sleep(800);
return "";
}

@Override
protected String getFallback() {
return "";
}
}
}

断路器

断路器关闭

断路器打开后,在一段时间内,命令不会再执行(一直触发回退),这段时间我们称作“休眠期”。休眠期的默认值为5秒,休眠期结束后,Hystrix会尝试性地执行一次命令,此时断路器的状态不是开启,也不是关闭,而是半开的状态,如果这一次命令执行成功,则会关闭断路器并清空链路的健康信息;如果执行失败,断路器会继续保持打开的状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class CloseTest {
public static void main(String[] args) throws Exception {
// 10秒钟内有3个请求就满足第一个开启断路器的条件
ConfigurationManager.getConfigInstance().setProperty("hystrix.command.default.metrics.rollingStats.timeInMilliseconds", 10000);
ConfigurationManager.getConfigInstance().setProperty("hystrix.command.default.circuitBreaker.requestVolumeThreshold", 3);
// 请求的失败率,默认为50%
ConfigurationManager.getConfigInstance().setProperty("hystrix.command.default.circuitBreaker.errorThresholdPercentage", 50);
// 设置休眠期,断路器打开后,这段时间不会再执行命令,默认值为5秒,此处设置为3秒
ConfigurationManager.getConfigInstance().setProperty("hystrix.command.default.circuitBreaker.sleepWindowInMilliseconds", 3000);

boolean isTimeout = true;
for (int i = 0;i < 10;i++) {
MyCommand c = new MyCommand(isTimeout);
c.execute();
// 输出健康状态等信息
HystrixCommandMetrics.HealthCounts hc = c.getMetrics().getHealthCounts();
System.out.println("断路器状态:" + c.isCircuitBreakerOpen() + " 请求总数: " + hc.getTotalRequests());

if (c.isCircuitBreakerOpen()) {
isTimeout = false;
System.out.println("断路器被打开,等待休眠期结束");
// 休眠期会在3秒钟结束,此处休眠4秒
Thread.sleep(4000);
}
}
}

static class MyCommand extends HystrixCommand<String> {
private boolean isTimeout;
public MyCommand(boolean isTimeout) {
super(HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")).
andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(500)));
this.isTimeout = isTimeout;
}

protected String run() throws Exception {
// 让外部决定是否超时
if (isTimeout) {
Thread.sleep(800);
} else {
Thread.sleep(200);
}
return "";
}

@Override
protected String getFallback() {
return "";
}
}
}

断路器关闭

隔离机制

命令的真正执行,除了断路器要关闭以外,还要再过一关:执行命令的线程池或者信号量是否满载。如果满载,命令就不会执行,而是直接触发回退,这样的机制,在控制命令的执行上,实现了错误的隔离。Hystrix提供了两种隔离策略:

  • THREAD:默认值,由线程池来决定命令的执行,如线程池满载,则不会执行命令。Hystrix使用了ThreadPoolExecutor来控制线程池的行为,线程池的默认大小为10.
  • SEMAPHORE:由信号量来决定命令的执行,当请求的并发数高于阈值时,就不再执行命令。

相对于线程策略,信号量策略开销更小,但是该策略不支持超时以及异步,除非对调用的服务有足够的信任,否则不建议使用该策略进行隔离。

下面举个例子,来看看两个隔离方式有什么区别、

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class MyCommand extends HystrixCommand<String> {
int index;

public MyCommand(int index) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")));
this.index = index;
}

protected String run() throws Exception {
Thread.sleep(500);
System.out.println("执行方法,当前索引:" + index);
return "";
}

@Override
protected String getFallback() {
System.out.println("执行fallback,当前索引:" + index);
return "";
}
}

线程池隔离

1
2
3
4
5
6
7
8
9
10
11
public class ThreadIso {
public static void main(String[] args) throws Exception {
// 配置线程池大小为3
ConfigurationManager.getConfigInstance().setProperty("hystrix.threadpool.default.coreSize", 3);
for (int i = 0;i < 6;i++) {
MyCommand c = new MyCommand(i);
c.queue();
}
Thread.sleep(5000);
}
}

信号量隔离

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class SemaphoreIso {
public static void main(String[] args) throws Exception {
// 配置使用信号量的策略进行隔离
ConfigurationManager.getConfigInstance().setProperty("hystrix.command.default.execution.isolation.strategy",
HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE);
// 设置最大并发数,默认值为10,本例设置为2
ConfigurationManager.getConfigInstance().setProperty("hystrix.command.default.execution.isolation.semaphore.maxConcurrentRequests", 2);
// 设置执行回退方法的最大并发,默认值为10,本例设置为20
ConfigurationManager.getConfigInstance().setProperty("hystrix.command.default.fallback.isolation.semaphore.maxConcurrentRequests", 20);
for (int i = 0;i < 6;i++) {
final int index = i;
Thread t = new Thread() {
public void run() {
MyCommand c = new MyCommand(index);
c.execute();
}
};
t.start();
}
Thread.sleep(5000);
}
}

执行ThreadIso中的main方法,可以看到有3次命令会触发回退。

信号量隔离

执行SemaphoreIso中的方法,可以看到有4次命令会触发回退。

信号量隔离

合并请求

默认情况下,Hystrix会为命令分配线程池来执行命令实例,线程池会消耗一定的性能。对于一些同类型的请求(URL相同,参数不同),Hystrix提供了合并请求的功能,在一次请求的过程中,可以将一个时间段内的相同请求(参数不同),收集到同一个命令中执行,这样就节省了线程的开销,减少了网络连接,从而提升了执行的性能。

实现合并请求的功能,至少包含以下3个条件:

  • 需要有一个执行请求的命令,将全部参数进行整理,然后调用外部服务
  • 需要有一个合并处理器,用于收集请求,以及处理结果
  • 外部接口支持。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
public class CollapseTest {

public static void main(String[] args) throws Exception {
// 收集 1 秒内发生的请求,合并为一个命令执行
ConfigurationManager.getConfigInstance().setProperty(
"hystrix.collapser.default.timerDelayInMilliseconds", 1000);
// 请求上下文
HystrixRequestContext context = HystrixRequestContext
.initializeContext();
// 创建请求合并处理器
MyHystrixCollapser c1 = new MyHystrixCollapser("Angus");
MyHystrixCollapser c2 = new MyHystrixCollapser("Crazyit");
MyHystrixCollapser c3 = new MyHystrixCollapser("Sune");
MyHystrixCollapser c4 = new MyHystrixCollapser("Paris");
// 异步执行
Future<Person> f1 = c1.queue();
Future<Person> f2 = c2.queue();
Future<Person> f3 = c3.queue();
Future<Person> f4 = c4.queue();
System.out.println(f1.get());
System.out.println(f2.get());
System.out.println(f3.get());
System.out.println(f4.get());
context.shutdown();
}

static class CollapserCommand extends HystrixCommand<Map<String, Person>> {
// 请求集合,第一个类型是单个请求返回的数据类型,第二是请求参数的类型
Collection<CollapsedRequest<Person, String>> requests;

private CollapserCommand(
Collection<CollapsedRequest<Person, String>> requests) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory
.asKey("ExampleGroup")));
this.requests = requests;
}

@Override
protected Map<String, Person> run() throws Exception {
System.out.println("收集参数后执行命令,参数数量:" + requests.size());
// 处理参数
List<String> personNames = new ArrayList<String>();
for(CollapsedRequest<Person, String> request : requests) {
personNames.add(request.getArgument());
}
// 调用服务(此处模拟调用),根据名称获取Person的Map
Map<String, Person> result = callService(personNames);
return result;
}

// 模拟服务返回
private Map<String, Person> callService(List<String> personNames) {
Map<String, Person> result = new HashMap<String, Person>();
for(String personName : personNames) {
Person p = new Person();
p.id = UUID.randomUUID().toString();
p.name = personName;
p.age = new Random().nextInt(30);
result.put(personName, p);
}
return result;
}
}

static class Person {
String id;
String name;
Integer age;

public String toString() {
return "id: " + id + ", name: " + name + ", age: " + age;
}
}

static class MyHystrixCollapser extends
HystrixCollapser<Map<String, Person>, Person, String> {

String personName;

public MyHystrixCollapser(String personName) {
this.personName = personName;
}

@Override
public String getRequestArgument() {
return personName;
}

@Override
protected HystrixCommand<Map<String, Person>> createCommand(
Collection<CollapsedRequest<Person, String>> requests) {
return new CollapserCommand(requests);
}

@Override
protected void mapResponseToRequests(Map<String, Person> batchResponse,
Collection<CollapsedRequest<Person, String>> requests) {
// 让结果与请求进行关联
for (CollapsedRequest<Person, String> request : requests) {
// 获取单个响应返回的结果
Person singleResult = batchResponse.get(request.getArgument());
// 关联到请求中
request.setResponse(singleResult);
}
}
}
}

执行结果

请求缓存

Hystrix支持缓存功能,如果在一次请求的过程中,多个地方调用同一个接口,可以考虑使用缓存。缓存打开后,下一次的命令不会去执行,直接到缓存中获取响应并返回。

开启缓存较为简单,在命令中重写父类的getCacheKey即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class CacheMain {

public static void main(String[] args) {
// 初始化请求上下文
HystrixRequestContext context = HystrixRequestContext.initializeContext();
// 请求正常的服务
String key = "cache-key";
MyCommand c1 = new MyCommand(key);
MyCommand c2 = new MyCommand(key);
MyCommand c3 = new MyCommand(key);
// 输出结果
System.out.println(c1.execute() + "c1 是否读取缓存: " + c1.isResponseFromCache());
System.out.println(c2.execute() + "c2 是否读取缓存: " + c2.isResponseFromCache());
System.out.println(c3.execute() + "c3 是否读取缓存: " + c3.isResponseFromCache());
// 获取缓存实例
HystrixRequestCache cache = HystrixRequestCache.getInstance(
HystrixCommandKey.Factory.asKey("MyCommandKey"),
HystrixConcurrencyStrategyDefault.getInstance());
// 清空缓存
cache.clear(key);
// 重新执行命令
MyCommand c4 = new MyCommand(key);
System.out.println(c4.execute() + "c4 是否读取缓存: " + c4.isResponseFromCache());

context.shutdown();
}

static class MyCommand extends HystrixCommand<String> {

private String key;

public MyCommand(String key) {
super(
Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("MyCommandKey"))
);
this.key = key;
}

protected String run() throws Exception {
System.out.println("执行命令");
return "";
}

@Override
protected String getCacheKey() {
return this.key;
}
}
}

执行结果

参考资料

疯狂Spring Cloud微服务架构实战

0%