Skip to content

Commit 9c03ba2

Browse files
committed
Update the tokio codec to use new features.
1 parent bfc16a4 commit 9c03ba2

File tree

5 files changed

+140
-171
lines changed

5 files changed

+140
-171
lines changed

examples/tokio-serial.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use futures::{ready, SinkExt, StreamExt};
22
use serialport::SerialPort;
3+
use slip_codec::tokio::SlipCodec;
34
use std::io::{Read, Write};
45
use std::task::Context;
56
use tokio::io::unix::AsyncFd;
@@ -80,7 +81,7 @@ impl AsyncWrite for AsyncTTYPort {
8081
}
8182

8283
async fn run_source(port: AsyncTTYPort) {
83-
let mut sink = tokio_util::codec::Framed::new(port, slip_codec::SlipCodec::new());
84+
let mut sink = tokio_util::codec::Framed::new(port, SlipCodec::new());
8485

8586
for message in ["foo", "bar", "baz"].iter() {
8687
let message = message.to_string().into();
@@ -91,15 +92,14 @@ async fn run_source(port: AsyncTTYPort) {
9192
}
9293

9394
async fn run_sink(port: AsyncTTYPort) {
94-
let mut source = tokio_util::codec::Framed::new(port, slip_codec::SlipCodec::new());
95+
let mut source = tokio_util::codec::Framed::new(port, SlipCodec::new());
9596

9697
loop {
97-
match source.next().await {
98-
Some(result) => match result {
98+
if let Some(result) = source.next().await {
99+
match result {
99100
Ok(message) => println!("recv {:?}", message),
100101
Err(_) => break,
101-
},
102-
None => (),
102+
}
103103
}
104104
}
105105
}

src/tokio/codec.rs

+73
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
use super::{SlipDecoder, SlipEncoder};
2+
use crate::{SlipError, MAX_PACKET_SIZE};
3+
use bytes::{Bytes, BytesMut};
4+
use tokio_util::codec::{Decoder, Encoder};
5+
6+
pub struct SlipCodec {
7+
decoder: SlipDecoder,
8+
encoder: SlipEncoder,
9+
}
10+
11+
pub struct SlipCodecBuilder {
12+
begin_with_end: bool,
13+
capacity: usize,
14+
}
15+
16+
impl SlipCodec {
17+
pub fn new() -> Self {
18+
Self {
19+
decoder: SlipDecoder::default(),
20+
encoder: SlipEncoder::default(),
21+
}
22+
}
23+
24+
pub fn builder() -> SlipCodecBuilder {
25+
SlipCodecBuilder {
26+
begin_with_end: true,
27+
capacity: MAX_PACKET_SIZE,
28+
}
29+
}
30+
}
31+
32+
impl Decoder for SlipCodec {
33+
type Item = BytesMut;
34+
type Error = SlipError;
35+
36+
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<BytesMut>, Self::Error> {
37+
self.decoder.decode(src)
38+
}
39+
}
40+
41+
impl Encoder<Bytes> for SlipCodec {
42+
type Error = SlipError;
43+
44+
fn encode(&mut self, item: Bytes, dst: &mut BytesMut) -> Result<(), Self::Error> {
45+
self.encoder.encode(item, dst).map_err(SlipError::ReadError)
46+
}
47+
}
48+
49+
impl Default for SlipCodec {
50+
fn default() -> Self {
51+
Self::new()
52+
}
53+
}
54+
55+
impl SlipCodecBuilder {
56+
pub fn begin_with_end(self, begin_with_end: bool) -> Self {
57+
Self {
58+
begin_with_end,
59+
..self
60+
}
61+
}
62+
63+
pub fn capacity(self, capacity: usize) -> Self {
64+
Self { capacity, ..self }
65+
}
66+
67+
pub fn build(self) -> SlipCodec {
68+
SlipCodec {
69+
decoder: SlipDecoder::with_capacity(self.capacity),
70+
encoder: SlipEncoder::new(self.begin_with_end),
71+
}
72+
}
73+
}

src/tokio/decoder.rs

+29-83
Original file line numberDiff line numberDiff line change
@@ -1,102 +1,47 @@
1-
use crate::{SlipCodecError, END, ESC, ESC_END, ESC_ESC};
2-
1+
use crate::{SlipError, MAX_PACKET_SIZE};
32
use bytes::{Buf, BufMut, BytesMut};
43
use tokio_util::codec::Decoder;
54

6-
enum State {
7-
Normal,
8-
Error,
9-
Escape,
10-
}
11-
125
/// SLIP decoding context
136
pub struct SlipDecoder {
147
buf: BytesMut,
15-
capacity: Option<usize>,
16-
state: State,
8+
capacity: usize,
9+
inner: crate::SlipDecoder,
1710
}
1811

19-
const INITIAL_CAPACITY: usize = 1024;
20-
2112
impl SlipDecoder {
22-
/// Creates a new context without a maximum buffer size.
23-
pub fn new() -> Self {
24-
Self {
25-
buf: BytesMut::with_capacity(INITIAL_CAPACITY),
26-
capacity: None,
27-
state: State::Normal,
28-
}
29-
}
30-
3113
/// Creates a new context with the given maximum buffer size.
3214
pub fn with_capacity(capacity: usize) -> Self {
3315
Self {
3416
buf: BytesMut::with_capacity(capacity),
35-
capacity: Some(capacity),
36-
state: State::Normal,
17+
capacity,
18+
inner: Default::default(),
3719
}
3820
}
39-
40-
fn push(self: &mut Self, value: u8) -> Result<(), SlipCodecError> {
41-
if let Some(capacity) = self.capacity {
42-
if self.buf.len() == capacity {
43-
return Err(SlipCodecError::OversizedPacket);
44-
}
45-
}
46-
47-
self.buf.put_u8(value);
48-
49-
Ok(())
50-
}
5121
}
5222

5323
impl Decoder for SlipDecoder {
5424
type Item = BytesMut;
55-
type Error = SlipCodecError;
56-
57-
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<BytesMut>, SlipCodecError> {
58-
while src.has_remaining() {
59-
let value = src.get_u8();
60-
61-
match self.state {
62-
State::Normal => match value {
63-
END => {
64-
if self.buf.has_remaining() {
65-
return Ok(Some(self.buf.split()));
66-
}
67-
}
68-
ESC => {
69-
self.state = State::Escape;
70-
}
71-
_ => {
72-
self.push(value)?;
73-
}
74-
},
75-
State::Error => {
76-
if value == END {
77-
self.buf.clear();
78-
self.state = State::Normal;
79-
}
80-
}
81-
State::Escape => match value {
82-
ESC_END => {
83-
self.push(END)?;
84-
self.state = State::Normal;
85-
}
86-
ESC_ESC => {
87-
self.push(ESC)?;
88-
self.state = State::Normal;
89-
}
90-
_ => {
91-
self.state = State::Error;
92-
93-
return Err(SlipCodecError::FramingError);
94-
}
95-
},
96-
}
25+
type Error = SlipError;
26+
27+
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
28+
let src = &mut src.reader();
29+
let dst = {
30+
self.buf.reserve(self.capacity);
31+
&mut (&mut self.buf).limit(self.capacity).writer()
32+
};
33+
34+
match self.inner.decode(src, dst) {
35+
Ok(len) => Ok(Some(self.buf.split_to(len))),
36+
Err(SlipError::EndOfStream) => Ok(None),
37+
Err(e) => Err(e),
9738
}
39+
}
40+
}
9841

99-
Ok(None)
42+
impl Default for SlipDecoder {
43+
fn default() -> Self {
44+
Self::with_capacity(MAX_PACKET_SIZE)
10045
}
10146
}
10247

@@ -108,7 +53,7 @@ mod tests {
10853
fn empty_decode() {
10954
const INPUT: [u8; 2] = [0xc0, 0xc0];
11055

111-
let mut slip = SlipDecoder::new();
56+
let mut slip = SlipDecoder::default();
11257
let mut buf = BytesMut::from(&INPUT[..]);
11358
let res = slip.decode(&mut buf).unwrap();
11459
assert!(res.is_none());
@@ -120,9 +65,10 @@ mod tests {
12065
const INPUT: [u8; 7] = [0xc0, 0x01, 0x02, 0x03, 0x04, 0x05, 0xc0];
12166
const DATA: [u8; 5] = [0x01, 0x02, 0x03, 0x04, 0x05];
12267

123-
let mut slip = SlipDecoder::new();
68+
let mut slip = SlipDecoder::default();
12469
let mut buf = BytesMut::from(&INPUT[..]);
12570
let buf = slip.decode(&mut buf).unwrap().unwrap();
71+
eprintln!("{}:{}: {:?}", file!(), line!(), buf);
12672
assert_eq!(DATA.len(), buf.len());
12773
assert_eq!(&DATA[..], &buf);
12874
}
@@ -133,7 +79,7 @@ mod tests {
13379
const INPUT: [u8; 6] = [0xc0, 0x01, 0xdb, 0xdc, 0x03, 0xc0];
13480
const DATA: [u8; 3] = [0x01, 0xc0, 0x03];
13581

136-
let mut slip = SlipDecoder::new();
82+
let mut slip = SlipDecoder::default();
13783
let mut buf = BytesMut::from(&INPUT[..]);
13884
let buf = slip.decode(&mut buf).unwrap().unwrap();
13985
assert_eq!(DATA.len(), buf.len());
@@ -146,7 +92,7 @@ mod tests {
14692
const INPUT: [u8; 6] = [0xc0, 0x01, 0xdb, 0xdd, 0x03, 0xc0];
14793
const DATA: [u8; 3] = [0x01, 0xdb, 0x03];
14894

149-
let mut slip = SlipDecoder::new();
95+
let mut slip = SlipDecoder::default();
15096
let mut buf = BytesMut::from(&INPUT[..]);
15197
let buf = slip.decode(&mut buf).unwrap().unwrap();
15298
assert_eq!(DATA.len(), buf.len());
@@ -159,7 +105,7 @@ mod tests {
159105
const INPUT_2: [u8; 6] = [0x05, 0x06, 0x07, 0x08, 0x09, 0xc0];
160106
const DATA: [u8; 10] = [0x01, 0x02, 0x03, 0x04, 0x05, 0x05, 0x06, 0x07, 0x08, 0x09];
161107

162-
let mut slip = SlipDecoder::new();
108+
let mut slip = SlipDecoder::default();
163109
let mut buf = BytesMut::from(&INPUT_1[..]);
164110

165111
{

src/tokio/encoder.rs

+30-36
Original file line numberDiff line numberDiff line change
@@ -1,62 +1,56 @@
1-
use crate::{SlipCodecError, END, ESC, ESC_END, ESC_ESC};
2-
3-
use bytes::{Buf, BufMut, Bytes, BytesMut};
1+
use bytes::{BufMut, Bytes, BytesMut};
42
use tokio_util::codec::Encoder;
53

64
/// SLIP encoder context
7-
pub struct SlipEncoder {}
5+
pub struct SlipEncoder {
6+
inner: crate::SlipEncoder,
7+
}
88

99
impl SlipEncoder {
1010
/// Creates a new encoder context
11-
pub fn new() -> Self {
12-
Self {}
11+
pub fn new(begin_with_end: bool) -> Self {
12+
Self {
13+
inner: crate::SlipEncoder::new(begin_with_end),
14+
}
1315
}
1416
}
1517

1618
impl Encoder<Bytes> for SlipEncoder {
17-
type Error = SlipCodecError;
18-
19-
fn encode(&mut self, mut item: Bytes, dst: &mut BytesMut) -> Result<(), SlipCodecError> {
20-
dst.reserve(item.len());
21-
22-
dst.put_u8(END);
23-
24-
while item.has_remaining() {
25-
let value = item.get_u8();
26-
27-
match value {
28-
END => {
29-
dst.put_u8(ESC);
30-
dst.put_u8(ESC_END);
31-
}
32-
ESC => {
33-
dst.put_u8(ESC);
34-
dst.put_u8(ESC_ESC);
35-
}
36-
_ => {
37-
dst.put_u8(value);
38-
}
39-
}
40-
}
19+
type Error = std::io::Error;
4120

42-
dst.put_u8(END);
21+
fn encode(&mut self, item: Bytes, dst: &mut BytesMut) -> Result<(), Self::Error> {
22+
self.inner
23+
.encode(item.as_ref(), &mut dst.writer())
24+
.map(|_| ())
25+
}
26+
}
4327

44-
Ok(())
28+
impl Default for SlipEncoder {
29+
fn default() -> Self {
30+
Self::new(true)
4531
}
4632
}
4733

4834
#[cfg(test)]
4935
mod tests {
5036
use super::*;
37+
use crate::{END, ESC, ESC_END, ESC_ESC};
5138

5239
#[test]
5340
fn empty_encode() {
5441
const EXPECTED: [u8; 2] = [0xc0, 0xc0];
55-
let mut output = BytesMut::new();
5642

57-
let mut slip = SlipEncoder::new();
43+
// default is to begin and end with END tokens
44+
let mut output = BytesMut::new();
45+
let mut slip = SlipEncoder::default();
5846
slip.encode(Bytes::new(), &mut output).unwrap();
5947
assert_eq!(&EXPECTED[..], &output);
48+
49+
// override to only use END token to terminate packet
50+
let mut output = BytesMut::new();
51+
let mut slip = SlipEncoder::new(false);
52+
slip.encode(Bytes::new(), &mut output).unwrap();
53+
assert_eq!(&EXPECTED[..1], &output);
6054
}
6155

6256
#[test]
@@ -65,7 +59,7 @@ mod tests {
6559
const EXPECTED: [u8; 6] = [0xc0, 0x01, ESC, ESC_ESC, 0x03, 0xc0];
6660
let mut output = BytesMut::new();
6761

68-
let mut slip = SlipEncoder::new();
62+
let mut slip = SlipEncoder::default();
6963
slip.encode(Bytes::from(&INPUT[..]), &mut output).unwrap();
7064
assert_eq!(&EXPECTED[..], &output);
7165
}
@@ -76,7 +70,7 @@ mod tests {
7670
const EXPECTED: [u8; 6] = [0xc0, 0x01, ESC, ESC_END, 0x03, 0xc0];
7771
let mut output = BytesMut::new();
7872

79-
let mut slip = SlipEncoder::new();
73+
let mut slip = SlipEncoder::default();
8074
slip.encode(Bytes::from(&INPUT[..]), &mut output).unwrap();
8175
assert_eq!(&EXPECTED[..], &output);
8276
}

0 commit comments

Comments
 (0)