Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the ability to disable buffering #509

Merged
merged 5 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.

using System;
using System.Buffers;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.IO;
Expand Down Expand Up @@ -46,17 +47,25 @@ public HttpResponseAdapterFeature(IHttpResponseBodyFeature httpResponseBody)

Task IHttpResponseBodyFeature.CompleteAsync() => CompleteAsync();

void IHttpResponseBodyFeature.DisableBuffering()
public void DisableBuffering()
{
if (_state == StreamState.NotStarted)
_responseBodyFeature.DisableBuffering();
_state = StreamState.NotBuffering;

// If anything is already buffered, we'll use a custom pipe that will
// clear out the buffer the next time flush is called since this method
// is not async
if (_bufferedStream is { })
{
_pipeWriter = new FlushingBufferedPipeWriter(this, _responseBodyFeature.Writer);
}
else
{
_state = StreamState.NotBuffering;
_responseBodyFeature.DisableBuffering();
_pipeWriter = _responseBodyFeature.Writer;
}
}

void IHttpResponseBufferingFeature.EnableBuffering(int memoryThreshold, long? bufferLimit)
void IHttpResponseBufferingFeature.EnableBuffering(int? memoryThreshold, long? bufferLimit)
{
if (_state == StreamState.Buffering)
{
Expand All @@ -67,7 +76,7 @@ void IHttpResponseBufferingFeature.EnableBuffering(int memoryThreshold, long? bu
Debug.Assert(_bufferedStream is null);

_state = StreamState.Buffering;
_factory = () => new FileBufferingWriteStream(memoryThreshold, bufferLimit);
_factory = () => new FileBufferingWriteStream(memoryThreshold ?? PreBufferRequestStreamAttribute.DefaultBufferThreshold, bufferLimit);
}
else
{
Expand Down Expand Up @@ -100,22 +109,35 @@ private async ValueTask FlushInternalAsync()
await _pipeWriter.FlushAsync();
}

if (_state is StreamState.Buffering && _bufferedStream is not null && !SuppressContent)
if (_state is StreamState.Buffering)
{
await DrainStreamAsync(default);
}
}

private async ValueTask DrainStreamAsync(CancellationToken token)
{
if (_bufferedStream is null)
{
return;
}

if (!SuppressContent)
{
if (_filter is { } filter)
{
await _bufferedStream.DrainBufferAsync(filter);
await _bufferedStream.DrainBufferAsync(filter, token);
await filter.DisposeAsync();
_filter = null;
}
else
{
await _bufferedStream.DrainBufferAsync(_responseBodyFeature.Stream);
await _bufferedStream.DrainBufferAsync(_responseBodyFeature.Stream, token);
}

await _bufferedStream.DisposeAsync();
_bufferedStream = null;
}

await _bufferedStream.DisposeAsync();
_bufferedStream = null;
}

Stream IHttpResponseBodyFeature.Stream => this;
Expand Down Expand Up @@ -286,4 +308,92 @@ public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationTo

public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
=> CurrentStream.WriteAsync(buffer, offset, count, cancellationToken);

/// <summary>
/// A <see cref="PipeWriter"/> that can flush any existing buffered items before writing next sequence of bytes
/// Intended to be used if <see cref="IHttpResponseBodyFeature.DisableBuffering"/> is called and data has been buffered
/// to ensure that the final output will be ordered correctly (since we can't asynchronously write the data in that call).
/// </summary>
/// <remarks>
/// Calls to <see cref="Advance(int)"/>, <see cref="GetSpan(int)"/>, <see cref="GetMemory(int)"/> must be called
/// in a group without calling <see cref="FlushAsync(CancellationToken)"/>. If not, then the call to <see cref="Advance(int)"/>
/// will potentially advance the inner pipe rather than the buffer.
/// </remarks>
private sealed class FlushingBufferedPipeWriter : PipeWriter
{
private readonly PipeWriter _other;

private HttpResponseAdapterFeature? _feature;
private ArrayBufferWriter<byte>? _buffer;

public FlushingBufferedPipeWriter(HttpResponseAdapterFeature feature, PipeWriter other)
{
_feature = feature;
_other = other;
}

public override void CancelPendingFlush() => _other.CancelPendingFlush();

public override void Complete(Exception? exception = null) => _other.Complete(exception);

public override async ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default)
{
await FlushExistingDataAsync(cancellationToken);

return await _other.FlushAsync(cancellationToken);
}

private async ValueTask FlushExistingDataAsync(CancellationToken cancellationToken)
{
if (_feature is { })
{
await _feature.DrainStreamAsync(cancellationToken);
_feature = null;
}

if (_buffer is { })
{
await _other.WriteAsync(_buffer.WrittenMemory, cancellationToken);
_buffer = null;
}
}

public bool IsBuffered => _feature is { };

public override void Advance(int bytes)
{
if (_buffer is { })
{
_buffer.Advance(bytes);
}
else
{
_other.Advance(bytes);
}
}

public override Memory<byte> GetMemory(int sizeHint = 0)
{
if (IsBuffered)
{
return (_buffer ??= new()).GetMemory(sizeHint);
}
else
{
return _other.GetMemory(sizeHint);
}
}

public override Span<byte> GetSpan(int sizeHint = 0)
{
if (IsBuffered)
{
return (_buffer ??= new()).GetSpan(sizeHint);
}
else
{
return _other.GetSpan(sizeHint);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ namespace Microsoft.AspNetCore.SystemWebAdapters.Features;
[Experimental(Constants.ExperimentalFeatures.DiagnosticId)]
public interface IHttpResponseBufferingFeature
{
void EnableBuffering(int memoryThreshold, long? bufferLimit);
void EnableBuffering(int? memoryThreshold = default, long? bufferLimit = default);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need the defaults and nullable changes here? Trying to see if we can avoid being binary breaking here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it makes it easier to just enable buffering when we want the default and the default value is currently in a different layer than I wanted to call it from.

This interface is intentionally marked as "Experimental" so we can break it as much as we need


ValueTask FlushAsync();

[AllowNull]
Stream Filter { get; set; }

void DisableBuffering();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding this member to the interface is of course also a breaking change. Do we expect this to cause any issues?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above: this interface is intentionally marked as "Experimental" so we can break it as much as we need


bool IsEnabled { get; }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ public partial class HttpRequestWrapper : System.Web.HttpRequestBase
public partial class HttpResponse
{
internal HttpResponse() { }
public bool BufferOutput { get { throw new System.PlatformNotSupportedException("Only supported when running on ASP.NET Core or System.Web");} }
public bool BufferOutput { get { throw new System.PlatformNotSupportedException("Only supported when running on ASP.NET Core or System.Web");} set { throw new System.PlatformNotSupportedException("Only supported when running on ASP.NET Core or System.Web");} }
public System.Web.HttpCachePolicy Cache { get { throw new System.PlatformNotSupportedException("Only supported when running on ASP.NET Core or System.Web");} }
public string Charset { get { throw new System.PlatformNotSupportedException("Only supported when running on ASP.NET Core or System.Web");} set { throw new System.PlatformNotSupportedException("Only supported when running on ASP.NET Core or System.Web");} }
public System.Text.Encoding ContentEncoding { get { throw new System.PlatformNotSupportedException("Only supported when running on ASP.NET Core or System.Web");} set { throw new System.PlatformNotSupportedException("Only supported when running on ASP.NET Core or System.Web");} }
Expand Down Expand Up @@ -555,7 +555,7 @@ internal HttpResponse() { }
public partial class HttpResponseBase
{
public HttpResponseBase() { throw new System.PlatformNotSupportedException("Only supported when running on ASP.NET Core or System.Web");}
public virtual bool BufferOutput { get { throw new System.PlatformNotSupportedException("Only supported when running on ASP.NET Core or System.Web");} }
public virtual bool BufferOutput { get { throw new System.PlatformNotSupportedException("Only supported when running on ASP.NET Core or System.Web");} set { throw new System.PlatformNotSupportedException("Only supported when running on ASP.NET Core or System.Web");} }
public virtual System.Web.HttpCachePolicyBase Cache { get { throw new System.PlatformNotSupportedException("Only supported when running on ASP.NET Core or System.Web");} }
public virtual string Charset { get { throw new System.PlatformNotSupportedException("Only supported when running on ASP.NET Core or System.Web");} set { throw new System.PlatformNotSupportedException("Only supported when running on ASP.NET Core or System.Web");} }
public virtual System.Text.Encoding ContentEncoding { get { throw new System.PlatformNotSupportedException("Only supported when running on ASP.NET Core or System.Web");} set { throw new System.PlatformNotSupportedException("Only supported when running on ASP.NET Core or System.Web");} }
Expand Down Expand Up @@ -599,7 +599,7 @@ public partial class HttpResponseBase
public partial class HttpResponseWrapper : System.Web.HttpResponseBase
{
public HttpResponseWrapper(System.Web.HttpResponse response) { throw new System.PlatformNotSupportedException("Only supported when running on ASP.NET Core or System.Web");}
public override bool BufferOutput { get { throw new System.PlatformNotSupportedException("Only supported when running on ASP.NET Core or System.Web");} }
public override bool BufferOutput { get { throw new System.PlatformNotSupportedException("Only supported when running on ASP.NET Core or System.Web");} set { throw new System.PlatformNotSupportedException("Only supported when running on ASP.NET Core or System.Web");} }
public override System.Web.HttpCachePolicyBase Cache { get { throw new System.PlatformNotSupportedException("Only supported when running on ASP.NET Core or System.Web");} }
public override string Charset { get { throw new System.PlatformNotSupportedException("Only supported when running on ASP.NET Core or System.Web");} set { throw new System.PlatformNotSupportedException("Only supported when running on ASP.NET Core or System.Web");} }
public override System.Text.Encoding ContentEncoding { get { throw new System.PlatformNotSupportedException("Only supported when running on ASP.NET Core or System.Web");} set { throw new System.PlatformNotSupportedException("Only supported when running on ASP.NET Core or System.Web");} }
Expand Down
18 changes: 17 additions & 1 deletion src/Microsoft.AspNetCore.SystemWebAdapters/HttpResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,23 @@ public bool TrySkipIisCustomErrors
set => Response.HttpContext.Features.GetRequired<IStatusCodePagesFeature>().Enabled = value;
}

public bool BufferOutput => Response.HttpContext.Features.GetRequired<IHttpResponseBufferingFeature>().IsEnabled;
public bool BufferOutput
{
get => Response.HttpContext.Features.GetRequired<IHttpResponseBufferingFeature>().IsEnabled;
set
{
var feature = Response.HttpContext.Features.GetRequired<IHttpResponseBufferingFeature>();

if (value)
{
feature.EnableBuffering();
}
else
{
feature.DisableBuffering();
}
}
}

public Stream OutputStream => Response.Body;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ public virtual Encoding ContentEncoding
set => throw new NotImplementedException();
}

public virtual bool BufferOutput => throw new NotImplementedException();
public virtual bool BufferOutput
{
get => throw new NotImplementedException();
set => throw new NotImplementedException();
}

public virtual Stream OutputStream => throw new NotImplementedException();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ public override TextWriter Output
set => _response.Output = value;
}

public override bool BufferOutput => _response.BufferOutput;
public override bool BufferOutput
{
get => _response.BufferOutput;
set => _response.BufferOutput = value;
}

public override Stream OutputStream => _response.OutputStream;

Expand Down
Loading