@@ -36,6 +36,23 @@ internal enum SnapshottedStateFlags : byte
36
36
AttentionReceived = 1 << 5 // NOTE: Received is not volatile as it is only ever accessed\modified by TryRun its callees (i.e. single threaded access)
37
37
}
38
38
39
+ private sealed class TimeoutState
40
+ {
41
+ public const int Stopped = 0 ;
42
+ public const int Running = 1 ;
43
+ public const int ExpiredAsync = 2 ;
44
+ public const int ExpiredSync = 3 ;
45
+
46
+ private readonly int _value ;
47
+
48
+ public TimeoutState ( int value )
49
+ {
50
+ _value = value ;
51
+ }
52
+
53
+ public int IdentityValue => _value ;
54
+ }
55
+
39
56
private const int AttentionTimeoutSeconds = 5 ;
40
57
41
58
private static readonly ContextCallback s_readAdyncCallbackComplete = ReadAsyncCallbackComplete ;
@@ -113,9 +130,17 @@ internal enum SnapshottedStateFlags : byte
113
130
// Timeout variables
114
131
private long _timeoutMilliseconds ;
115
132
private long _timeoutTime ; // variable used for timeout computations, holds the value of the hi-res performance counter at which this request should expire
133
+ private int _timeoutState ; // expected to be one of the constant values TimeoutStopped, TimeoutRunning, TimeoutExpiredAsync, TimeoutExpiredSync
134
+ private int _timeoutIdentitySource ;
135
+ private volatile int _timeoutIdentityValue ;
116
136
internal volatile bool _attentionSent ; // true if we sent an Attention to the server
117
137
internal volatile bool _attentionSending ;
118
- internal bool _internalTimeout ; // an internal timeout occurred
138
+
139
+ // Below 2 properties are used to enforce timeout delays in code to
140
+ // reproduce issues related to theadpool starvation and timeout delay.
141
+ // It should always be set to false by default, and only be enabled during testing.
142
+ internal bool _enforceTimeoutDelay = false ;
143
+ internal int _enforcedTimeoutDelayInMilliSeconds = 5000 ;
119
144
120
145
private readonly LastIOTimer _lastSuccessfulIOTimer ;
121
146
@@ -760,7 +785,7 @@ private void ResetCancelAndProcessAttention()
760
785
// operations.
761
786
Parser . ProcessPendingAck ( this ) ;
762
787
}
763
- _internalTimeout = false ;
788
+ SetTimeoutStateStopped ( ) ;
764
789
}
765
790
}
766
791
@@ -1042,7 +1067,7 @@ internal bool TryProcessHeader()
1042
1067
return false ;
1043
1068
}
1044
1069
1045
- if ( _internalTimeout )
1070
+ if ( IsTimeoutStateExpired )
1046
1071
{
1047
1072
ThrowExceptionAndWarning ( ) ;
1048
1073
return true ;
@@ -1447,7 +1472,7 @@ internal bool TryReadInt16(out short value)
1447
1472
{
1448
1473
// The entire int16 is in the packet and in the buffer, so just return it
1449
1474
// and take care of the counters.
1450
- buffer = _inBuff . AsSpan ( _inBytesUsed , 2 ) ;
1475
+ buffer = _inBuff . AsSpan ( _inBytesUsed , 2 ) ;
1451
1476
_inBytesUsed += 2 ;
1452
1477
_inBytesPacket -= 2 ;
1453
1478
}
@@ -1481,7 +1506,7 @@ internal bool TryReadInt32(out int value)
1481
1506
}
1482
1507
1483
1508
AssertValidState ( ) ;
1484
- value = ( buffer [ 3 ] << 24 ) + ( buffer [ 2 ] << 16 ) + ( buffer [ 1 ] << 8 ) + buffer [ 0 ] ;
1509
+ value = ( buffer [ 3 ] << 24 ) + ( buffer [ 2 ] << 16 ) + ( buffer [ 1 ] << 8 ) + buffer [ 0 ] ;
1485
1510
return true ;
1486
1511
1487
1512
}
@@ -2247,11 +2272,62 @@ internal void OnConnectionClosed()
2247
2272
}
2248
2273
}
2249
2274
2250
- private void OnTimeout ( object state )
2275
+ public void SetTimeoutStateStopped ( )
2276
+ {
2277
+ Interlocked . Exchange ( ref _timeoutState , TimeoutState . Stopped ) ;
2278
+ _timeoutIdentityValue = 0 ;
2279
+ }
2280
+
2281
+ public bool IsTimeoutStateExpired
2282
+ {
2283
+ get
2284
+ {
2285
+ int state = _timeoutState ;
2286
+ return state == TimeoutState . ExpiredAsync || state == TimeoutState . ExpiredSync ;
2287
+ }
2288
+ }
2289
+
2290
+ private void OnTimeoutAsync ( object state )
2251
2291
{
2252
- if ( ! _internalTimeout )
2292
+ if ( _enforceTimeoutDelay )
2253
2293
{
2254
- _internalTimeout = true ;
2294
+ Thread . Sleep ( _enforcedTimeoutDelayInMilliSeconds ) ;
2295
+ }
2296
+
2297
+ int currentIdentityValue = _timeoutIdentityValue ;
2298
+ TimeoutState timeoutState = ( TimeoutState ) state ;
2299
+ if ( timeoutState . IdentityValue == _timeoutIdentityValue )
2300
+ {
2301
+ // the return value is not useful here because no choice is going to be made using it
2302
+ // we only want to make this call to set the state knowing that it will be seen later
2303
+ OnTimeoutCore ( TimeoutState . Running , TimeoutState . ExpiredAsync ) ;
2304
+ }
2305
+ else
2306
+ {
2307
+ Debug . WriteLine ( $ "OnTimeoutAsync called with identity state={ timeoutState . IdentityValue } but current identity is { currentIdentityValue } so it is being ignored") ;
2308
+ }
2309
+ }
2310
+
2311
+ private bool OnTimeoutSync ( )
2312
+ {
2313
+ return OnTimeoutCore ( TimeoutState . Running , TimeoutState . ExpiredSync ) ;
2314
+ }
2315
+
2316
+ /// <summary>
2317
+ /// attempts to change the timout state from the expected state to the target state and if it succeeds
2318
+ /// will setup the the stateobject into the timeout expired state
2319
+ /// </summary>
2320
+ /// <param name="expectedState">the state that is the expected current state, state will change only if this is correct</param>
2321
+ /// <param name="targetState">the state that will be changed to if the expected state is correct</param>
2322
+ /// <returns>boolean value indicating whether the call changed the timeout state</returns>
2323
+ private bool OnTimeoutCore ( int expectedState , int targetState )
2324
+ {
2325
+ Debug . Assert ( targetState == TimeoutState . ExpiredAsync || targetState == TimeoutState . ExpiredSync , "OnTimeoutCore must have an expiry state as the targetState" ) ;
2326
+
2327
+ bool retval = false ;
2328
+ if ( Interlocked . CompareExchange ( ref _timeoutState , targetState , expectedState ) == expectedState )
2329
+ {
2330
+ retval = true ;
2255
2331
// lock protects against Close and Cancel
2256
2332
lock ( this )
2257
2333
{
@@ -2349,6 +2425,7 @@ private void OnTimeout(object state)
2349
2425
}
2350
2426
}
2351
2427
}
2428
+ return retval ;
2352
2429
}
2353
2430
2354
2431
internal void ReadSni ( TaskCompletionSource < object > completion )
@@ -2383,19 +2460,32 @@ internal void ReadSni(TaskCompletionSource<object> completion)
2383
2460
{
2384
2461
Debug . Assert ( completion != null , "Async on but null asyncResult passed" ) ;
2385
2462
2386
- if ( _networkPacketTimeout == null )
2463
+ // if the state is currently stopped then change it to running and allocate a new identity value from
2464
+ // the identity source. The identity value is used to correlate timer callback events to the currently
2465
+ // running timeout and prevents a late timer callback affecting a result it does not relate to
2466
+ int previousTimeoutState = Interlocked . CompareExchange ( ref _timeoutState , TimeoutState . Running , TimeoutState . Stopped ) ;
2467
+ if ( previousTimeoutState == TimeoutState . Stopped )
2387
2468
{
2388
- _networkPacketTimeout = ADP . UnsafeCreateTimer (
2389
- new TimerCallback ( OnTimeout ) ,
2390
- null ,
2391
- Timeout . Infinite ,
2392
- Timeout . Infinite ) ;
2469
+ Debug . Assert ( _timeoutIdentityValue == 0 , "timer was previously stopped without resetting the _identityValue" ) ;
2470
+ _timeoutIdentityValue = Interlocked . Increment ( ref _timeoutIdentitySource ) ;
2393
2471
}
2394
2472
2473
+ _networkPacketTimeout ? . Dispose ( ) ;
2474
+
2475
+ _networkPacketTimeout = ADP . UnsafeCreateTimer (
2476
+ new TimerCallback ( OnTimeoutAsync ) ,
2477
+ new TimeoutState ( _timeoutIdentityValue ) ,
2478
+ Timeout . Infinite ,
2479
+ Timeout . Infinite
2480
+ ) ;
2481
+
2482
+
2395
2483
// -1 == Infinite
2396
2484
// 0 == Already timed out (NOTE: To simulate the same behavior as sync we will only timeout on 0 if we receive an IO Pending from SNI)
2397
2485
// >0 == Actual timeout remaining
2398
2486
int msecsRemaining = GetTimeoutRemaining ( ) ;
2487
+
2488
+ Debug . Assert ( previousTimeoutState == TimeoutState . Stopped , "previous timeout state was not Stopped" ) ;
2399
2489
if ( msecsRemaining > 0 )
2400
2490
{
2401
2491
ChangeNetworkPacketTimeout ( msecsRemaining , Timeout . Infinite ) ;
@@ -2445,12 +2535,15 @@ internal void ReadSni(TaskCompletionSource<object> completion)
2445
2535
_networkPacketTaskSource . TrySetResult ( null ) ;
2446
2536
}
2447
2537
// Disable timeout timer on error
2538
+ SetTimeoutStateStopped ( ) ;
2448
2539
ChangeNetworkPacketTimeout ( Timeout . Infinite , Timeout . Infinite ) ;
2449
2540
}
2450
2541
else if ( msecsRemaining == 0 )
2451
- { // Got IO Pending, but we have no time left to wait
2452
- // Immediately schedule the timeout timer to fire
2453
- ChangeNetworkPacketTimeout ( 0 , Timeout . Infinite ) ;
2542
+ {
2543
+ // Got IO Pending, but we have no time left to wait
2544
+ // disable the timer and set the error state by calling OnTimeoutSync
2545
+ ChangeNetworkPacketTimeout ( Timeout . Infinite , Timeout . Infinite ) ;
2546
+ OnTimeoutSync ( ) ;
2454
2547
}
2455
2548
// DO NOT HANDLE PENDING READ HERE - which is TdsEnums.SNI_SUCCESS_IO_PENDING state.
2456
2549
// That is handled by user who initiated async read, or by ReadNetworkPacket which is sync over async.
@@ -2565,13 +2658,13 @@ private void ReadSniError(TdsParserStateObject stateObj, uint error)
2565
2658
Debug . Assert ( _syncOverAsync , "Should never reach here with async on!" ) ;
2566
2659
bool fail = false ;
2567
2660
2568
- if ( _internalTimeout )
2661
+ if ( IsTimeoutStateExpired )
2569
2662
{ // This is now our second timeout - time to give up.
2570
2663
fail = true ;
2571
2664
}
2572
2665
else
2573
2666
{
2574
- stateObj . _internalTimeout = true ;
2667
+ stateObj . SetTimeoutStateStopped ( ) ;
2575
2668
Debug . Assert ( _parser . Connection != null , "SqlConnectionInternalTds handler can not be null at this point." ) ;
2576
2669
AddError ( new SqlError ( TdsEnums . TIMEOUT_EXPIRED , ( byte ) 0x00 , TdsEnums . MIN_ERROR_CLASS , _parser . Server , _parser . Connection . TimeoutErrorInternal . GetErrorMessage ( ) , "" , 0 , TdsEnums . SNI_WAIT_TIMEOUT ) ) ;
2577
2670
@@ -2794,6 +2887,25 @@ public void ReadAsyncCallback(IntPtr key, PacketHandle packet, uint error)
2794
2887
2795
2888
ChangeNetworkPacketTimeout ( Timeout . Infinite , Timeout . Infinite ) ;
2796
2889
2890
+ // The timer thread may be unreliable under high contention scenarios. It cannot be
2891
+ // assumed that the timeout has happened on the timer thread callback. Check the timeout
2892
+ // synchrnously and then call OnTimeoutSync to force an atomic change of state.
2893
+ if ( TimeoutHasExpired )
2894
+ {
2895
+ OnTimeoutSync ( ) ;
2896
+ }
2897
+
2898
+ // try to change to the stopped state but only do so if currently in the running state
2899
+ // and use cmpexch so that all changes out of the running state are atomic
2900
+ int previousState = Interlocked . CompareExchange ( ref _timeoutState , TimeoutState . Running , TimeoutState . Stopped ) ;
2901
+
2902
+ // if the state is anything other than running then this query has reached an end so
2903
+ // set the correlation _timeoutIdentityValue to 0 to prevent late callbacks executing
2904
+ if ( _timeoutState != TimeoutState . Running )
2905
+ {
2906
+ _timeoutIdentityValue = 0 ;
2907
+ }
2908
+
2797
2909
ProcessSniPacket ( packet , error ) ;
2798
2910
}
2799
2911
catch ( Exception e )
@@ -3454,7 +3566,6 @@ internal void SendAttention(bool mustTakeWriteLock = false)
3454
3566
// Set _attentionSending to true before sending attention and reset after setting _attentionSent
3455
3567
// This prevents a race condition between receiving the attention ACK and setting _attentionSent
3456
3568
_attentionSending = true;
3457
-
3458
3569
#if DEBUG
3459
3570
if ( ! _skipSendAttention )
3460
3571
{
@@ -3489,7 +3600,7 @@ internal void SendAttention(bool mustTakeWriteLock = false)
3489
3600
}
3490
3601
}
3491
3602
#if DEBUG
3492
- }
3603
+ }
3493
3604
#endif
3494
3605
3495
3606
SetTimeoutSeconds ( AttentionTimeoutSeconds ) ; // Initialize new attention timeout of 5 seconds.
@@ -3862,7 +3973,7 @@ internal void AssertStateIsClean()
3862
3973
// Attention\Cancellation\Timeouts
3863
3974
Debug. Assert ( ! HasReceivedAttention && ! _attentionSent && ! _attentionSending , $ "StateObj is still dealing with attention: Sent: { _attentionSent } , Received: { HasReceivedAttention } , Sending: { _attentionSending } ") ;
3864
3975
Debug. Assert ( ! _cancelled , "StateObj still has cancellation set" ) ;
3865
- Debug. Assert ( ! _internalTimeout , "StateObj still has internal timeout set" ) ;
3976
+ Debug. Assert ( _timeoutState == TimeoutState . Stopped , "StateObj still has internal timeout set" ) ;
3866
3977
// Errors and Warnings
3867
3978
Debug. Assert ( ! _hasErrorOrWarning , "StateObj still has stored errors or warnings" ) ;
3868
3979
}
0 commit comments