Skip to content

Commit 71f0305

Browse files
committed
Buffered I/O wrappers
The default buffer size is the same as the one in Java's BufferedWriter. We may want BufferedWriter to have a Drop impl that flushes, but that isn't possible right now due to rust-lang#4252/rust-lang#4430. This would be a bit awkward due to the possibility of the inner flush failing. For what it's worth, Java's BufferedReader doesn't have a flushing finalizer, but that may just be because Java's finalizer support is awful. Closes rust-lang#8953
1 parent af259a6 commit 71f0305

File tree

2 files changed

+358
-0
lines changed

2 files changed

+358
-0
lines changed

src/libstd/rt/io/buffered.rs

+355
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,355 @@
1+
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2+
// file at the top-level directory of this distribution and at
3+
// http://rust-lang.org/COPYRIGHT.
4+
//
5+
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6+
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7+
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8+
// option. This file may not be copied, modified, or distributed
9+
// except according to those terms.
10+
11+
//! Buffering wrappers for I/O traits
12+
//!
13+
//! It can be excessively inefficient to work directly with a `Reader` or
14+
//! `Writer`. Every call to `read` or `write` on `TcpStream` results in a
15+
//! system call, for example. This module provides structures that wrap
16+
//! `Readers`, `Writers`, and `Streams` and buffer input and output to them.
17+
//!
18+
//! # Examples
19+
//!
20+
//! ~~~
21+
//! let tcp_stream = TcpStream::connect(addr);
22+
//! let reader = BufferedReader::new(tcp_stream);
23+
//!
24+
//! let mut buf: ~[u8] = vec::from_elem(100, 0u8);
25+
//! match reader.read(buf.as_slice()) {
26+
//! Some(nread) => println!("Read {} bytes", nread),
27+
//! None => println!("At the end of the stream!")
28+
//! }
29+
//! ~~~
30+
//!
31+
//! ~~~
32+
//! let tcp_stream = TcpStream::connect(addr);
33+
//! let writer = BufferedWriter::new(tcp_stream);
34+
//!
35+
//! writer.write("hello, world".as_bytes());
36+
//! writer.flush();
37+
//! ~~~
38+
//!
39+
//! ~~~
40+
//! let tcp_stream = TcpStream::connect(addr);
41+
//! let stream = BufferedStream::new(tcp_stream);
42+
//!
43+
//! stream.write("hello, world".as_bytes());
44+
//! stream.flush();
45+
//!
46+
//! let mut buf = vec::from_elem(100, 0u8);
47+
//! match stream.read(buf.as_slice()) {
48+
//! Some(nread) => println!("Read {} bytes", nread),
49+
//! None => println!("At the end of the stream!")
50+
//! }
51+
//! ~~~
52+
//!
53+
54+
use prelude::*;
55+
56+
use num;
57+
use vec;
58+
use super::{Reader, Writer, Stream, Decorator};
59+
60+
// libuv recommends 64k buffers to maximize throughput
61+
// https://groups.google.com/forum/#!topic/libuv/oQO1HJAIDdA
62+
static DEFAULT_CAPACITY: uint = 64 * 1024;
63+
64+
/// Wraps a Reader and buffers input from it
65+
pub struct BufferedReader<R> {
66+
priv inner: R,
67+
priv buf: ~[u8],
68+
priv pos: uint,
69+
priv cap: uint
70+
}
71+
72+
impl<R: Reader> BufferedReader<R> {
73+
/// Creates a new `BufferedReader` with with the specified buffer capacity
74+
pub fn with_capacity(cap: uint, inner: R) -> BufferedReader<R> {
75+
BufferedReader {
76+
inner: inner,
77+
buf: vec::from_elem(cap, 0u8),
78+
pos: 0,
79+
cap: 0
80+
}
81+
}
82+
83+
/// Creates a new `BufferedReader` with a default buffer capacity
84+
pub fn new(inner: R) -> BufferedReader<R> {
85+
BufferedReader::with_capacity(DEFAULT_CAPACITY, inner)
86+
}
87+
}
88+
89+
impl<R: Reader> Reader for BufferedReader<R> {
90+
fn read(&mut self, buf: &mut [u8]) -> Option<uint> {
91+
if self.pos == self.cap {
92+
match self.inner.read(self.buf) {
93+
Some(cap) => {
94+
self.pos = 0;
95+
self.cap = cap;
96+
}
97+
None => return None
98+
}
99+
}
100+
101+
let src = self.buf.slice(self.pos, self.cap);
102+
let nread = num::min(src.len(), buf.len());
103+
vec::bytes::copy_memory(buf, src, nread);
104+
self.pos += nread;
105+
Some(nread)
106+
}
107+
108+
fn eof(&mut self) -> bool {
109+
self.pos == self.cap && self.inner.eof()
110+
}
111+
}
112+
113+
impl<R: Reader> Decorator<R> for BufferedReader<R> {
114+
fn inner(self) -> R {
115+
self.inner
116+
}
117+
118+
fn inner_ref<'a>(&'a self) -> &'a R {
119+
&self.inner
120+
}
121+
122+
fn inner_mut_ref<'a>(&'a mut self) -> &'a mut R {
123+
&mut self.inner
124+
}
125+
}
126+
127+
/// Wraps a Writer and buffers output to it
128+
///
129+
/// NOTE: `BufferedWriter` will NOT flush its buffer when dropped.
130+
pub struct BufferedWriter<W> {
131+
priv inner: W,
132+
priv buf: ~[u8],
133+
priv pos: uint
134+
}
135+
136+
impl<W: Writer> BufferedWriter<W> {
137+
/// Creates a new `BufferedWriter` with with the specified buffer capacity
138+
pub fn with_capacity(cap: uint, inner: W) -> BufferedWriter<W> {
139+
BufferedWriter {
140+
inner: inner,
141+
buf: vec::from_elem(cap, 0u8),
142+
pos: 0
143+
}
144+
}
145+
146+
/// Creates a new `BufferedWriter` with a default buffer capacity
147+
pub fn new(inner: W) -> BufferedWriter<W> {
148+
BufferedWriter::with_capacity(DEFAULT_CAPACITY, inner)
149+
}
150+
}
151+
152+
impl<W: Writer> Writer for BufferedWriter<W> {
153+
fn write(&mut self, buf: &[u8]) {
154+
if self.pos + buf.len() > self.buf.len() {
155+
self.flush();
156+
}
157+
158+
if buf.len() > self.buf.len() {
159+
self.inner.write(buf);
160+
} else {
161+
let dst = self.buf.mut_slice_from(self.pos);
162+
vec::bytes::copy_memory(dst, buf, buf.len());
163+
self.pos += buf.len();
164+
}
165+
}
166+
167+
fn flush(&mut self) {
168+
if self.pos != 0 {
169+
self.inner.write(self.buf.slice_to(self.pos));
170+
self.pos = 0;
171+
}
172+
self.inner.flush();
173+
}
174+
}
175+
176+
impl<W: Writer> Decorator<W> for BufferedWriter<W> {
177+
fn inner(self) -> W {
178+
self.inner
179+
}
180+
181+
fn inner_ref<'a>(&'a self) -> &'a W {
182+
&self.inner
183+
}
184+
185+
fn inner_mut_ref<'a>(&'a mut self) -> &'a mut W {
186+
&mut self.inner
187+
}
188+
}
189+
190+
struct InternalBufferedWriter<W>(BufferedWriter<W>);
191+
192+
impl<W: Reader> Reader for InternalBufferedWriter<W> {
193+
fn read(&mut self, buf: &mut [u8]) -> Option<uint> {
194+
self.inner.read(buf)
195+
}
196+
197+
fn eof(&mut self) -> bool {
198+
self.inner.eof()
199+
}
200+
}
201+
202+
/// Wraps a Stream and buffers input and output to and from it
203+
///
204+
/// NOTE: `BufferedStream` will NOT flush its output buffer when dropped.
205+
pub struct BufferedStream<S>(BufferedReader<InternalBufferedWriter<S>>);
206+
207+
impl<S: Stream> BufferedStream<S> {
208+
pub fn with_capacities(reader_cap: uint, writer_cap: uint, inner: S)
209+
-> BufferedStream<S> {
210+
let writer = BufferedWriter::with_capacity(writer_cap, inner);
211+
let internal_writer = InternalBufferedWriter(writer);
212+
let reader = BufferedReader::with_capacity(reader_cap,
213+
internal_writer);
214+
BufferedStream(reader)
215+
}
216+
217+
pub fn new(inner: S) -> BufferedStream<S> {
218+
BufferedStream::with_capacities(DEFAULT_CAPACITY, DEFAULT_CAPACITY,
219+
inner)
220+
}
221+
}
222+
223+
impl<S: Stream> Reader for BufferedStream<S> {
224+
fn read(&mut self, buf: &mut [u8]) -> Option<uint> {
225+
(**self).read(buf)
226+
}
227+
228+
fn eof(&mut self) -> bool {
229+
(**self).eof()
230+
}
231+
}
232+
233+
impl<S: Stream> Writer for BufferedStream<S> {
234+
fn write(&mut self, buf: &[u8]) {
235+
self.inner.write(buf)
236+
}
237+
238+
fn flush(&mut self) {
239+
self.inner.flush()
240+
}
241+
}
242+
243+
impl<S: Stream> Decorator<S> for BufferedStream<S> {
244+
fn inner(self) -> S {
245+
self.inner.inner()
246+
}
247+
248+
fn inner_ref<'a>(&'a self) -> &'a S {
249+
self.inner.inner_ref()
250+
}
251+
252+
fn inner_mut_ref<'a>(&'a mut self) -> &'a mut S {
253+
self.inner.inner_mut_ref()
254+
}
255+
}
256+
257+
#[cfg(test)]
258+
mod test {
259+
use prelude::*;
260+
use super::*;
261+
use super::super::mem::{MemReader, MemWriter};
262+
263+
#[test]
264+
fn test_buffered_reader() {
265+
let inner = MemReader::new(~[0, 1, 2, 3, 4]);
266+
let mut reader = BufferedReader::with_capacity(2, inner);
267+
268+
let mut buf = [0, 0, 0];
269+
let nread = reader.read(buf);
270+
assert_eq!(Some(2), nread);
271+
assert_eq!([0, 1, 0], buf);
272+
assert!(!reader.eof());
273+
274+
let mut buf = [0];
275+
let nread = reader.read(buf);
276+
assert_eq!(Some(1), nread);
277+
assert_eq!([2], buf);
278+
assert!(!reader.eof());
279+
280+
let mut buf = [0, 0, 0];
281+
let nread = reader.read(buf);
282+
assert_eq!(Some(1), nread);
283+
assert_eq!([3, 0, 0], buf);
284+
assert!(!reader.eof());
285+
286+
let nread = reader.read(buf);
287+
assert_eq!(Some(1), nread);
288+
assert_eq!([4, 0, 0], buf);
289+
assert!(reader.eof());
290+
291+
assert_eq!(None, reader.read(buf));
292+
}
293+
294+
#[test]
295+
fn test_buffered_writer() {
296+
let inner = MemWriter::new();
297+
let mut writer = BufferedWriter::with_capacity(2, inner);
298+
299+
writer.write([0, 1]);
300+
assert_eq!([], writer.inner_ref().inner_ref().as_slice());
301+
302+
writer.write([2]);
303+
assert_eq!([0, 1], writer.inner_ref().inner_ref().as_slice());
304+
305+
writer.write([3]);
306+
assert_eq!([0, 1], writer.inner_ref().inner_ref().as_slice());
307+
308+
writer.flush();
309+
assert_eq!([0, 1, 2, 3], writer.inner_ref().inner_ref().as_slice());
310+
311+
writer.write([4]);
312+
writer.write([5]);
313+
assert_eq!([0, 1, 2, 3], writer.inner_ref().inner_ref().as_slice());
314+
315+
writer.write([6]);
316+
assert_eq!([0, 1, 2, 3, 4, 5],
317+
writer.inner_ref().inner_ref().as_slice());
318+
319+
writer.write([7, 8]);
320+
assert_eq!([0, 1, 2, 3, 4, 5, 6],
321+
writer.inner_ref().inner_ref().as_slice());
322+
323+
writer.write([9, 10, 11]);
324+
assert_eq!([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11],
325+
writer.inner_ref().inner_ref().as_slice());
326+
327+
writer.flush();
328+
assert_eq!([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11],
329+
writer.inner_ref().inner_ref().as_slice());
330+
}
331+
332+
// This is just here to make sure that we don't infinite loop in the
333+
// newtype struct autoderef weirdness
334+
#[test]
335+
fn test_buffered_stream() {
336+
struct S;
337+
338+
impl Writer for S {
339+
fn write(&mut self, _: &[u8]) {}
340+
fn flush(&mut self) {}
341+
}
342+
343+
impl Reader for S {
344+
fn read(&mut self, _: &mut [u8]) -> Option<uint> { None }
345+
fn eof(&mut self) -> bool { true }
346+
}
347+
348+
let mut stream = BufferedStream::new(S);
349+
let mut buf = [];
350+
stream.read(buf);
351+
stream.eof();
352+
stream.write(buf);
353+
stream.flush();
354+
}
355+
}

src/libstd/rt/io/mod.rs

+3
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,9 @@ mod support;
295295
/// Basic Timer
296296
pub mod timer;
297297

298+
/// Buffered I/O wrappers
299+
pub mod buffered;
300+
298301
/// Thread-blocking implementations
299302
pub mod native {
300303
/// Posix file I/O

0 commit comments

Comments
 (0)