Skip to content

Commit 59aa32c

Browse files
committed
Change the observable value returned when the command was unsubscribed from an error to an empty value. This fixes Netflix#1603
1 parent ea14be2 commit 59aa32c

File tree

2 files changed

+143
-1
lines changed

2 files changed

+143
-1
lines changed

hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -683,7 +683,7 @@ public Observable<R> call() {
683683
}
684684
} else {
685685
//command has already been unsubscribed, so return immediately
686-
return Observable.error(new RuntimeException("unsubscribed before executing run()"));
686+
return Observable.empty();
687687
}
688688
}
689689
}).doOnTerminate(new Action0() {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/**
2+
* Copyright 2017 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.netflix.hystrix;
18+
19+
import com.netflix.hystrix.exception.HystrixRuntimeException;
20+
import com.netflix.hystrix.strategy.HystrixPlugins;
21+
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
22+
import com.netflix.hystrix.strategy.executionhook.HystrixCommandExecutionHook;
23+
import org.junit.Test;
24+
25+
import java.util.concurrent.ExecutionException;
26+
import java.util.concurrent.ExecutorService;
27+
import java.util.concurrent.Executors;
28+
import java.util.concurrent.Future;
29+
import java.util.concurrent.atomic.AtomicBoolean;
30+
import java.util.concurrent.atomic.AtomicInteger;
31+
32+
import static org.junit.Assert.*;
33+
34+
public class UnsubscribedTasksRequestCacheTest {
35+
36+
private AtomicBoolean encounteredCommandException = new AtomicBoolean(false);
37+
private AtomicInteger numOfExecutions = new AtomicInteger(0);
38+
39+
public class CommandExecutionHook extends HystrixCommandExecutionHook {
40+
41+
@Override
42+
public <T> Exception onError(HystrixInvokable<T> commandInstance, HystrixRuntimeException.FailureType failureType, Exception e) {
43+
e.printStackTrace();
44+
encounteredCommandException.set(true);
45+
return e;
46+
}
47+
}
48+
49+
public class CommandUsingRequestCache extends HystrixCommand<Boolean> {
50+
51+
private final int value;
52+
53+
protected CommandUsingRequestCache(int value) {
54+
super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
55+
this.value = value;
56+
}
57+
58+
@Override
59+
protected Boolean run() {
60+
numOfExecutions.getAndIncrement();
61+
try {
62+
Thread.sleep(500);
63+
}
64+
catch (InterruptedException e) {
65+
e.printStackTrace();
66+
}
67+
System.out.println(Thread.currentThread().getName() + " run()");
68+
return value == 0 || value % 2 == 0;
69+
}
70+
71+
@Override
72+
protected String getCacheKey() {
73+
return String.valueOf(value);
74+
}
75+
}
76+
77+
@Test
78+
public void testOneCommandIsUnsubscribed() throws ExecutionException, InterruptedException {
79+
80+
HystrixPlugins.getInstance().registerCommandExecutionHook(new CommandExecutionHook());
81+
HystrixRequestContext context = HystrixRequestContext.initializeContext();
82+
final AtomicInteger numCacheResponses = new AtomicInteger(0);
83+
84+
85+
try {
86+
ExecutorService executorService = Executors.newFixedThreadPool(2);
87+
88+
Future futureCommand2a = executorService.submit(new Runnable() {
89+
90+
public void run() {
91+
92+
HystrixRequestContext.setContextOnCurrentThread(context);
93+
94+
CommandUsingRequestCache command2a = new CommandUsingRequestCache(2);
95+
Future<Boolean> resultCommand2a = command2a.queue();
96+
97+
try {
98+
assertTrue(resultCommand2a.get());
99+
System.out.println(Thread.currentThread() + " " + command2a.isResponseFromCache());
100+
if (command2a.isResponseFromCache()) {
101+
numCacheResponses.getAndIncrement();
102+
}
103+
} catch (Exception e) {
104+
fail("Exception: " + e.getMessage());
105+
}
106+
}
107+
});
108+
109+
Future futureCommand2b = executorService.submit(new Runnable() {
110+
111+
public void run() {
112+
113+
HystrixRequestContext.setContextOnCurrentThread(context);
114+
115+
CommandUsingRequestCache command2b = new CommandUsingRequestCache(2);
116+
Future<Boolean> resultCommand2b = command2b.queue();
117+
118+
try {
119+
assertTrue(resultCommand2b.get());
120+
System.out.println(Thread.currentThread() + " " + command2b.isResponseFromCache());
121+
if (command2b.isResponseFromCache()) {
122+
numCacheResponses.getAndIncrement();
123+
}
124+
} catch (Exception e) {
125+
fail("Exception: " + e.getMessage());
126+
}
127+
}
128+
});
129+
130+
futureCommand2a.get();
131+
futureCommand2b.get();
132+
133+
assertEquals(1, numCacheResponses.get());
134+
assertEquals(1, numOfExecutions.get());
135+
assertEquals(false, encounteredCommandException.get());
136+
137+
} finally {
138+
context.shutdown();
139+
}
140+
}
141+
142+
}

0 commit comments

Comments
 (0)