java.lang.RuntimeException: unsubscribed before executing run() with request caching enabled #1603
OK, I've successfully reproduced the issue in a unit test. Below is the test file where I'm able to reproduce it. The root cause seems to be that it's hitting this condition in AbstractCommand.

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixInvokable;
import com.netflix.hystrix.exception.HystrixRuntimeException;
import com.netflix.hystrix.strategy.HystrixPlugins;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
import com.netflix.hystrix.strategy.executionhook.HystrixCommandExecutionHook;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public class RequestCacheTest {
    private AtomicBoolean encounteredCommandException = new AtomicBoolean(false);

    public class CommandExecutionHook extends HystrixCommandExecutionHook {
        @Override
        public <T> Exception onError(HystrixInvokable<T> commandInstance, HystrixRuntimeException.FailureType failureType, Exception e) {
            e.printStackTrace();
            encounteredCommandException.set(true);
            return e;
        }
    }

    public class CommandUsingRequestCache extends HystrixCommand<Boolean> {
        private final int value;

        protected CommandUsingRequestCache(int value) {
            super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
            this.value = value;
        }

        @Override
        protected Boolean run() {
            try {
                Thread.sleep(500);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " run()");
            return value == 0 || value % 2 == 0;
        }

        @Override
        protected String getCacheKey() {
            return String.valueOf(value);
        }
    }

    @Test
    public void testGenericCommandsWithCacheKey() throws ExecutionException, InterruptedException {
        HystrixPlugins.getInstance().registerCommandExecutionHook(new CommandExecutionHook());
        HystrixRequestContext context = HystrixRequestContext.initializeContext();
        AtomicInteger numCacheResponses = new AtomicInteger(0);
        try {
            ExecutorService executorService = Executors.newFixedThreadPool(2);
            Future<?> futureCommand2a = executorService.submit(() -> {
                HystrixRequestContext.setContextOnCurrentThread(context);
                CommandUsingRequestCache command2a = new CommandUsingRequestCache(2);
                Future<Boolean> resultCommand2a = command2a.queue();
                try {
                    assertTrue(resultCommand2a.get());
                    System.out.println(Thread.currentThread() + " " + command2a.isResponseFromCache());
                    if (command2a.isResponseFromCache()) {
                        numCacheResponses.getAndIncrement();
                    }
                }
                catch (Exception e) {
                    fail("Exception: " + e.getMessage());
                }
            });
            Future<?> futureCommand2b = executorService.submit(() -> {
                HystrixRequestContext.setContextOnCurrentThread(context);
                CommandUsingRequestCache command2b = new CommandUsingRequestCache(2);
                Future<Boolean> resultCommand2b = command2b.queue();
                try {
                    assertTrue(resultCommand2b.get());
                    System.out.println(Thread.currentThread() + " " + command2b.isResponseFromCache());
                    if (command2b.isResponseFromCache()) {
                        numCacheResponses.getAndIncrement();
                    }
                }
                catch (Exception e) {
                    fail("Exception: " + e.getMessage());
                }
            });
            futureCommand2a.get();
            futureCommand2b.get();
            assertEquals(1, numCacheResponses.get());
            assertEquals(false, encounteredCommandException.get());
        } finally {
            context.shutdown();
        }
    }
}
Hello Steven, thanks for your unit test. I thought I'd take a stab at fixing this issue. I tried removing the call to unsubscribe: the error is no longer logged, and the test passes. I then modified the code to simply return an empty observable (Observable.empty()) instead. I am now creating a pull request with your test and my one-line code change. Once it's ready, would you please check that my changes are OK?
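To illustrate the difference between the two behaviours mentioned here, this is a minimal plain-Java sketch. It is not Hystrix or RxJava code: the class `UnsubscribedGuardSketch`, the `OnUnsubscribed` policy enum, and the `errorHook` callback are all hypothetical names introduced for illustration, and the assumption is that the error hook only fires when the already-unsubscribed execution signals an error rather than completing with no value.

```java
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical model only; none of these names exist in Hystrix.
class UnsubscribedGuardSketch {
    /** What to do when execution is reached after the caller has already unsubscribed. */
    enum OnUnsubscribed { SIGNAL_ERROR, COMPLETE_EMPTY }

    /**
     * Runs the work unless it was unsubscribed first. Under SIGNAL_ERROR the
     * error hook is invoked (the noisy behaviour); under COMPLETE_EMPTY the
     * call just finishes with no value and the hook is never touched.
     */
    static Optional<Boolean> execute(boolean unsubscribed, OnUnsubscribed policy,
                                     Supplier<Boolean> run, Consumer<Exception> errorHook) {
        if (unsubscribed) {
            if (policy == OnUnsubscribed.SIGNAL_ERROR) {
                errorHook.accept(new RuntimeException("unsubscribed before executing run()"));
            }
            return Optional.empty();
        }
        return Optional.of(run.get());
    }

    /** Counts how often the error hook fires for an unsubscribed execution. */
    static int countHookCalls(OnUnsubscribed policy) {
        int[] calls = {0};
        execute(true, policy, () -> true, e -> calls[0]++);
        return calls[0];
    }

    public static void main(String[] args) {
        System.out.println("error policy hook calls: " + countHookCalls(OnUnsubscribed.SIGNAL_ERROR));
        System.out.println("empty policy hook calls: " + countHookCalls(OnUnsubscribed.COMPLETE_EMPTY));
    }
}
```

In this model the caller never sees a value from the unsubscribed path under either policy, which matches the report that the command result itself is unaffected; only the hook invocation differs.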
…d from an error to an empty value. This fixes Netflix#1603
@stevenchurd I've just merged @atoulme's change (#1605). Are you able to retest with Hystrix built from master?
Thanks @mattrjacobs, I can test with a build from master.
@mattrjacobs, fix looks good from my testing. Any ETA on when this will make it into the next Maven artifact? Thanks!
Just released in v1.5.13
After upgrading from Hystrix 1.4.x to 1.5.10, we started seeing this exception from AbstractCommand being logged to our service's output.
This happens when request caching is enabled and the value is retrieved from the cache. Seemingly it doesn't affect the outcome of the command result, and the correct value is returned by command.queue.
This seems to be highly timing dependent. From the investigation I've done so far, I've narrowed it down to the following scenario:
- [...] command.queue() is called on them at approximately the same time.
- [...] (command1) executes asynchronously.
- command2 [...] and logged to the console.
- command1 returns the result with command1.isResponseFromCache() returning false.
- command2 returns with the correct result from the request cache and command2.isResponseFromCache() returns true.
I've been trying (so far unsuccessfully) to reproduce this issue in a unit test. It seems, however, highly timing dependent. Any insight that can be provided so that I can produce a unit test that re-creates this scenario would be much appreciated!
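The outcome described in the scenario above (two calls with the same cache key made at about the same time, one executing and reporting isResponseFromCache() as false, the other served from the request cache and reporting true) can be modeled with the JDK alone. This is only a sketch of the general request-cache idea, not how Hystrix implements it: `RequestCacheRaceSketch`, `executeIsFromCache`, and `runScenario` are hypothetical names, and the cache is assumed to be a simple map from cache key to a shared pending result.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model only; none of these names exist in Hystrix.
class RequestCacheRaceSketch {
    // Stand-in for a per-request cache: cache key -> shared pending result.
    private final ConcurrentMap<String, CompletableFuture<Boolean>> cache = new ConcurrentHashMap<>();
    private final AtomicInteger runCount = new AtomicInteger();

    /** Returns true when the caller was served from the cache, i.e. it lost the race. */
    boolean executeIsFromCache(int value) {
        CompletableFuture<Boolean> mine = new CompletableFuture<>();
        // putIfAbsent is atomic, so exactly one caller per key becomes the executor.
        CompletableFuture<Boolean> winner = cache.putIfAbsent(String.valueOf(value), mine);
        if (winner != null) {
            winner.join();   // wait for the winner's result; our own work never runs
            return true;
        }
        runCount.incrementAndGet();     // models a single run() execution
        mine.complete(value % 2 == 0);  // same predicate as the test's run()
        return false;
    }

    /** Runs two same-key calls concurrently; returns {run() executions, cache hits}. */
    static int[] runScenario() {
        RequestCacheRaceSketch requestScope = new RequestCacheRaceSketch();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<Boolean> a =
                    CompletableFuture.supplyAsync(() -> requestScope.executeIsFromCache(2), pool);
            CompletableFuture<Boolean> b =
                    CompletableFuture.supplyAsync(() -> requestScope.executeIsFromCache(2), pool);
            int cacheHits = (a.join() ? 1 : 0) + (b.join() ? 1 : 0);
            return new int[] { requestScope.runCount.get(), cacheHits };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] result = runScenario();
        System.out.println("run() executions: " + result[0] + ", cache hits: " + result[1]);
    }
}
```

Whichever thread loses the race has work that was created but is never needed, which is the situation in which the reported exception appears; the sketch does not reproduce the exception itself.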