Skip to content

Commit b421751

Browse files
authored
Fix unobserved exceptions with retries (#2255)
1 parent 41f67ad commit b421751

28 files changed

+809
-355
lines changed

src/Grpc.Net.Client/Internal/GrpcCall.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -639,7 +639,7 @@ private async Task RunCall(HttpRequestMessage request, TimeSpan? timeout)
639639
if (_responseTcs != null)
640640
{
641641
_responseTcs.TrySetException(resolvedException);
642-
642+
643643
// Always observe cancellation-like exceptions.
644644
if (IsCancellationOrDeadlineException(ex))
645645
{

src/Grpc.Net.Client/Internal/HttpClientCallInvoker.cs

+19-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
#region Copyright notice and license
1+
#region Copyright notice and license
22

33
// Copyright 2019 The gRPC Authors
44
//
@@ -43,6 +43,8 @@ public HttpClientCallInvoker(GrpcChannel channel)
4343
/// </summary>
4444
public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string? host, CallOptions options)
4545
{
46+
AssertMethodType(method, MethodType.ClientStreaming);
47+
4648
var call = CreateRootGrpcCall<TRequest, TResponse>(Channel, method, options);
4749
call.StartClientStreaming();
4850

@@ -67,6 +69,8 @@ public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreami
6769
/// </summary>
6870
public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string? host, CallOptions options)
6971
{
72+
AssertMethodType(method, MethodType.DuplexStreaming);
73+
7074
var call = CreateRootGrpcCall<TRequest, TResponse>(Channel, method, options);
7175
call.StartDuplexStreaming();
7276

@@ -90,6 +94,8 @@ public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreami
9094
/// </summary>
9195
public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string? host, CallOptions options, TRequest request)
9296
{
97+
AssertMethodType(method, MethodType.ServerStreaming);
98+
9399
var call = CreateRootGrpcCall<TRequest, TResponse>(Channel, method, options);
94100
call.StartServerStreaming(request);
95101

@@ -111,6 +117,8 @@ public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRe
111117
/// </summary>
112118
public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string? host, CallOptions options, TRequest request)
113119
{
120+
AssertMethodType(method, MethodType.Unary);
121+
114122
var call = CreateRootGrpcCall<TRequest, TResponse>(Channel, method, options);
115123
call.StartUnary(request);
116124

@@ -127,6 +135,16 @@ public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(Me
127135
return callWrapper;
128136
}
129137

138+
[Conditional("ASSERT_METHOD_TYPE")]
139+
private static void AssertMethodType(IMethod method, MethodType methodType)
140+
{
141+
// This can be used to assert tests are passing the right method type.
142+
if (method.Type != methodType)
143+
{
144+
throw new Exception("Expected method type: " + methodType);
145+
}
146+
}
147+
130148
/// <summary>
131149
/// Invokes a simple remote call in a blocking fashion.
132150
/// </summary>

src/Grpc.Net.Client/Internal/Retry/HedgingCall.cs

+11-2
Original file line numberDiff line numberDiff line change
@@ -202,10 +202,19 @@ private async Task StartCall(Action<GrpcCall<TRequest, TResponse>> startCallFunc
202202
{
203203
if (CommitedCallTask.IsCompletedSuccessfully() && CommitedCallTask.Result == call)
204204
{
205+
// Ensure response task is created before waiting to the end.
206+
// Allows cancellation exceptions to be observed in cleanup.
207+
if (!HasResponseStream())
208+
{
209+
_ = GetResponseAsync();
210+
}
211+
205212
// Wait until the commited call is finished and then clean up hedging call.
206213
// Force yield here to prevent continuation running with any locks.
207-
await CompatibilityHelpers.AwaitWithYieldAsync(call.CallTask).ConfigureAwait(false);
208-
Cleanup();
214+
var status = await CompatibilityHelpers.AwaitWithYieldAsync(call.CallTask).ConfigureAwait(false);
215+
216+
var observeExceptions = status.StatusCode is StatusCode.Cancelled or StatusCode.DeadlineExceeded;
217+
Cleanup(observeExceptions);
209218
}
210219
}
211220
}

src/Grpc.Net.Client/Internal/Retry/RetryCall.cs

+14-5
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ private async Task StartRetry(Action<GrpcCall<TRequest, TResponse>> startCallFun
172172
// Commited so exit retry loop.
173173
return;
174174
}
175-
else if (IsSuccessfulStreamingCall(responseStatus.Value, currentCall))
175+
else if (IsSuccessfulStreamingCall(responseStatus.Value))
176176
{
177177
// Headers were returned. We're commited.
178178
CommitCall(currentCall, CommitReason.ResponseHeadersReceived);
@@ -252,25 +252,34 @@ private async Task StartRetry(Action<GrpcCall<TRequest, TResponse>> startCallFun
252252
{
253253
if (CommitedCallTask.Result is GrpcCall<TRequest, TResponse> call)
254254
{
255+
// Ensure response task is created before waiting to the end.
256+
// Allows cancellation exceptions to be observed in cleanup.
257+
if (!HasResponseStream())
258+
{
259+
_ = GetResponseAsync();
260+
}
261+
255262
// Wait until the commited call is finished and then clean up retry call.
256263
// Force yield here to prevent continuation running with any locks.
257-
await CompatibilityHelpers.AwaitWithYieldAsync(call.CallTask).ConfigureAwait(false);
258-
Cleanup();
264+
var status = await CompatibilityHelpers.AwaitWithYieldAsync(call.CallTask).ConfigureAwait(false);
265+
266+
var observeExceptions = status.StatusCode is StatusCode.Cancelled or StatusCode.DeadlineExceeded;
267+
Cleanup(observeExceptions);
259268
}
260269
}
261270

262271
Log.StoppingRetryWorker(Logger);
263272
}
264273
}
265274

266-
private static bool IsSuccessfulStreamingCall(Status responseStatus, GrpcCall<TRequest, TResponse> call)
275+
private bool IsSuccessfulStreamingCall(Status responseStatus)
267276
{
268277
if (responseStatus.StatusCode != StatusCode.OK)
269278
{
270279
return false;
271280
}
272281

273-
return call.Method.Type == MethodType.ServerStreaming || call.Method.Type == MethodType.DuplexStreaming;
282+
return HasResponseStream();
274283
}
275284

276285
protected override void OnCommitCall(IGrpcCall<TRequest, TResponse> call)

src/Grpc.Net.Client/Internal/Retry/RetryCallBase.cs

+39-5
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ internal abstract partial class RetryCallBase<TRequest, TResponse> : IGrpcCall<T
3333
private readonly TaskCompletionSource<IGrpcCall<TRequest, TResponse>> _commitedCallTcs;
3434
private RetryCallBaseClientStreamReader<TRequest, TResponse>? _retryBaseClientStreamReader;
3535
private RetryCallBaseClientStreamWriter<TRequest, TResponse>? _retryBaseClientStreamWriter;
36+
private Task<TResponse>? _responseTask;
37+
private Task<Metadata>? _responseHeadersTask;
3638

3739
// Internal for unit testing.
3840
internal CancellationTokenRegistration? _ctsRegistration;
@@ -111,13 +113,34 @@ protected RetryCallBase(GrpcChannel channel, Method<TRequest, TResponse> method,
111113
}
112114
}
113115

114-
public async Task<TResponse> GetResponseAsync()
116+
public Task<TResponse> GetResponseAsync() => _responseTask ??= GetResponseCoreAsync();
117+
118+
private async Task<TResponse> GetResponseCoreAsync()
115119
{
116120
var call = await CommitedCallTask.ConfigureAwait(false);
117121
return await call.GetResponseAsync().ConfigureAwait(false);
118122
}
119123

120-
public async Task<Metadata> GetResponseHeadersAsync()
124+
public Task<Metadata> GetResponseHeadersAsync()
125+
{
126+
if (_responseHeadersTask == null)
127+
{
128+
_responseHeadersTask = GetResponseHeadersCoreAsync();
129+
130+
// ResponseHeadersAsync could be called inside a client interceptor when a call is wrapped.
131+
// Most people won't use the headers result. Observed exception to avoid unobserved exception event.
132+
_responseHeadersTask.ObserveException();
133+
134+
// If there was an error fetching response headers then it's likely the same error is reported
135+
// by response TCS. The user is unlikely to observe both errors.
136+
// Observed exception to avoid unobserved exception event.
137+
_responseTask?.ObserveException();
138+
}
139+
140+
return _responseHeadersTask;
141+
}
142+
143+
private async Task<Metadata> GetResponseHeadersCoreAsync()
121144
{
122145
var call = await CommitedCallTask.ConfigureAwait(false);
123146
return await call.GetResponseHeadersAsync().ConfigureAwait(false);
@@ -369,7 +392,7 @@ protected void CommitCall(IGrpcCall<TRequest, TResponse> call, CommitReason comm
369392
// A commited call that has already cleaned up is likely a StatusGrpcCall.
370393
if (call.Disposed)
371394
{
372-
Cleanup();
395+
Cleanup(observeExceptions: false);
373396
}
374397
}
375398
}
@@ -382,6 +405,11 @@ protected bool HasClientStream()
382405
return Method.Type == MethodType.ClientStreaming || Method.Type == MethodType.DuplexStreaming;
383406
}
384407

408+
protected bool HasResponseStream()
409+
{
410+
return Method.Type == MethodType.ServerStreaming || Method.Type == MethodType.DuplexStreaming;
411+
}
412+
385413
protected void SetNewActiveCallUnsynchronized(IGrpcCall<TRequest, TResponse> call)
386414
{
387415
Debug.Assert(Monitor.IsEntered(Lock), "Should be called with lock.");
@@ -436,11 +464,11 @@ protected virtual void Dispose(bool disposing)
436464
CommitedCallTask.Result.Dispose();
437465
}
438466

439-
Cleanup();
467+
Cleanup(observeExceptions: true);
440468
}
441469
}
442470

443-
protected void Cleanup()
471+
protected void Cleanup(bool observeExceptions)
444472
{
445473
Channel.FinishActiveCall(this);
446474

@@ -449,6 +477,12 @@ protected void Cleanup()
449477
CancellationTokenSource.Cancel();
450478

451479
ClearRetryBuffer();
480+
481+
if (observeExceptions)
482+
{
483+
_responseTask?.ObserveException();
484+
_responseHeadersTask?.ObserveException();
485+
}
452486
}
453487

454488
internal bool TryAddToRetryBuffer(ReadOnlyMemory<byte> message)

src/Grpc.Net.Client/Internal/TaskExtensions.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
#region Copyright notice and license
1+
#region Copyright notice and license
22

33
// Copyright 2019 The gRPC Authors
44
//

test/Grpc.Net.Client.Tests/AsyncClientStreamingCallTests.cs

+12-12
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public async Task AsyncClientStreamingCall_Success_HttpRequestMessagePopulated()
5353
var invoker = HttpClientCallInvokerFactory.Create(httpClient);
5454

5555
// Act
56-
var call = invoker.AsyncClientStreamingCall<HelloRequest, HelloReply>(ClientTestHelpers.ServiceMethod, string.Empty, new CallOptions());
56+
var call = invoker.AsyncClientStreamingCall();
5757

5858
await call.RequestStream.CompleteAsync().DefaultTimeout();
5959

@@ -98,7 +98,7 @@ public async Task AsyncClientStreamingCall_Success_RequestContentSent()
9898
var invoker = HttpClientCallInvokerFactory.Create(handler, "http://localhost");
9999

100100
// Act
101-
var call = invoker.AsyncClientStreamingCall<HelloRequest, HelloReply>(ClientTestHelpers.ServiceMethod, string.Empty, new CallOptions());
101+
var call = invoker.AsyncClientStreamingCall();
102102
var requestContentTask = await requestContentTcs.Task.DefaultTimeout();
103103

104104
// Assert
@@ -149,7 +149,7 @@ public async Task ClientStreamWriter_WriteWhilePendingWrite_ErrorThrown()
149149
var invoker = HttpClientCallInvokerFactory.Create(httpClient);
150150

151151
// Act
152-
var call = invoker.AsyncClientStreamingCall<HelloRequest, HelloReply>(ClientTestHelpers.ServiceMethod, string.Empty, new CallOptions());
152+
var call = invoker.AsyncClientStreamingCall();
153153

154154
// Assert
155155
var writeTask1 = call.RequestStream.WriteAsync(new HelloRequest { Name = "1" });
@@ -178,7 +178,7 @@ public async Task ClientStreamWriter_DisposeWhilePendingWrite_NoReadMessageError
178178
var invoker = HttpClientCallInvokerFactory.Create(httpClient, loggerFactory: loggerFactory);
179179

180180
// Act
181-
var call = invoker.AsyncClientStreamingCall<HelloRequest, HelloReply>(ClientTestHelpers.ServiceMethod, string.Empty, new CallOptions());
181+
var call = invoker.AsyncClientStreamingCall();
182182

183183
// Assert
184184
var writeTask1 = call.RequestStream.WriteAsync(new HelloRequest { Name = "1" });
@@ -216,7 +216,7 @@ public async Task ClientStreamWriter_CompleteWhilePendingWrite_ErrorThrown()
216216
var invoker = HttpClientCallInvokerFactory.Create(httpClient);
217217

218218
// Act
219-
var call = invoker.AsyncClientStreamingCall<HelloRequest, HelloReply>(ClientTestHelpers.ServiceMethod, string.Empty, new CallOptions());
219+
var call = invoker.AsyncClientStreamingCall();
220220

221221
// Assert
222222
var writeTask1 = call.RequestStream.WriteAsync(new HelloRequest { Name = "1" });
@@ -240,7 +240,7 @@ public async Task ClientStreamWriter_WriteWhileComplete_ErrorThrown()
240240
var invoker = HttpClientCallInvokerFactory.Create(httpClient);
241241

242242
// Act
243-
var call = invoker.AsyncClientStreamingCall<HelloRequest, HelloReply>(ClientTestHelpers.ServiceMethod, string.Empty, new CallOptions());
243+
var call = invoker.AsyncClientStreamingCall();
244244
await call.RequestStream.CompleteAsync().DefaultTimeout();
245245
var resultTask = call.ResponseAsync;
246246

@@ -273,7 +273,7 @@ public async Task ClientStreamWriter_WriteWithInvalidHttpStatus_ErrorThrown()
273273
var invoker = HttpClientCallInvokerFactory.Create(httpClient);
274274

275275
// Act
276-
var call = invoker.AsyncClientStreamingCall<HelloRequest, HelloReply>(ClientTestHelpers.ServiceMethod, string.Empty, new CallOptions());
276+
var call = invoker.AsyncClientStreamingCall();
277277
var writeException = await ExceptionAssert.ThrowsAsync<RpcException>(() => call.RequestStream.WriteAsync(new HelloRequest { Name = "1" })).DefaultTimeout();
278278
var resultException = await ExceptionAssert.ThrowsAsync<RpcException>(() => call.ResponseAsync).DefaultTimeout();
279279

@@ -299,7 +299,7 @@ public async Task ClientStreamWriter_WriteAfterResponseHasFinished_ErrorThrown()
299299
var invoker = HttpClientCallInvokerFactory.Create(httpClient);
300300

301301
// Act
302-
var call = invoker.AsyncClientStreamingCall<HelloRequest, HelloReply>(ClientTestHelpers.ServiceMethod, string.Empty, new CallOptions());
302+
var call = invoker.AsyncClientStreamingCall();
303303
var ex = await ExceptionAssert.ThrowsAsync<RpcException>(() => call.RequestStream.WriteAsync(new HelloRequest())).DefaultTimeout();
304304
var result = await call.ResponseAsync.DefaultTimeout();
305305

@@ -329,7 +329,7 @@ public async Task AsyncClientStreamingCall_ErrorWhileWriting_StatusExceptionThro
329329
// Act
330330

331331
// Client starts call
332-
var call = invoker.AsyncClientStreamingCall<HelloRequest, HelloReply>(ClientTestHelpers.ServiceMethod, string.Empty, new CallOptions());
332+
var call = invoker.AsyncClientStreamingCall();
333333
// Client starts request stream write
334334
var writeTask = call.RequestStream.WriteAsync(new HelloRequest());
335335

@@ -422,7 +422,7 @@ public async Task ClientStreamWriter_CancelledBeforeCallStarts_ThrowsError()
422422
var invoker = HttpClientCallInvokerFactory.Create(httpClient);
423423

424424
// Act
425-
var call = invoker.AsyncClientStreamingCall<HelloRequest, HelloReply>(ClientTestHelpers.ServiceMethod, string.Empty, new CallOptions(cancellationToken: new CancellationToken(true)));
425+
var call = invoker.AsyncClientStreamingCall(new CallOptions(cancellationToken: new CancellationToken(true)));
426426

427427
var ex = await ExceptionAssert.ThrowsAsync<RpcException>(() => call.RequestStream.WriteAsync(new HelloRequest())).DefaultTimeout();
428428

@@ -442,7 +442,7 @@ public async Task ClientStreamWriter_CancelledBeforeCallStarts_ThrowOperationCan
442442
var invoker = HttpClientCallInvokerFactory.Create(httpClient, configure: o => o.ThrowOperationCanceledOnCancellation = true);
443443

444444
// Act
445-
var call = invoker.AsyncClientStreamingCall<HelloRequest, HelloReply>(ClientTestHelpers.ServiceMethod, string.Empty, new CallOptions(cancellationToken: new CancellationToken(true)));
445+
var call = invoker.AsyncClientStreamingCall(new CallOptions(cancellationToken: new CancellationToken(true)));
446446

447447
await ExceptionAssert.ThrowsAsync<OperationCanceledException>(() => call.RequestStream.WriteAsync(new HelloRequest())).DefaultTimeout();
448448

@@ -461,7 +461,7 @@ public async Task ClientStreamWriter_CallThrowsException_WriteAsyncThrowsError()
461461
var invoker = HttpClientCallInvokerFactory.Create(httpClient);
462462

463463
// Act
464-
var call = invoker.AsyncClientStreamingCall<HelloRequest, HelloReply>(ClientTestHelpers.ServiceMethod, string.Empty, new CallOptions());
464+
var call = invoker.AsyncClientStreamingCall();
465465
var writeException = await ExceptionAssert.ThrowsAsync<RpcException>(() => call.RequestStream.WriteAsync(new HelloRequest())).DefaultTimeout();
466466
var resultException = await ExceptionAssert.ThrowsAsync<RpcException>(() => call.ResponseAsync).DefaultTimeout();
467467

test/Grpc.Net.Client.Tests/AsyncDuplexStreamingCallTests.cs

+4-4
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public async Task AsyncDuplexStreamingCall_NoContent_NoMessagesReturned()
5050
var invoker = HttpClientCallInvokerFactory.Create(httpClient);
5151

5252
// Act
53-
var call = invoker.AsyncDuplexStreamingCall<HelloRequest, HelloReply>(ClientTestHelpers.ServiceMethod, string.Empty, new CallOptions());
53+
var call = invoker.AsyncDuplexStreamingCall();
5454

5555
var responseStream = call.ResponseStream;
5656

@@ -76,7 +76,7 @@ public async Task AsyncServerStreamingCall_MessagesReturnedTogether_MessagesRece
7676
var invoker = HttpClientCallInvokerFactory.Create(httpClient);
7777

7878
// Act
79-
var call = invoker.AsyncDuplexStreamingCall<HelloRequest, HelloReply>(ClientTestHelpers.ServiceMethod, string.Empty, new CallOptions());
79+
var call = invoker.AsyncDuplexStreamingCall();
8080

8181
var responseStream = call.ResponseStream;
8282

@@ -109,7 +109,7 @@ public async Task AsyncDuplexStreamingCall_MessagesStreamed_MessagesReceived()
109109
var invoker = HttpClientCallInvokerFactory.Create(handler, "https://localhost");
110110

111111
// Act
112-
var call = invoker.AsyncDuplexStreamingCall<HelloRequest, HelloReply>(ClientTestHelpers.ServiceMethod, string.Empty, new CallOptions());
112+
var call = invoker.AsyncDuplexStreamingCall();
113113

114114
var requestStream = call.RequestStream;
115115
var responseStream = call.ResponseStream;
@@ -216,7 +216,7 @@ public async Task AsyncDuplexStreamingCall_CancellationDisposeRace_Success()
216216

217217
var cts = new CancellationTokenSource();
218218

219-
var call = invoker.AsyncDuplexStreamingCall<HelloRequest, HelloReply>(ClientTestHelpers.ServiceMethod, string.Empty, new CallOptions(cancellationToken: cts.Token));
219+
var call = invoker.AsyncDuplexStreamingCall(new CallOptions(cancellationToken: cts.Token));
220220
await call.RequestStream.WriteAsync(new HelloRequest { Name = "1" }).DefaultTimeout();
221221
await call.RequestStream.CompleteAsync().DefaultTimeout();
222222

0 commit comments

Comments
 (0)