Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Take steps to avoid threadpool starvation #11275

Merged
merged 5 commits into from
Feb 26, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/Build/BackEnd/Components/Caching/ResultsCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ private static bool CheckResults(BuildResult result, List<string> targets, bool
bool missingTargetFound = false;
foreach (string target in targets)
{
if (!result.HasResultsForTarget(target) || (result[target].ResultCode == TargetResultCode.Skipped && !skippedResultsAreOK))
if (!result.TryGetResultsForTarget(target, out TargetResult targetResult) || (targetResult.ResultCode == TargetResultCode.Skipped && !skippedResultsAreOK))
{
if (checkTargetsMissingResults)
{
Expand All @@ -334,7 +334,7 @@ private static bool CheckResults(BuildResult result, List<string> targets, bool
{
// If the result was a failure and we have not seen any skipped targets up to this point, then we conclude we do
// have results for this request, and they indicate failure.
if (result[target].ResultCode == TargetResultCode.Failure && (!checkTargetsMissingResults || !missingTargetFound))
if (targetResult.ResultCode == TargetResultCode.Failure && (!checkTargetsMissingResults || !missingTargetFound))
{
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,14 @@
using System.IO.Pipes;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
#if FEATURE_PIPE_SECURITY
using System.Security.Principal;
#endif

#if FEATURE_APM
using Microsoft.Build.Eventing;
#else
using System.Threading;
#endif
using Microsoft.Build.Internal;
using Microsoft.Build.Shared;
Expand Down Expand Up @@ -580,7 +579,7 @@ private enum ExitPacketState
/// <summary>
/// A queue used for enqueuing packets to write to the stream asynchronously.
/// </summary>
private BlockingCollection<INodePacket> _packetWriteQueue = new BlockingCollection<INodePacket>();
private ConcurrentQueue<INodePacket> _packetWriteQueue = new ConcurrentQueue<INodePacket>();

/// <summary>
/// A task representing the last packet write, so we can chain packet writes one after another.
Expand Down Expand Up @@ -715,7 +714,7 @@ public void SendData(INodePacket packet)
{
_exitPacketState = ExitPacketState.ExitPacketQueued;
}
_packetWriteQueue.Add(packet);
_packetWriteQueue.Enqueue(packet);
DrainPacketQueue();
}

Expand All @@ -737,65 +736,63 @@ private void DrainPacketQueue()
{
// average latency between the moment this runs and when the delegate starts
// running is about 100-200 microseconds (unless there's thread pool saturation)
_packetWriteDrainTask = _packetWriteDrainTask.ContinueWith(_ =>
_packetWriteDrainTask = _packetWriteDrainTask.ContinueWith(
SendDataCoreAsync,
this,
TaskScheduler.Default).Unwrap();

static async Task SendDataCoreAsync(Task _, object state)
{
while (_packetWriteQueue.TryTake(out var packet))
NodeContext context = (NodeContext)state;
while (context._packetWriteQueue.TryDequeue(out var packet))
{
SendDataCore(packet);
}
}, TaskScheduler.Default);
}
}
MemoryStream writeStream = context._writeBufferMemoryStream;

/// <summary>
/// Actually writes and sends the packet. This can't be called in parallel
/// because it reuses the _writeBufferMemoryStream, and this is why we use
/// the _packetWriteDrainTask to serially chain invocations one after another.
/// </summary>
/// <param name="packet">The packet to send.</param>
private void SendDataCore(INodePacket packet)
{
MemoryStream writeStream = _writeBufferMemoryStream;
// clear the buffer but keep the underlying capacity to avoid reallocations
writeStream.SetLength(0);

// clear the buffer but keep the underlying capacity to avoid reallocations
writeStream.SetLength(0);
ITranslator writeTranslator = BinaryTranslator.GetWriteTranslator(writeStream);
try
{
writeStream.WriteByte((byte)packet.Type);

ITranslator writeTranslator = BinaryTranslator.GetWriteTranslator(writeStream);
try
{
writeStream.WriteByte((byte)packet.Type);
// Pad for the packet length
WriteInt32(writeStream, 0);
packet.Translate(writeTranslator);

// Pad for the packet length
WriteInt32(writeStream, 0);
packet.Translate(writeTranslator);
int writeStreamLength = (int)writeStream.Position;

int writeStreamLength = (int)writeStream.Position;
// Now plug in the real packet length
writeStream.Position = 1;
WriteInt32(writeStream, writeStreamLength - 5);

// Now plug in the real packet length
writeStream.Position = 1;
WriteInt32(writeStream, writeStreamLength - 5);
byte[] writeStreamBuffer = writeStream.GetBuffer();

byte[] writeStreamBuffer = writeStream.GetBuffer();
for (int i = 0; i < writeStreamLength; i += MaxPacketWriteSize)
{
int lengthToWrite = Math.Min(writeStreamLength - i, MaxPacketWriteSize);
#pragma warning disable CA1835 // Prefer the 'Memory'-based overloads for 'ReadAsync' and 'WriteAsync'
await context._serverToClientStream.WriteAsync(writeStreamBuffer, i, lengthToWrite, CancellationToken.None);
#pragma warning restore CA1835 // Prefer the 'Memory'-based overloads for 'ReadAsync' and 'WriteAsync'
}

for (int i = 0; i < writeStreamLength; i += MaxPacketWriteSize)
{
int lengthToWrite = Math.Min(writeStreamLength - i, MaxPacketWriteSize);
_serverToClientStream.Write(writeStreamBuffer, i, lengthToWrite);
}
if (IsExitPacket(packet))
{
_exitPacketState = ExitPacketState.ExitPacketSent;
if (IsExitPacket(packet))
{
context._exitPacketState = ExitPacketState.ExitPacketSent;
}
}
catch (IOException e)
{
// Do nothing here because any exception will be caught by the async read handler
CommunicationsUtilities.Trace(context._nodeId, "EXCEPTION in SendData: {0}", e);
}
catch (ObjectDisposedException) // This happens if a child dies unexpectedly
{
// Do nothing here because any exception will be caught by the async read handler
}
}
}
}
catch (IOException e)
{
// Do nothing here because any exception will be caught by the async read handler
CommunicationsUtilities.Trace(_nodeId, "EXCEPTION in SendData: {0}", e);
}
catch (ObjectDisposedException) // This happens if a child dies unexpectedly
{
// Do nothing here because any exception will be caught by the async read handler
}
}

private static bool IsExitPacket(INodePacket packet)
Expand All @@ -806,7 +803,7 @@ private static bool IsExitPacket(INodePacket packet)
/// <summary>
/// Avoid having a BinaryWriter just to write a 4-byte int
/// </summary>
private void WriteInt32(MemoryStream stream, int value)
private static void WriteInt32(MemoryStream stream, int value)
{
stream.WriteByte((byte)value);
stream.WriteByte((byte)(value >> 8));
Expand Down
10 changes: 3 additions & 7 deletions src/Build/BackEnd/Components/RequestBuilder/TargetBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -557,9 +557,8 @@ await PushTargets(errorTargets, currentTargetEntry, currentTargetEntry.Lookup, t
/// <returns>True to skip the target, false otherwise.</returns>
private bool CheckSkipTarget(ref bool stopProcessingStack, TargetEntry currentTargetEntry)
{
if (_buildResult.HasResultsForTarget(currentTargetEntry.Name))
if (_buildResult.TryGetResultsForTarget(currentTargetEntry.Name, out TargetResult targetResult))
{
TargetResult targetResult = _buildResult[currentTargetEntry.Name] as TargetResult;
ErrorUtilities.VerifyThrowInternalNull(targetResult, "targetResult");

if (targetResult.ResultCode != TargetResultCode.Skipped)
Expand Down Expand Up @@ -676,12 +675,9 @@ private async Task<bool> PushTargets(IList<TargetSpecification> targets, TargetE
{
// Don't build any Before or After targets for which we already have results. Unlike other targets,
// we don't explicitly log a skipped-with-results message because it is not interesting.
if (_buildResult.HasResultsForTarget(targetSpecification.TargetName))
if (_buildResult.TryGetResultsForTarget(targetSpecification.TargetName, out TargetResult targetResult) && targetResult.ResultCode != TargetResultCode.Skipped)
{
if (_buildResult[targetSpecification.TargetName].ResultCode != TargetResultCode.Skipped)
{
continue;
}
continue;
}
}

Expand Down
7 changes: 2 additions & 5 deletions src/Build/BackEnd/Components/Scheduler/Scheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1019,11 +1019,8 @@ private bool IsTraversalRequest(BuildRequest request)
private void AssignUnscheduledRequestsWithConfigurationCountLevelling(List<ScheduleResponse> responses, HashSet<int> idleNodes)
{
// Assign requests but try to keep the same number of configurations on each node
List<int> nodesByConfigurationCountAscending = new List<int>(_availableNodes.Keys);
nodesByConfigurationCountAscending.Sort(delegate (int left, int right)
{
return Comparer<int>.Default.Compare(_schedulingData.GetConfigurationsCountByNode(left, true /* excludeTraversals */, _configCache), _schedulingData.GetConfigurationsCountByNode(right, true /* excludeTraversals */, _configCache));
});
// Use OrderBy to sort since it will cache the lookup in configCache which. This reduces the number of times we have to acquire the lock.
IEnumerable<int> nodesByConfigurationCountAscending = _availableNodes.Keys.OrderBy(x => _schedulingData.GetConfigurationsCountByNode(x, excludeTraversals: true, _configCache));

// Assign projects to nodes, preferring to assign work to nodes with the fewest configurations first.
foreach (int nodeId in nodesByConfigurationCountAscending)
Expand Down
11 changes: 11 additions & 0 deletions src/Build/BackEnd/Shared/BuildResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using Microsoft.Build.Shared;
using Microsoft.Build.Shared.FileSystem;
using Microsoft.Build.Framework;
using System.Diagnostics.CodeAnalysis;

namespace Microsoft.Build.Execution
{
Expand Down Expand Up @@ -601,6 +602,16 @@ public bool HasResultsForTarget(string target)
return _resultsByTarget?.ContainsKey(target) ?? false;
}

public bool TryGetResultsForTarget(string target, [NotNullWhen(true)] out TargetResult? value)
{
if (_resultsByTarget is null)
{
value = default;
return false;
}

return _resultsByTarget.TryGetValue(target, out value);
}
#region INodePacket Members

/// <summary>
Expand Down
30 changes: 12 additions & 18 deletions src/Shared/CommunicationsUtilities.cs
Original file line number Diff line number Diff line change
Expand Up @@ -540,26 +540,20 @@ internal static int ReadIntForHandshake(this PipeStream stream, byte? byteToAcce
else
#endif
{
// Legacy approach with an early-abort for connection attempts from ancient MSBuild.exes
for (int i = 0; i < bytes.Length; i++)
int bytesRead = stream.Read(bytes, 0, bytes.Length);
if (bytesRead != bytes.Length)
{
int read = stream.ReadByte();

if (read == -1)
{
// We've unexpectly reached end of stream.
// We are now in a bad state, disconnect on our end
throw new IOException(String.Format(CultureInfo.InvariantCulture, "Unexpected end of stream while reading for handshake"));
}

bytes[i] = Convert.ToByte(read);
// We've unexpectly reached end of stream.
// We are now in a bad state, disconnect on our end
throw new IOException(String.Format(CultureInfo.InvariantCulture, "Unexpected end of stream while reading for handshake"));
}

if (i == 0 && byteToAccept != null && byteToAccept != bytes[0])
{
stream.WriteIntForHandshake(0x0F0F0F0F);
stream.WriteIntForHandshake(0x0F0F0F0F);
throw new InvalidOperationException(String.Format(CultureInfo.InvariantCulture, "Client: rejected old host. Received byte {0} instead of {1}.", bytes[0], byteToAccept));
}
// Legacy approach with an early-abort for connection attempts from ancient MSBuild.exes
if (byteToAccept != null && byteToAccept != bytes[0])
{
stream.WriteIntForHandshake(0x0F0F0F0F);
stream.WriteIntForHandshake(0x0F0F0F0F);
throw new InvalidOperationException(String.Format(CultureInfo.InvariantCulture, "Client: rejected old host. Received byte {0} instead of {1}.", bytes[0], byteToAccept));
}
}

Expand Down
10 changes: 4 additions & 6 deletions src/Shared/NodeEndpointOutOfProcBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public LinkStatus LinkStatus
public void Listen(INodePacketFactory factory)
{
ErrorUtilities.VerifyThrow(_status == LinkStatus.Inactive, "Link not inactive. Status is {0}", _status);
ErrorUtilities.VerifyThrowArgumentNull(factory);
ErrorUtilities.VerifyThrowArgumentNull(factory, nameof(factory));
_packetFactory = factory;

InitializeAsyncPacketThread();
Expand Down Expand Up @@ -314,7 +314,7 @@ private void InternalDisconnect()
/// <param name="packet">The packet to be transmitted.</param>
private void EnqueuePacket(INodePacket packet)
{
ErrorUtilities.VerifyThrowArgumentNull(packet);
ErrorUtilities.VerifyThrowArgumentNull(packet, nameof(packet));
ErrorUtilities.VerifyThrow(_packetQueue != null, "packetQueue is null");
ErrorUtilities.VerifyThrow(_packetAvailable != null, "packetAvailable is null");
_packetQueue.Enqueue(packet);
Expand Down Expand Up @@ -531,15 +531,13 @@ private void RunReadLoop(Stream localReadPipe, Stream localWritePipe,
{
// Ordering is important. We want packetAvailable to supercede terminate otherwise we will not properly wait for all
// packets to be sent by other threads which are shutting down, such as the logging thread.
WaitHandle[] handles =
[
WaitHandle[] handles = new WaitHandle[] {
#if FEATURE_APM
result.AsyncWaitHandle,
#else
((IAsyncResult)readTask).AsyncWaitHandle,
#endif
localPacketAvailable, localTerminatePacketPump
];
localPacketAvailable, localTerminatePacketPump };

int waitId = WaitHandle.WaitAny(handles);
switch (waitId)
Expand Down