Skip to content

Commit 36b265d

Browse files
authored
Avoid synchronous I/O in BufferedReadStream (#11379)
Fixes # ### Context ### Changes Made ### Testing ### Notes
2 parents e18e7f0 + d6bc14b commit 36b265d

File tree

2 files changed

+99
-25
lines changed

2 files changed

+99
-25
lines changed

src/Shared/BufferedReadStream.cs

+64-2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,12 @@
33

44
using System;
55
using System.IO;
6+
using System.IO.Pipes;
7+
using System.Threading;
8+
9+
#if NET451_OR_GREATER || NETCOREAPP
10+
using System.Threading.Tasks;
11+
#endif
612

713
#nullable disable
814

@@ -11,14 +17,14 @@ namespace Microsoft.Build.BackEnd
1117
internal class BufferedReadStream : Stream
1218
{
1319
private const int BUFFER_SIZE = 1024;
14-
private Stream _innerStream;
20+
private NamedPipeServerStream _innerStream;
1521
private byte[] _buffer;
1622

1723
// The number of bytes in the buffer that have been read from the underlying stream but not read by consumers of this stream
1824
private int _currentlyBufferedByteCount;
1925
private int _currentIndexInBuffer;
2026

21-
public BufferedReadStream(Stream innerStream)
27+
public BufferedReadStream(NamedPipeServerStream innerStream)
2228
{
2329
_innerStream = innerStream;
2430
_buffer = new byte[BUFFER_SIZE];
@@ -120,6 +126,62 @@ public override int Read(byte[] buffer, int offset, int count)
120126
}
121127
}
122128

129+
#if NET451_OR_GREATER || NETCOREAPP
130+
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
131+
{
132+
if (count > BUFFER_SIZE)
133+
{
134+
// Trying to read more data than the buffer can hold
135+
int alreadyCopied = CopyToBuffer(buffer, offset);
136+
137+
#pragma warning disable CA1835 // Prefer the 'Memory'-based overloads for 'ReadAsync' and 'WriteAsync'
138+
int innerReadCount = await _innerStream.ReadAsync(buffer, offset + alreadyCopied, count - alreadyCopied, cancellationToken);
139+
#pragma warning restore CA1835 // Prefer the 'Memory'-based overloads for 'ReadAsync' and 'WriteAsync'
140+
return innerReadCount + alreadyCopied;
141+
}
142+
else if (count <= _currentlyBufferedByteCount)
143+
{
144+
// Enough data buffered to satisfy read request
145+
Array.Copy(_buffer, _currentIndexInBuffer, buffer, offset, count);
146+
_currentIndexInBuffer += count;
147+
_currentlyBufferedByteCount -= count;
148+
return count;
149+
}
150+
else
151+
{
152+
// Need to read more data
153+
int alreadyCopied = CopyToBuffer(buffer, offset);
154+
155+
#pragma warning disable CA1835 // Prefer the 'Memory'-based overloads for 'ReadAsync' and 'WriteAsync'
156+
int innerReadCount = await _innerStream.ReadAsync(_buffer, 0, BUFFER_SIZE, cancellationToken);
157+
#pragma warning restore CA1835 // Prefer the 'Memory'-based overloads for 'ReadAsync' and 'WriteAsync'
158+
_currentIndexInBuffer = 0;
159+
_currentlyBufferedByteCount = innerReadCount;
160+
161+
int remainingCopyCount = alreadyCopied + innerReadCount >= count ? count - alreadyCopied : innerReadCount;
162+
Array.Copy(_buffer, 0, buffer, offset + alreadyCopied, remainingCopyCount);
163+
_currentIndexInBuffer += remainingCopyCount;
164+
_currentlyBufferedByteCount -= remainingCopyCount;
165+
166+
return alreadyCopied + remainingCopyCount;
167+
}
168+
169+
int CopyToBuffer(byte[] buffer, int offset)
170+
{
171+
int alreadyCopied = 0;
172+
if (_currentlyBufferedByteCount > 0)
173+
{
174+
Array.Copy(_buffer, _currentIndexInBuffer, buffer, offset, _currentlyBufferedByteCount);
175+
alreadyCopied = _currentlyBufferedByteCount;
176+
_currentIndexInBuffer = 0;
177+
_currentlyBufferedByteCount = 0;
178+
}
179+
180+
return alreadyCopied;
181+
}
182+
}
183+
#endif
184+
123185
public override long Seek(long offset, SeekOrigin origin)
124186
{
125187
throw new NotSupportedException();

src/Shared/NodeEndpointOutOfProcBase.cs

+35-23
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,11 @@
1515
using Microsoft.Build.Shared;
1616
#if FEATURE_SECURITY_PERMISSIONS || FEATURE_PIPE_SECURITY
1717
using System.Security.AccessControl;
18-
1918
#endif
2019
#if FEATURE_PIPE_SECURITY && FEATURE_NAMED_PIPE_SECURITY_CONSTRUCTOR
2120
using System.Security.Principal;
2221
#endif
23-
#if !FEATURE_APM
22+
#if NET451_OR_GREATER || NETCOREAPP
2423
using System.Threading.Tasks;
2524
#endif
2625

@@ -511,7 +510,7 @@ private void PacketPumpProc()
511510
}
512511
}
513512

514-
private void RunReadLoop(Stream localReadPipe, Stream localWritePipe,
513+
private void RunReadLoop(BufferedReadStream localReadPipe, NamedPipeServerStream localWritePipe,
515514
ConcurrentQueue<INodePacket> localPacketQueue, AutoResetEvent localPacketAvailable, AutoResetEvent localTerminatePacketPump)
516515
{
517516
// Ordering of the wait handles is important. The first signalled wait handle in the array
@@ -520,25 +519,30 @@ private void RunReadLoop(Stream localReadPipe, Stream localWritePipe,
520519
// spammed to the endpoint and it never gets an opportunity to shutdown.
521520
CommunicationsUtilities.Trace("Entering read loop.");
522521
byte[] headerByte = new byte[5];
523-
#if FEATURE_APM
524-
IAsyncResult result = localReadPipe.BeginRead(headerByte, 0, headerByte.Length, null, null);
525-
#else
522+
#if NET451_OR_GREATER
523+
Task<int> readTask = localReadPipe.ReadAsync(headerByte, 0, headerByte.Length, CancellationToken.None);
524+
#elif NETCOREAPP
526525
Task<int> readTask = CommunicationsUtilities.ReadAsync(localReadPipe, headerByte, headerByte.Length);
526+
#else
527+
IAsyncResult result = localReadPipe.BeginRead(headerByte, 0, headerByte.Length, null, null);
527528
#endif
528529

529-
bool exitLoop = false;
530-
do
530+
// Ordering is important. We want packetAvailable to supercede terminate otherwise we will not properly wait for all
531+
// packets to be sent by other threads which are shutting down, such as the logging thread.
532+
WaitHandle[] handles = new WaitHandle[]
531533
{
532-
// Ordering is important. We want packetAvailable to supercede terminate otherwise we will not properly wait for all
533-
// packets to be sent by other threads which are shutting down, such as the logging thread.
534-
WaitHandle[] handles = new WaitHandle[] {
535-
#if FEATURE_APM
536-
result.AsyncWaitHandle,
534+
#if NET451_OR_GREATER || NETCOREAPP
535+
((IAsyncResult)readTask).AsyncWaitHandle,
537536
#else
538-
((IAsyncResult)readTask).AsyncWaitHandle,
537+
result.AsyncWaitHandle,
539538
#endif
540-
localPacketAvailable, localTerminatePacketPump };
539+
localPacketAvailable,
540+
localTerminatePacketPump,
541+
};
541542

543+
bool exitLoop = false;
544+
do
545+
{
542546
int waitId = WaitHandle.WaitAny(handles);
543547
switch (waitId)
544548
{
@@ -547,10 +551,10 @@ private void RunReadLoop(Stream localReadPipe, Stream localWritePipe,
547551
int bytesRead = 0;
548552
try
549553
{
550-
#if FEATURE_APM
551-
bytesRead = localReadPipe.EndRead(result);
552-
#else
554+
#if NET451_OR_GREATER || NETCOREAPP
553555
bytesRead = readTask.Result;
556+
#else
557+
bytesRead = localReadPipe.EndRead(result);
554558
#endif
555559
}
556560
catch (Exception e)
@@ -591,7 +595,7 @@ private void RunReadLoop(Stream localReadPipe, Stream localWritePipe,
591595
break;
592596
}
593597

594-
NodePacketType packetType = (NodePacketType)Enum.ToObject(typeof(NodePacketType), headerByte[0]);
598+
NodePacketType packetType = (NodePacketType)headerByte[0];
595599

596600
try
597601
{
@@ -607,10 +611,18 @@ private void RunReadLoop(Stream localReadPipe, Stream localWritePipe,
607611
break;
608612
}
609613

610-
#if FEATURE_APM
614+
#if NET451_OR_GREATER
615+
readTask = localReadPipe.ReadAsync(headerByte, 0, headerByte.Length, CancellationToken.None);
616+
#elif NETCOREAPP
617+
readTask = CommunicationsUtilities.ReadAsync(localReadPipe, headerByte, headerByte.Length);
618+
#else
611619
result = localReadPipe.BeginRead(headerByte, 0, headerByte.Length, null, null);
620+
#endif
621+
622+
#if NET451_OR_GREATER || NETCOREAPP
623+
handles[0] = ((IAsyncResult)readTask).AsyncWaitHandle;
612624
#else
613-
readTask = CommunicationsUtilities.ReadAsync(localReadPipe, headerByte, headerByte.Length);
625+
handles[0] = result.AsyncWaitHandle;
614626
#endif
615627
}
616628

@@ -673,8 +685,8 @@ private void RunReadLoop(Stream localReadPipe, Stream localWritePipe,
673685
while (!exitLoop);
674686
}
675687

676-
#endregion
688+
#endregion
677689

678-
#endregion
690+
#endregion
679691
}
680692
}

0 commit comments

Comments
 (0)