Skip to content

Commit de017c7

Browse files
authored
Take steps to avoid threadpool starvation (#11275)
Fixes # ### Context There are a handful of areas where threadpool threads are unnecessarily blocked by synchronous work, or there is additional contention that can be avoided. The most egregious instance is the synchronous read in BufferedReadStream: ![image](https://github.com/user-attachments/assets/f8cdbb09-0182-4f22-a935-589daadb45ef) The outer call to BeginRead() ends up calling BufferedReadStream.Read(), which synchronously blocks when _innerStream.Read() is called. This results in a substantial amount of block time for threadpool threads. ### Changes Made ### Testing ### Notes
2 parents 74c74a2 + 62c34ed commit de017c7

File tree

7 files changed

+84
-90
lines changed

7 files changed

+84
-90
lines changed

src/Build/BackEnd/Components/Caching/ResultsCache.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ private static bool CheckResults(BuildResult result, List<string> targets, bool
318318
bool missingTargetFound = false;
319319
foreach (string target in targets)
320320
{
321-
if (!result.HasResultsForTarget(target) || (result[target].ResultCode == TargetResultCode.Skipped && !skippedResultsAreOK))
321+
if (!result.TryGetResultsForTarget(target, out TargetResult targetResult) || (targetResult.ResultCode == TargetResultCode.Skipped && !skippedResultsAreOK))
322322
{
323323
if (checkTargetsMissingResults)
324324
{
@@ -334,7 +334,7 @@ private static bool CheckResults(BuildResult result, List<string> targets, bool
334334
{
335335
// If the result was a failure and we have not seen any skipped targets up to this point, then we conclude we do
336336
// have results for this request, and they indicate failure.
337-
if (result[target].ResultCode == TargetResultCode.Failure && (!checkTargetsMissingResults || !missingTargetFound))
337+
if (targetResult.ResultCode == TargetResultCode.Failure && (!checkTargetsMissingResults || !missingTargetFound))
338338
{
339339
return true;
340340
}

src/Build/BackEnd/Components/Communications/NodeProviderOutOfProcBase.cs

+49-52
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,14 @@
1010
using System.IO.Pipes;
1111
using System.Diagnostics;
1212
using System.Linq;
13+
using System.Threading;
1314
using System.Threading.Tasks;
1415
#if FEATURE_PIPE_SECURITY
1516
using System.Security.Principal;
1617
#endif
1718

1819
#if FEATURE_APM
1920
using Microsoft.Build.Eventing;
20-
#else
21-
using System.Threading;
2221
#endif
2322
using Microsoft.Build.Internal;
2423
using Microsoft.Build.Shared;
@@ -580,7 +579,7 @@ private enum ExitPacketState
580579
/// <summary>
581580
/// A queue used for enqueuing packets to write to the stream asynchronously.
582581
/// </summary>
583-
private BlockingCollection<INodePacket> _packetWriteQueue = new BlockingCollection<INodePacket>();
582+
private ConcurrentQueue<INodePacket> _packetWriteQueue = new ConcurrentQueue<INodePacket>();
584583

585584
/// <summary>
586585
/// A task representing the last packet write, so we can chain packet writes one after another.
@@ -715,7 +714,7 @@ public void SendData(INodePacket packet)
715714
{
716715
_exitPacketState = ExitPacketState.ExitPacketQueued;
717716
}
718-
_packetWriteQueue.Add(packet);
717+
_packetWriteQueue.Enqueue(packet);
719718
DrainPacketQueue();
720719
}
721720

@@ -737,65 +736,63 @@ private void DrainPacketQueue()
737736
{
738737
// average latency between the moment this runs and when the delegate starts
739738
// running is about 100-200 microseconds (unless there's thread pool saturation)
740-
_packetWriteDrainTask = _packetWriteDrainTask.ContinueWith(_ =>
739+
_packetWriteDrainTask = _packetWriteDrainTask.ContinueWith(
740+
SendDataCoreAsync,
741+
this,
742+
TaskScheduler.Default).Unwrap();
743+
744+
static async Task SendDataCoreAsync(Task _, object state)
741745
{
742-
while (_packetWriteQueue.TryTake(out var packet))
746+
NodeContext context = (NodeContext)state;
747+
while (context._packetWriteQueue.TryDequeue(out var packet))
743748
{
744-
SendDataCore(packet);
745-
}
746-
}, TaskScheduler.Default);
747-
}
748-
}
749+
MemoryStream writeStream = context._writeBufferMemoryStream;
749750

750-
/// <summary>
751-
/// Actually writes and sends the packet. This can't be called in parallel
752-
/// because it reuses the _writeBufferMemoryStream, and this is why we use
753-
/// the _packetWriteDrainTask to serially chain invocations one after another.
754-
/// </summary>
755-
/// <param name="packet">The packet to send.</param>
756-
private void SendDataCore(INodePacket packet)
757-
{
758-
MemoryStream writeStream = _writeBufferMemoryStream;
751+
// clear the buffer but keep the underlying capacity to avoid reallocations
752+
writeStream.SetLength(0);
759753

760-
// clear the buffer but keep the underlying capacity to avoid reallocations
761-
writeStream.SetLength(0);
754+
ITranslator writeTranslator = BinaryTranslator.GetWriteTranslator(writeStream);
755+
try
756+
{
757+
writeStream.WriteByte((byte)packet.Type);
762758

763-
ITranslator writeTranslator = BinaryTranslator.GetWriteTranslator(writeStream);
764-
try
765-
{
766-
writeStream.WriteByte((byte)packet.Type);
759+
// Pad for the packet length
760+
WriteInt32(writeStream, 0);
761+
packet.Translate(writeTranslator);
767762

768-
// Pad for the packet length
769-
WriteInt32(writeStream, 0);
770-
packet.Translate(writeTranslator);
763+
int writeStreamLength = (int)writeStream.Position;
771764

772-
int writeStreamLength = (int)writeStream.Position;
765+
// Now plug in the real packet length
766+
writeStream.Position = 1;
767+
WriteInt32(writeStream, writeStreamLength - 5);
773768

774-
// Now plug in the real packet length
775-
writeStream.Position = 1;
776-
WriteInt32(writeStream, writeStreamLength - 5);
769+
byte[] writeStreamBuffer = writeStream.GetBuffer();
777770

778-
byte[] writeStreamBuffer = writeStream.GetBuffer();
771+
for (int i = 0; i < writeStreamLength; i += MaxPacketWriteSize)
772+
{
773+
int lengthToWrite = Math.Min(writeStreamLength - i, MaxPacketWriteSize);
774+
#pragma warning disable CA1835 // Prefer the 'Memory'-based overloads for 'ReadAsync' and 'WriteAsync'
775+
await context._serverToClientStream.WriteAsync(writeStreamBuffer, i, lengthToWrite, CancellationToken.None);
776+
#pragma warning restore CA1835 // Prefer the 'Memory'-based overloads for 'ReadAsync' and 'WriteAsync'
777+
}
779778

780-
for (int i = 0; i < writeStreamLength; i += MaxPacketWriteSize)
781-
{
782-
int lengthToWrite = Math.Min(writeStreamLength - i, MaxPacketWriteSize);
783-
_serverToClientStream.Write(writeStreamBuffer, i, lengthToWrite);
784-
}
785-
if (IsExitPacket(packet))
786-
{
787-
_exitPacketState = ExitPacketState.ExitPacketSent;
779+
if (IsExitPacket(packet))
780+
{
781+
context._exitPacketState = ExitPacketState.ExitPacketSent;
782+
}
783+
}
784+
catch (IOException e)
785+
{
786+
// Do nothing here because any exception will be caught by the async read handler
787+
CommunicationsUtilities.Trace(context._nodeId, "EXCEPTION in SendData: {0}", e);
788+
}
789+
catch (ObjectDisposedException) // This happens if a child dies unexpectedly
790+
{
791+
// Do nothing here because any exception will be caught by the async read handler
792+
}
793+
}
788794
}
789795
}
790-
catch (IOException e)
791-
{
792-
// Do nothing here because any exception will be caught by the async read handler
793-
CommunicationsUtilities.Trace(_nodeId, "EXCEPTION in SendData: {0}", e);
794-
}
795-
catch (ObjectDisposedException) // This happens if a child dies unexpectedly
796-
{
797-
// Do nothing here because any exception will be caught by the async read handler
798-
}
799796
}
800797

801798
private static bool IsExitPacket(INodePacket packet)
@@ -806,7 +803,7 @@ private static bool IsExitPacket(INodePacket packet)
806803
/// <summary>
807804
/// Avoid having a BinaryWriter just to write a 4-byte int
808805
/// </summary>
809-
private void WriteInt32(MemoryStream stream, int value)
806+
private static void WriteInt32(MemoryStream stream, int value)
810807
{
811808
stream.WriteByte((byte)value);
812809
stream.WriteByte((byte)(value >> 8));

src/Build/BackEnd/Components/RequestBuilder/TargetBuilder.cs

+3-7
Original file line numberDiff line numberDiff line change
@@ -557,9 +557,8 @@ await PushTargets(errorTargets, currentTargetEntry, currentTargetEntry.Lookup, t
557557
/// <returns>True to skip the target, false otherwise.</returns>
558558
private bool CheckSkipTarget(ref bool stopProcessingStack, TargetEntry currentTargetEntry)
559559
{
560-
if (_buildResult.HasResultsForTarget(currentTargetEntry.Name))
560+
if (_buildResult.TryGetResultsForTarget(currentTargetEntry.Name, out TargetResult targetResult))
561561
{
562-
TargetResult targetResult = _buildResult[currentTargetEntry.Name] as TargetResult;
563562
ErrorUtilities.VerifyThrowInternalNull(targetResult, "targetResult");
564563

565564
if (targetResult.ResultCode != TargetResultCode.Skipped)
@@ -676,12 +675,9 @@ private async Task<bool> PushTargets(IList<TargetSpecification> targets, TargetE
676675
{
677676
// Don't build any Before or After targets for which we already have results. Unlike other targets,
678677
// we don't explicitly log a skipped-with-results message because it is not interesting.
679-
if (_buildResult.HasResultsForTarget(targetSpecification.TargetName))
678+
if (_buildResult.TryGetResultsForTarget(targetSpecification.TargetName, out TargetResult targetResult) && targetResult.ResultCode != TargetResultCode.Skipped)
680679
{
681-
if (_buildResult[targetSpecification.TargetName].ResultCode != TargetResultCode.Skipped)
682-
{
683-
continue;
684-
}
680+
continue;
685681
}
686682
}
687683

src/Build/BackEnd/Components/Scheduler/Scheduler.cs

+2-5
Original file line numberDiff line numberDiff line change
@@ -1019,11 +1019,8 @@ private bool IsTraversalRequest(BuildRequest request)
10191019
private void AssignUnscheduledRequestsWithConfigurationCountLevelling(List<ScheduleResponse> responses, HashSet<int> idleNodes)
10201020
{
10211021
// Assign requests but try to keep the same number of configurations on each node
1022-
List<int> nodesByConfigurationCountAscending = new List<int>(_availableNodes.Keys);
1023-
nodesByConfigurationCountAscending.Sort(delegate (int left, int right)
1024-
{
1025-
return Comparer<int>.Default.Compare(_schedulingData.GetConfigurationsCountByNode(left, true /* excludeTraversals */, _configCache), _schedulingData.GetConfigurationsCountByNode(right, true /* excludeTraversals */, _configCache));
1026-
});
1022+
// Use OrderBy to sort since it will cache the result of the lookup in configCache. This reduces the number of times we have to acquire the lock.
1023+
IEnumerable<int> nodesByConfigurationCountAscending = _availableNodes.Keys.OrderBy(x => _schedulingData.GetConfigurationsCountByNode(x, excludeTraversals: true, _configCache));
10271024

10281025
// Assign projects to nodes, preferring to assign work to nodes with the fewest configurations first.
10291026
foreach (int nodeId in nodesByConfigurationCountAscending)

src/Build/BackEnd/Shared/BuildResult.cs

+11
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
using Microsoft.Build.Shared;
1111
using Microsoft.Build.Shared.FileSystem;
1212
using Microsoft.Build.Framework;
13+
using System.Diagnostics.CodeAnalysis;
1314

1415
namespace Microsoft.Build.Execution
1516
{
@@ -601,6 +602,16 @@ public bool HasResultsForTarget(string target)
601602
return _resultsByTarget?.ContainsKey(target) ?? false;
602603
}
603604

605+
public bool TryGetResultsForTarget(string target, [NotNullWhen(true)] out TargetResult? value)
606+
{
607+
if (_resultsByTarget is null)
608+
{
609+
value = default;
610+
return false;
611+
}
612+
613+
return _resultsByTarget.TryGetValue(target, out value);
614+
}
604615
#region INodePacket Members
605616

606617
/// <summary>

src/Shared/CommunicationsUtilities.cs

+13-18
Original file line numberDiff line numberDiff line change
@@ -540,26 +540,21 @@ internal static int ReadIntForHandshake(this PipeStream stream, byte? byteToAcce
540540
else
541541
#endif
542542
{
543-
// Legacy approach with an early-abort for connection attempts from ancient MSBuild.exes
544-
for (int i = 0; i < bytes.Length; i++)
545-
{
546-
int read = stream.ReadByte();
543+
int bytesRead = stream.Read(bytes, 0, bytes.Length);
547544

548-
if (read == -1)
549-
{
550-
// We've unexpectedly reached end of stream.
551-
// We are now in a bad state, disconnect on our end
552-
throw new IOException(String.Format(CultureInfo.InvariantCulture, "Unexpected end of stream while reading for handshake"));
553-
}
554-
555-
bytes[i] = Convert.ToByte(read);
545+
// Abort for connection attempts from ancient MSBuild.exes
546+
if (byteToAccept != null && bytesRead > 0 && byteToAccept != bytes[0])
547+
{
548+
stream.WriteIntForHandshake(0x0F0F0F0F);
549+
stream.WriteIntForHandshake(0x0F0F0F0F);
550+
throw new InvalidOperationException(String.Format(CultureInfo.InvariantCulture, "Client: rejected old host. Received byte {0} instead of {1}.", bytes[0], byteToAccept));
551+
}
556552

557-
if (i == 0 && byteToAccept != null && byteToAccept != bytes[0])
558-
{
559-
stream.WriteIntForHandshake(0x0F0F0F0F);
560-
stream.WriteIntForHandshake(0x0F0F0F0F);
561-
throw new InvalidOperationException(String.Format(CultureInfo.InvariantCulture, "Client: rejected old host. Received byte {0} instead of {1}.", bytes[0], byteToAccept));
562-
}
553+
if (bytesRead != bytes.Length)
554+
{
555+
// We've unexpectedly reached end of stream.
556+
// We are now in a bad state, disconnect on our end
557+
throw new IOException(String.Format(CultureInfo.InvariantCulture, "Unexpected end of stream while reading for handshake"));
563558
}
564559
}
565560

src/Shared/NodeEndpointOutOfProcBase.cs

+4-6
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ public LinkStatus LinkStatus
152152
public void Listen(INodePacketFactory factory)
153153
{
154154
ErrorUtilities.VerifyThrow(_status == LinkStatus.Inactive, "Link not inactive. Status is {0}", _status);
155-
ErrorUtilities.VerifyThrowArgumentNull(factory);
155+
ErrorUtilities.VerifyThrowArgumentNull(factory, nameof(factory));
156156
_packetFactory = factory;
157157

158158
InitializeAsyncPacketThread();
@@ -314,7 +314,7 @@ private void InternalDisconnect()
314314
/// <param name="packet">The packet to be transmitted.</param>
315315
private void EnqueuePacket(INodePacket packet)
316316
{
317-
ErrorUtilities.VerifyThrowArgumentNull(packet);
317+
ErrorUtilities.VerifyThrowArgumentNull(packet, nameof(packet));
318318
ErrorUtilities.VerifyThrow(_packetQueue != null, "packetQueue is null");
319319
ErrorUtilities.VerifyThrow(_packetAvailable != null, "packetAvailable is null");
320320
_packetQueue.Enqueue(packet);
@@ -531,15 +531,13 @@ private void RunReadLoop(Stream localReadPipe, Stream localWritePipe,
531531
{
532532
// Ordering is important. We want packetAvailable to supersede terminate otherwise we will not properly wait for all
533533
// packets to be sent by other threads which are shutting down, such as the logging thread.
534-
WaitHandle[] handles =
535-
[
534+
WaitHandle[] handles = new WaitHandle[] {
536535
#if FEATURE_APM
537536
result.AsyncWaitHandle,
538537
#else
539538
((IAsyncResult)readTask).AsyncWaitHandle,
540539
#endif
541-
localPacketAvailable, localTerminatePacketPump
542-
];
540+
localPacketAvailable, localTerminatePacketPump };
543541

544542
int waitId = WaitHandle.WaitAny(handles);
545543
switch (waitId)

0 commit comments

Comments
 (0)