Skip to content

Commit 611c108

Browse files
authored
feat(s2n-quic-dc): add channel recv buffer impl (#2506)
1 parent 2916476 commit 611c108

File tree

13 files changed

+244
-41
lines changed

13 files changed

+244
-41
lines changed

dc/s2n-quic-dc/src/stream.rs

+6
Original file line numberDiff line numberDiff line change
@@ -70,3 +70,9 @@ impl TransportFeatures {
7070
is_feature!(is_stream, STREAM);
7171
is_feature!(is_connected, CONNECTED);
7272
}
73+
74+
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
75+
pub enum Actor {
76+
Application,
77+
Worker,
78+
}

dc/s2n-quic-dc/src/stream/client/tokio.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -161,5 +161,6 @@ where
161161
#[inline]
162162
fn recv_buffer() -> recv::shared::RecvBuffer {
163163
// TODO replace this with a parameter once everything is in place
164-
recv::buffer::Local::new(msg::recv::Message::new(9000), None)
164+
let recv_buffer = recv::buffer::Local::new(msg::recv::Message::new(9000), None);
165+
recv::buffer::Either::A(recv_buffer)
165166
}

dc/s2n-quic-dc/src/stream/recv/application.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use crate::{
55
clock::Timer,
66
event::{self, ConnectionPublisher as _},
77
msg,
8-
stream::{recv, runtime, shared::ArcShared, socket},
8+
stream::{recv, runtime, shared::ArcShared, socket, Actor},
99
};
1010
use core::{
1111
fmt,
@@ -265,6 +265,7 @@ where
265265

266266
let recv = reader.poll_fill_recv_buffer(
267267
cx,
268+
Actor::Application,
268269
self.sockets.read_application(),
269270
&self.shared.clock,
270271
&self.shared.subscriber,

dc/s2n-quic-dc/src/stream/recv/buffer.rs

+8-4
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,16 @@
33

44
use crate::{
55
event,
6-
stream::{recv, socket::Socket, TransportFeatures},
6+
stream::{recv, socket::Socket, Actor, TransportFeatures},
77
};
88
use core::task::{Context, Poll};
99
use std::io;
1010

11+
pub mod channel;
1112
mod dispatch;
1213
mod local;
1314

15+
pub use channel::Channel;
1416
pub use dispatch::Dispatch;
1517
pub use local::Local;
1618

@@ -20,6 +22,7 @@ pub trait Buffer {
2022
fn poll_fill<S, Pub>(
2123
&mut self,
2224
cx: &mut Context,
25+
actor: Actor,
2326
socket: &S,
2427
publisher: &mut Pub,
2528
) -> Poll<io::Result<usize>>
@@ -36,7 +39,7 @@ pub trait Buffer {
3639
R: Dispatch;
3740
}
3841

39-
#[allow(dead_code)] // TODO remove this once we start using the channel buffer
42+
#[derive(Debug)]
4043
pub enum Either<A, B> {
4144
A(A),
4245
B(B),
@@ -59,6 +62,7 @@ where
5962
fn poll_fill<S, Pub>(
6063
&mut self,
6164
cx: &mut Context,
65+
actor: Actor,
6266
socket: &S,
6367
publisher: &mut Pub,
6468
) -> Poll<io::Result<usize>>
@@ -67,8 +71,8 @@ where
6771
Pub: event::ConnectionPublisher,
6872
{
6973
match self {
70-
Self::A(a) => a.poll_fill(cx, socket, publisher),
71-
Self::B(b) => b.poll_fill(cx, socket, publisher),
74+
Self::A(a) => a.poll_fill(cx, actor, socket, publisher),
75+
Self::B(b) => b.poll_fill(cx, actor, socket, publisher),
7276
}
7377
}
7478

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
use super::Dispatch;
5+
use crate::{
6+
event,
7+
socket::recv::descriptor::Filled,
8+
stream::{
9+
recv::{
10+
self,
11+
dispatch::{Control, Stream},
12+
},
13+
socket::Socket,
14+
Actor, TransportFeatures,
15+
},
16+
};
17+
use core::task::{Context, Poll};
18+
use s2n_quic_core::ensure;
19+
use std::{collections::VecDeque, io};
20+
21+
#[derive(Debug)]
22+
pub struct Channel<Recv = Stream> {
23+
pending: VecDeque<Filled>,
24+
receiver: Recv,
25+
}
26+
27+
impl<Recv> Channel<Recv> {
28+
#[inline]
29+
pub fn new(receiver: Recv) -> Self {
30+
Self {
31+
pending: VecDeque::new(),
32+
receiver,
33+
}
34+
}
35+
}
36+
37+
macro_rules! impl_buffer {
38+
($recv:ident) => {
39+
impl super::Buffer for Channel<$recv> {
40+
#[inline]
41+
fn is_empty(&self) -> bool {
42+
self.pending.is_empty()
43+
}
44+
45+
#[inline]
46+
fn poll_fill<S, Pub>(
47+
&mut self,
48+
cx: &mut Context,
49+
actor: Actor,
50+
socket: &S,
51+
publisher: &mut Pub,
52+
) -> Poll<io::Result<usize>>
53+
where
54+
S: ?Sized + Socket,
55+
Pub: event::ConnectionPublisher,
56+
{
57+
// check if we've already filled the queue
58+
ensure!(self.pending.is_empty(), Ok(1).into());
59+
60+
let capacity = u16::MAX as usize;
61+
62+
// the socket isn't actually used since we're relying on another task to fill the `receiver` channel
63+
let _ = socket;
64+
65+
let result = self
66+
.receiver
67+
.poll_swap(cx, actor, &mut self.pending)
68+
.map_err(|_err| io::Error::from(io::ErrorKind::BrokenPipe));
69+
70+
match result {
71+
Poll::Ready(Ok(())) => {
72+
let committed_len = self
73+
.pending
74+
.iter()
75+
.map(|segment| {
76+
debug_assert!(
77+
!segment.is_empty(),
78+
"the channel should not contain empty packets"
79+
);
80+
segment.len() as usize
81+
})
82+
.sum::<usize>();
83+
publisher.on_stream_read_socket_flushed(
84+
event::builder::StreamReadSocketFlushed {
85+
capacity,
86+
committed_len,
87+
},
88+
);
89+
Ok(committed_len).into()
90+
}
91+
Poll::Ready(Err(error)) => {
92+
let errno = error.raw_os_error();
93+
publisher.on_stream_read_socket_errored(
94+
event::builder::StreamReadSocketErrored { capacity, errno },
95+
);
96+
Err(error).into()
97+
}
98+
Poll::Pending => {
99+
publisher.on_stream_read_socket_blocked(
100+
event::builder::StreamReadSocketBlocked { capacity },
101+
);
102+
Poll::Pending
103+
}
104+
}
105+
}
106+
107+
#[inline]
108+
fn process<R>(
109+
&mut self,
110+
features: TransportFeatures,
111+
router: &mut R,
112+
) -> Result<(), recv::Error>
113+
where
114+
R: Dispatch,
115+
{
116+
debug_assert!(
117+
!features.is_stream(),
118+
"only datagram oriented transport is supported"
119+
);
120+
121+
for mut segment in self.pending.drain(..) {
122+
let remote_addr = segment.remote_address().get();
123+
let ecn = segment.ecn();
124+
router.on_datagram_segment(&remote_addr, ecn, segment.payload_mut())?;
125+
}
126+
127+
Ok(())
128+
}
129+
}
130+
};
131+
}
132+
133+
impl_buffer!(Stream);
134+
impl_buffer!(Control);

dc/s2n-quic-dc/src/stream/recv/buffer/local.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
use super::Dispatch;
55
use crate::{
66
event, msg,
7-
stream::{recv, server::handshake, socket::Socket, TransportFeatures},
7+
stream::{recv, server::handshake, socket::Socket, Actor, TransportFeatures},
88
};
99
use core::task::{Context, Poll};
1010
use s2n_codec::{DecoderBufferMut, DecoderError};
@@ -37,6 +37,7 @@ impl super::Buffer for Local {
3737
fn poll_fill<S, Pub>(
3838
&mut self,
3939
cx: &mut Context,
40+
_actor: Actor,
4041
socket: &S,
4142
publisher: &mut Pub,
4243
) -> Poll<io::Result<usize>>

dc/s2n-quic-dc/src/stream/recv/dispatch/handle.rs

+11-6
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
use super::{descriptor::Descriptor, queue::Error};
5-
use crate::sync::ring_deque;
5+
use crate::{stream::Actor, sync::ring_deque};
66
use core::{
77
fmt,
88
task::{Context, Poll},
@@ -41,22 +41,27 @@ macro_rules! impl_recv {
4141
}
4242

4343
#[inline]
44-
pub async fn recv(&self) -> Result<T, ring_deque::Closed> {
45-
core::future::poll_fn(|cx| self.poll_recv(cx)).await
44+
pub async fn recv(&self, actor: Actor) -> Result<T, ring_deque::Closed> {
45+
core::future::poll_fn(|cx| self.poll_recv(cx, actor)).await
4646
}
4747

4848
#[inline]
49-
pub fn poll_recv(&self, cx: &mut Context) -> Poll<Result<T, ring_deque::Closed>> {
50-
unsafe { self.descriptor.$field().poll_pop(cx) }
49+
pub fn poll_recv(
50+
&self,
51+
cx: &mut Context,
52+
actor: Actor,
53+
) -> Poll<Result<T, ring_deque::Closed>> {
54+
unsafe { self.descriptor.$field().poll_pop(cx, actor) }
5155
}
5256

5357
#[inline]
5458
pub fn poll_swap(
5559
&self,
5660
cx: &mut Context,
61+
actor: Actor,
5762
out: &mut VecDeque<T>,
5863
) -> Poll<Result<(), ring_deque::Closed>> {
59-
unsafe { self.descriptor.$field().poll_swap(cx, out) }
64+
unsafe { self.descriptor.$field().poll_swap(cx, actor, out) }
6065
}
6166
}
6267

0 commit comments

Comments
 (0)