Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor code refactoring #2428

Merged
merged 12 commits into from
Sep 3, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ public URL getUrl() {

@Override
public URL configure(URL url) {
if (configuratorUrl == null || configuratorUrl.getHost() == null
|| url == null || url.getHost() == null) {
if (configuratorUrl.getHost() == null || url == null || url.getHost() == null) {
return url;
}
// If override url has port, means it is a provider address. We want to control a specific provider with this override url, it may take effect on the specific provider instance or on consumers holding this provider instance.
Expand Down Expand Up @@ -115,7 +114,7 @@ public int compareTo(Configurator o) {
if (ipCompare == 0) {//host is the same, sort by priority
int i = getUrl().getParameter(Constants.PRIORITY_KEY, 0),
j = o.getUrl().getParameter(Constants.PRIORITY_KEY, 0);
return i < j ? -1 : (i == j ? 0 : 1);
return Integer.compare(i, j);
} else {
return ipCompare;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void destroy() {
* a) Firstly, select an invoker using loadbalance. If this invoker is in previously selected list, or,
* if this invoker is unavailable, then continue step b (reselect), otherwise return the first selected invoker</br>
* <p>
* b) Reslection, the validation rule for reselection: selected > available. This rule guarantees that
* b) Reselection, the validation rule for reselection: selected > available. This rule guarantees that
* the selected invoker has the minimum chance to be one in the previously selected list, and also
* guarantees this invoker is available.
*
Expand Down Expand Up @@ -268,8 +268,7 @@ protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invok
LoadBalance loadbalance) throws RpcException;

protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
List<Invoker<T>> invokers = directory.list(invocation);
return invokers;
return directory.list(invocation);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class FailbackClusterInvoker<T> extends AbstractClusterInvoker<T> {
private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2,
new NamedInternalThreadFactory("failback-cluster-timer", true));

private final ConcurrentMap<Invocation, AbstractClusterInvoker<?>> failed = new ConcurrentHashMap<Invocation, AbstractClusterInvoker<?>>();
private final ConcurrentMap<Invocation, AbstractClusterInvoker<?>> failed = new ConcurrentHashMap<>();
private volatile ScheduledFuture<?> retryFuture;

public FailbackClusterInvoker(Directory<T> directory) {
Expand Down Expand Up @@ -91,8 +91,7 @@ void retryFailed() {
if (failed.size() == 0) {
return;
}
for (Map.Entry<Invocation, AbstractClusterInvoker<?>> entry : new HashMap<Invocation, AbstractClusterInvoker<?>>(
failed).entrySet()) {
for (Map.Entry<Invocation, AbstractClusterInvoker<?>> entry : new HashMap<>(failed).entrySet()) {
Invocation invocation = entry.getKey();
Invoker<?> invoker = entry.getValue();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,13 @@ public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBal
if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
throw (RpcException) e;
}
throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
"Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName()
+ " select from all providers " + invokers + " for service " + getInterface().getName()
+ " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost()
+ " use dubbo version " + Version.getVersion()
+ ", but no luck to perform the invocation. Last error is: " + e.getMessage(),
e.getCause() != null ? e.getCause() : e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,14 @@ public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, L
providers.add(invoker.getUrl().getAddress());
}
}
throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
throw new RpcException(le.getCode(), "Failed to invoke the method "
+ methodName + " in the service " + getInterface().getName()
+ ". Tried " + len + " times of the providers " + providers
+ " (" + providers.size() + "/" + copyinvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
+ Version.getVersion() + ". Last error is: "
+ (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
+ le.getMessage(), le.getCause() != null ? le.getCause() : le);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,19 @@ public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, L
if (forks <= 0 || forks >= invokers.size()) {
selected = invokers;
} else {
selected = new ArrayList<Invoker<T>>();
selected = new ArrayList<>();
for (int i = 0; i < forks; i++) {
// TODO. Add some comment here, refer chinese version for more details.
Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
if (!selected.contains(invoker)) {//Avoid add the same invoker several times.
if (!selected.contains(invoker)) {
//Avoid add the same invoker several times.
selected.add(invoker);
}
}
}
RpcContext.getContext().setInvokers((List) selected);
final AtomicInteger count = new AtomicInteger();
final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
final BlockingQueue<Object> ref = new LinkedBlockingQueue<>();
for (final Invoker<T> invoker : selected) {
executor.execute(new Runnable() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,14 @@
@SuppressWarnings("unchecked")
public class ForkingClusterInvokerTest {

List<Invoker<ForkingClusterInvokerTest>> invokers = new ArrayList<Invoker<ForkingClusterInvokerTest>>();
URL url = URL.valueOf("test://test:11/test?forks=2");
Invoker<ForkingClusterInvokerTest> invoker1 = mock(Invoker.class);
Invoker<ForkingClusterInvokerTest> invoker2 = mock(Invoker.class);
Invoker<ForkingClusterInvokerTest> invoker3 = mock(Invoker.class);
RpcInvocation invocation = new RpcInvocation();
Directory<ForkingClusterInvokerTest> dic;
Result result = new RpcResult();

/**
* @throws java.lang.Exception
*/
private List<Invoker<ForkingClusterInvokerTest>> invokers = new ArrayList<Invoker<ForkingClusterInvokerTest>>();
private URL url = URL.valueOf("test://test:11/test?forks=2");
private Invoker<ForkingClusterInvokerTest> invoker1 = mock(Invoker.class);
private Invoker<ForkingClusterInvokerTest> invoker2 = mock(Invoker.class);
private Invoker<ForkingClusterInvokerTest> invoker3 = mock(Invoker.class);
private RpcInvocation invocation = new RpcInvocation();
private Directory<ForkingClusterInvokerTest> dic;
private Result result = new RpcResult();

@Before
public void setUp() throws Exception {
Expand Down Expand Up @@ -139,7 +135,7 @@ public void testClearRpcContext() {
invoker.invoke(invocation);
Assert.fail();
} catch (RpcException expected) {
Assert.assertTrue("Successed to forking invoke provider !", expected.getMessage().contains("Failed to forking invoke provider"));
Assert.assertTrue("Succeeded to forking invoke provider !", expected.getMessage().contains("Failed to forking invoke provider"));
assertFalse(expected.getCause() instanceof RpcException);
}
Map<String, String> afterInvoke = RpcContext.getContext().getAttachments();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,14 +345,12 @@ private T createProxy(Map<String, String> map) {
if (isInjvm() == null) {
if (url != null && url.length() > 0) { // if a url is specified, don't do local reference
isJvmRefer = false;
} else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
// by default, reference local service if there is
isJvmRefer = true;
} else {
isJvmRefer = false;
// by default, reference local service if there is
isJvmRefer = InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl);
}
} else {
isJvmRefer = isInjvm().booleanValue();
isJvmRefer = isInjvm();
}

if (isJvmRefer) {
Expand Down