Skip to content

Commit 922d314

Browse files
JamesNKmot256
andauthored
Load balancing fix to clear disposed subchannels on resolver error (#1853)
Co-authored-by: Francois Nel <francois.nel@iotnxt.com>
1 parent 0551be8 commit 922d314

File tree

4 files changed

+134
-3
lines changed

4 files changed

+134
-3
lines changed

src/Grpc.Net.Client/Balancer/BalancerAddress.cs

+3
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#endregion
1818

1919
#if SUPPORT_LOAD_BALANCING
20+
using System.Diagnostics;
2021
using System.Net;
2122

2223
namespace Grpc.Net.Client.Balancer
@@ -35,6 +36,7 @@ public sealed class BalancerAddress
3536
/// Initializes a new instance of the <see cref="BalancerAddress"/> class with the specified <see cref="DnsEndPoint"/>.
3637
/// </summary>
3738
/// <param name="endPoint">The end point.</param>
39+
[DebuggerStepThrough]
3840
public BalancerAddress(DnsEndPoint endPoint)
3941
{
4042
EndPoint = endPoint;
@@ -45,6 +47,7 @@ public BalancerAddress(DnsEndPoint endPoint)
4547
/// </summary>
4648
/// <param name="host">The host.</param>
4749
/// <param name="port">The port.</param>
50+
[DebuggerStepThrough]
4851
public BalancerAddress(string host, int port) : this(new DnsEndPoint(host, port))
4952
{
5053
}

src/Grpc.Net.Client/Balancer/SubchannelsLoadBalancer.cs

+1
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ private void ResolverError(Status status)
7676
{
7777
RemoveSubchannel(addressSubchannel.Subchannel);
7878
}
79+
_addressSubchannels.Clear();
7980
Controller.UpdateState(new BalancerState(ConnectivityState.TransientFailure, new ErrorPicker(status)));
8081
break;
8182
}

test/Grpc.Net.Client.Tests/Balancer/ConnectionManagerTests.cs

+115-2
Original file line numberDiff line numberDiff line change
@@ -327,18 +327,131 @@ public async Task UpdateAddresses_ConnectIsInProgress_InProgressConnectIsCancele
327327
Assert.AreEqual(81, connectAddress2.Port);
328328
}
329329

330+
[Test]
331+
public async Task PickAsync_DoesNotDeadlockAfterReconnect_WithResolverError()
332+
{
333+
// Arrange
334+
var services = new ServiceCollection();
335+
services.AddNUnitLogger();
336+
await using var serviceProvider = services.BuildServiceProvider();
337+
var loggerFactory = serviceProvider.GetRequiredService<ILoggerFactory>();
338+
339+
var resolver = new TestResolver(loggerFactory);
340+
341+
GrpcChannelOptions channelOptions = new GrpcChannelOptions();
342+
channelOptions.ServiceConfig = new ServiceConfig()
343+
{
344+
LoadBalancingConfigs = { new RoundRobinConfig() }
345+
};
346+
347+
var transportFactory = new TestSubchannelTransportFactory();
348+
var clientChannel = CreateConnectionManager(loggerFactory, resolver, transportFactory, new[] { new RoundRobinBalancerFactory() });
349+
// Configure balancer similar to how GrpcChannel constructor does it
350+
clientChannel.ConfigureBalancer(c => new ChildHandlerLoadBalancer(
351+
c,
352+
channelOptions.ServiceConfig,
353+
clientChannel));
354+
355+
// Act
356+
var connectTask = clientChannel.ConnectAsync(waitForReady: true, cancellationToken: CancellationToken.None);
357+
var pickTask = clientChannel.PickAsync(
358+
new PickContext { Request = new HttpRequestMessage() },
359+
waitForReady: true,
360+
CancellationToken.None).AsTask();
361+
362+
resolver.UpdateAddresses(new List<BalancerAddress>
363+
{
364+
new BalancerAddress("localhost", 80)
365+
});
366+
await Task.WhenAll(connectTask, pickTask).DefaultTimeout();
367+
368+
// Simulate transport/network issue
369+
transportFactory.Transports.ForEach(t => t.Disconnect());
370+
resolver.UpdateError(new Status(StatusCode.Unavailable, "Test error"));
371+
372+
pickTask = clientChannel.PickAsync(
373+
new PickContext { Request = new HttpRequestMessage() },
374+
waitForReady: true,
375+
CancellationToken.None).AsTask();
376+
resolver.UpdateAddresses(new List<BalancerAddress>
377+
{
378+
new BalancerAddress("localhost", 80)
379+
});
380+
381+
// Assert
382+
// Should not timeout (deadlock)
383+
await pickTask.DefaultTimeout();
384+
}
385+
386+
[Test]
387+
public async Task PickAsync_DoesNotDeadlockAfterReconnect_WithZeroAddressResolved()
388+
{
389+
// Arrange
390+
var services = new ServiceCollection();
391+
services.AddNUnitLogger();
392+
await using var serviceProvider = services.BuildServiceProvider();
393+
var loggerFactory = serviceProvider.GetRequiredService<ILoggerFactory>();
394+
395+
var resolver = new TestResolver(loggerFactory);
396+
397+
GrpcChannelOptions channelOptions = new GrpcChannelOptions();
398+
channelOptions.ServiceConfig = new ServiceConfig()
399+
{
400+
LoadBalancingConfigs = { new RoundRobinConfig() }
401+
};
402+
403+
var transportFactory = new TestSubchannelTransportFactory();
404+
var clientChannel = CreateConnectionManager(loggerFactory, resolver, transportFactory, new[] { new RoundRobinBalancerFactory() });
405+
// Configure balancer similar to how GrpcChannel constructor does it
406+
clientChannel.ConfigureBalancer(c => new ChildHandlerLoadBalancer(
407+
c,
408+
channelOptions.ServiceConfig,
409+
clientChannel));
410+
411+
// Act
412+
var connectTask = clientChannel.ConnectAsync(waitForReady: true, cancellationToken: CancellationToken.None);
413+
var pickTask = clientChannel.PickAsync(
414+
new PickContext { Request = new HttpRequestMessage() },
415+
waitForReady: true,
416+
CancellationToken.None).AsTask();
417+
418+
resolver.UpdateAddresses(new List<BalancerAddress>
419+
{
420+
new BalancerAddress("localhost", 80)
421+
});
422+
await Task.WhenAll(connectTask, pickTask).DefaultTimeout();
423+
424+
// Simulate transport/network issue (with resolver reporting no addresses)
425+
transportFactory.Transports.ForEach(t => t.Disconnect());
426+
resolver.UpdateAddresses(new List<BalancerAddress>());
427+
428+
pickTask = clientChannel.PickAsync(
429+
new PickContext { Request = new HttpRequestMessage() },
430+
waitForReady: true,
431+
CancellationToken.None).AsTask();
432+
resolver.UpdateAddresses(new List<BalancerAddress>
433+
{
434+
new BalancerAddress("localhost", 80)
435+
});
436+
437+
// Assert
438+
// Should not timeout (deadlock)
439+
await pickTask.DefaultTimeout();
440+
}
441+
330442
private static ConnectionManager CreateConnectionManager(
331443
ILoggerFactory loggerFactory,
332444
Resolver resolver,
333-
TestSubchannelTransportFactory transportFactory)
445+
TestSubchannelTransportFactory transportFactory,
446+
LoadBalancerFactory[]? loadBalancerFactories = null)
334447
{
335448
return new ConnectionManager(
336449
resolver,
337450
disableResolverServiceConfig: false,
338451
loggerFactory,
339452
new TestBackoffPolicyFactory(),
340453
transportFactory,
341-
Array.Empty<LoadBalancerFactory>());
454+
loadBalancerFactories ?? Array.Empty<LoadBalancerFactory>());
342455
}
343456

344457
private class DropLoadBalancer : LoadBalancer

test/Grpc.Net.Client.Tests/Balancer/RoundRobinBalancerTests.cs

+15-1
Original file line numberDiff line numberDiff line change
@@ -214,25 +214,34 @@ public async Task HasSubchannels_SubchannelStatusChanges_RefreshResolver()
214214
var services = new ServiceCollection();
215215
services.AddNUnitLogger();
216216

217+
ILogger? logger = null;
217218
SyncPoint? syncPoint = new SyncPoint(runContinuationsAsynchronously: true);
218219

219220
var connectState = ConnectivityState.Ready;
220221

221-
var transportFactory = new TestSubchannelTransportFactory((s, c) => Task.FromResult(connectState));
222+
var transportFactory = new TestSubchannelTransportFactory((s, c) =>
223+
{
224+
logger.LogInformation($"Transport factory returning state: {connectState}");
225+
return Task.FromResult(connectState);
226+
});
222227
services.AddSingleton<TestResolver>(s =>
223228
{
224229
return new TestResolver(
225230
s.GetRequiredService<ILoggerFactory>(),
226231
async () =>
227232
{
233+
logger.LogInformation("Resolver waiting to continue.");
228234
await syncPoint.WaitToContinue().DefaultTimeout();
235+
236+
logger.LogInformation("Resolver creating new sync point.");
229237
syncPoint = new SyncPoint(runContinuationsAsynchronously: true);
230238
});
231239
});
232240
services.AddSingleton<ResolverFactory, TestResolverFactory>();
233241
services.AddSingleton<ISubchannelTransportFactory>(transportFactory);
234242
var serviceProvider = services.BuildServiceProvider();
235243

244+
logger = serviceProvider.GetRequiredService<ILogger<RoundRobinBalancerTests>>();
236245
var handler = new TestHttpMessageHandler((r, ct) => default!);
237246
var channelOptions = new GrpcChannelOptions
238247
{
@@ -250,10 +259,13 @@ public async Task HasSubchannels_SubchannelStatusChanges_RefreshResolver()
250259

251260
// Act
252261
var channel = GrpcChannel.ForAddress("test:///localhost", channelOptions);
262+
263+
logger.LogInformation("Client connecting");
253264
var connectTask = channel.ConnectAsync();
254265

255266
// Assert
256267
syncPoint!.Continue();
268+
logger.LogInformation("Client waiting for connect to complete.");
257269
await connectTask.DefaultTimeout();
258270

259271
var subchannels = channel.ConnectionManager.GetSubchannels();
@@ -265,10 +277,12 @@ public async Task HasSubchannels_SubchannelStatusChanges_RefreshResolver()
265277
await transportFactory.Transports.Single().TryConnectTask.DefaultTimeout();
266278
Assert.AreEqual(ConnectivityState.Ready, subchannels[0].State);
267279

280+
logger.LogInformation("Transport factory updating state.");
268281
connectState = ConnectivityState.TransientFailure;
269282
transportFactory.Transports.Single().UpdateState(ConnectivityState.Idle);
270283

271284
// Transport will refresh resolver after some failures
285+
logger.LogInformation("Waiting for sync point in resolver.");
272286
await syncPoint!.WaitForSyncPoint().DefaultTimeout();
273287
syncPoint.Continue();
274288
}

0 commit comments

Comments
 (0)