Skip to content
This repository was archived by the owner on Nov 9, 2017. It is now read-only.

Commit f782362

Browse files
committed
Wrap the TcpStream in a BufferedStream.
As it stood, the header-reading part of a client used a BufferedReader which was then discarded, although it had probably consumed at least the start of the request body. I had been using TcpStream, BufferedReader<TcpStream> and BufferedWriter<TcpStream> in different places. This was entirely unworkable, so I've replaced them with immediately wrapping the TcpStream in a BufferedStream and using it everywhere. This makes things sane for now. Alas, this also introduces a rather severe regression owing to an as-yet-unidentified Rust bug, for the call to http::client::response::ResponseReader::construct is causing a segfault. That is, execution is failing after the caller has begun calling the function, but before construct has begun to execute. This renders the client inoperable for the moment. --HG-- extra : amend_source : 9263d96068f59080326179877528b84946587e85
1 parent f4ea90c commit f782362

File tree

7 files changed

+92
-99
lines changed

7 files changed

+92
-99
lines changed

src/libhttp/buffer.rs

+67-72
Original file line numberDiff line numberDiff line change
@@ -1,59 +1,76 @@
11
/// Memory buffers for the benefit of `std::rt::io::net` which has slow read/write.
22
3-
use std::rt::io::{Reader, Writer};
3+
use std::rt::io::{Reader, Writer, Stream};
4+
use std::rt::io::net::tcp::TcpStream;
45
use std::cast::transmute_mut;
56
use std::cmp::min;
67
use std::ptr;
78

9+
pub type BufTcpStream = BufferedStream<TcpStream>;
10+
811
// 64KB chunks (moderately arbitrary)
912
static READ_BUF_SIZE: uint = 0x10000;
1013
static WRITE_BUF_SIZE: uint = 0x10000;
1114
// TODO: consider removing constants and giving a buffer size in the constructor
1215

13-
struct BufferedReader<'self, T> {
14-
wrapped: &'self mut T,
15-
buffer: [u8, ..READ_BUF_SIZE],
16+
struct BufferedStream<T> {
17+
wrapped: T,
18+
read_buffer: [u8, ..READ_BUF_SIZE],
1619
// The current position in the buffer
17-
pos: uint,
20+
read_pos: uint,
1821
// The last valid position in the reader
19-
max: uint,
22+
read_max: uint,
23+
write_buffer: [u8, ..WRITE_BUF_SIZE],
24+
write_len: uint,
25+
26+
/// Some things being written may not like flush() being called yet (e.g. explicitly fail!())
27+
/// The BufferedReader may need to be flushed for good control, but let it provide for such
28+
/// cases by not calling the wrapped object's flush method in turn.
29+
call_wrapped_flush: bool,
2030
}
2131

22-
impl<'self, T: Reader> BufferedReader<'self, T> {
23-
pub fn new<'a>(reader: &'a mut T/*, buffer_size: uint*/) -> BufferedReader<'a, T> {
24-
BufferedReader {
25-
wrapped: reader,
26-
buffer: [0u8, ..READ_BUF_SIZE], //[0u8, ..buffer_size],
27-
pos: 0u,
28-
max: 0u,
32+
impl<T: Reader + Writer /*Stream*/> BufferedStream<T> {
33+
pub fn new(stream: T, call_wrapped_flush: bool) -> BufferedStream<T> {
34+
BufferedStream {
35+
wrapped: stream,
36+
read_buffer: [0u8, ..READ_BUF_SIZE],
37+
read_pos: 0u,
38+
read_max: 0u,
39+
write_buffer: [0u8, ..WRITE_BUF_SIZE],
40+
write_len: 0u,
41+
call_wrapped_flush: call_wrapped_flush,
2942
}
3043
}
44+
}
45+
46+
impl<T: Stream> Stream for BufferedStream<T>;
3147

48+
impl<T: Reader> BufferedStream<T> {
3249
/// Poke a single byte back so it will be read next. For this to make sense, you must have just
3350
/// read that byte. If `self.pos` is 0 and `self.max` is not 0 (i.e. if the buffer is just
3451
/// filled
3552
/// Very great caution must be used in calling this as it will fail if `self.pos` is 0.
3653
pub fn poke_byte(&mut self, byte: u8) {
37-
match (self.pos, self.max) {
38-
(0, 0) => self.max = 1,
54+
match (self.read_pos, self.read_max) {
55+
(0, 0) => self.read_max = 1,
3956
(0, _) => fail!("poke called when buffer is full"),
40-
(_, _) => self.pos -= 1,
57+
(_, _) => self.read_pos -= 1,
4158
}
42-
self.buffer[self.pos] = byte;
59+
self.read_buffer[self.read_pos] = byte;
4360
}
4461

4562
#[inline]
4663
fn fill_buffer(&mut self) -> bool {
47-
assert_eq!(self.pos, self.max);
48-
match self.wrapped.read(self.buffer) {
64+
assert_eq!(self.read_pos, self.read_max);
65+
match self.wrapped.read(self.read_buffer) {
4966
None => {
50-
self.pos = 0;
51-
self.max = 0;
67+
self.read_pos = 0;
68+
self.read_max = 0;
5269
false
5370
},
5471
Some(i) => {
55-
self.pos = 0;
56-
self.max = i;
72+
self.read_pos = 0;
73+
self.read_max = i;
5774
true
5875
},
5976
}
@@ -63,69 +80,46 @@ impl<'self, T: Reader> BufferedReader<'self, T> {
6380
/// (which just uses `read()`)
6481
#[inline]
6582
pub fn read_byte(&mut self) -> Option<u8> {
66-
if self.pos == self.max && !self.fill_buffer() {
83+
if self.read_pos == self.read_max && !self.fill_buffer() {
6784
// Run out of buffered content, no more to come
6885
return None;
6986
}
70-
self.pos += 1;
71-
Some(self.buffer[self.pos - 1])
87+
self.read_pos += 1;
88+
Some(self.read_buffer[self.read_pos - 1])
7289
}
7390
}
7491

75-
impl<'self, T: Reader> Reader for ~BufferedReader<'self, T> {
92+
impl<T: Reader> Reader for BufferedStream<T> {
7693
/// Read at most N bytes into `buf`, where N is the minimum of `buf.len()` and the buffer size.
7794
///
7895
/// At present, this makes no attempt to fill its buffer proactively, instead waiting until you
7996
/// ask.
8097
fn read(&mut self, buf: &mut [u8]) -> Option<uint> {
81-
if self.pos == self.max && !self.fill_buffer() {
98+
if self.read_pos == self.read_max && !self.fill_buffer() {
8299
// Run out of buffered content, no more to come
83100
return None;
84101
}
85-
let size = min(self.max - self.pos, buf.len());
102+
let size = min(self.read_max - self.read_pos, buf.len());
86103
unsafe {
87104
do buf.as_mut_buf |p_dst, _len_dst| {
88-
do self.buffer.as_imm_buf |p_src, _len_src| {
105+
do self.read_buffer.as_imm_buf |p_src, _len_src| {
89106
// Note that copy_memory works on bytes; good, u8 is byte-sized
90-
ptr::copy_memory(p_dst, ptr::offset(p_src, self.pos as int), size)
107+
ptr::copy_memory(p_dst, ptr::offset(p_src, self.read_pos as int), size)
91108
}
92109
}
93110
}
94-
self.pos += size;
111+
self.read_pos += size;
95112
Some(size)
96113
}
97114

98115
/// Return whether the Reader has reached the end of the stream AND exhausted its buffer.
99116
fn eof(&mut self) -> bool {
100-
self.pos == self.max && self.wrapped.eof()
101-
}
102-
}
103-
104-
struct BufferedWriter<'self, T> {
105-
wrapped: &'self mut T,
106-
buffer: [u8, ..WRITE_BUF_SIZE],
107-
buflen: uint,
108-
109-
/// Some things being written may not like flush() being called yet (e.g. explicitly fail!())
110-
/// The BufferedReader may need to be flushed for good control, but let it provide for such
111-
/// cases by not calling the wrapped object's flush method in turn.
112-
call_wrapped_flush: bool,
113-
}
114-
115-
impl<'self, T: Writer> BufferedWriter<'self, T> {
116-
pub fn new<'a>(writer: &'a mut T, call_wrapped_flush: bool/*, buffer_size: uint*/)
117-
-> BufferedWriter<'a, T> {
118-
BufferedWriter {
119-
wrapped: writer,
120-
buffer: [0u8, ..WRITE_BUF_SIZE], //[0u8, ..buffer_size],
121-
buflen: 0u,
122-
call_wrapped_flush: call_wrapped_flush,
123-
}
117+
self.read_pos == self.read_max && self.wrapped.eof()
124118
}
125119
}
126120

127121
#[unsafe_destructor]
128-
impl<'self, T: Writer> Drop for BufferedWriter<'self, T> {
122+
impl<T: Writer> Drop for BufferedStream<T> {
129123
fn drop(&self) {
130124
// Clearly wouldn't be a good idea to finish without flushing!
131125

@@ -135,39 +129,40 @@ impl<'self, T: Writer> Drop for BufferedWriter<'self, T> {
135129
}
136130
}
137131

138-
impl<'self, T: Writer> Writer for BufferedWriter<'self, T> {
132+
impl<T: Writer> Writer for BufferedStream<T> {
139133
fn write(&mut self, buf: &[u8]) {
140-
if buf.len() + self.buflen > self.buffer.len() {
134+
if buf.len() + self.write_len > self.write_buffer.len() {
141135
// This is the lazy approach which may involve two writes where it's really not
142136
// warranted. Maybe deal with that later.
143-
if self.buflen > 0 {
144-
self.wrapped.write(self.buffer.slice_to(self.buflen));
145-
self.buflen = 0;
137+
if self.write_len > 0 {
138+
self.wrapped.write(self.write_buffer.slice_to(self.write_len));
139+
self.write_len = 0;
146140
}
147141
self.wrapped.write(buf);
148-
self.buflen = 0;
142+
self.write_len = 0;
149143
} else {
150144
// Safely copy buf onto the "end" of self.buffer
151145
unsafe {
152146
do buf.as_imm_buf |p_src, len_src| {
153-
do self.buffer.as_mut_buf |p_dst, _len_dst| {
147+
do self.write_buffer.as_mut_buf |p_dst, _len_dst| {
154148
// Note that copy_memory works on bytes; good, u8 is byte-sized
155-
ptr::copy_memory(ptr::mut_offset(p_dst, self.buflen as int), p_src, len_src)
149+
ptr::copy_memory(ptr::mut_offset(p_dst, self.write_len as int),
150+
p_src, len_src)
156151
}
157152
}
158153
}
159-
self.buflen += buf.len();
160-
if self.buflen == self.buffer.len() {
161-
self.wrapped.write(self.buffer);
162-
self.buflen = 0;
154+
self.write_len += buf.len();
155+
if self.write_len == self.write_buffer.len() {
156+
self.wrapped.write(self.write_buffer);
157+
self.write_len = 0;
163158
}
164159
}
165160
}
166161

167162
fn flush(&mut self) {
168-
if self.buflen > 0 {
169-
self.wrapped.write(self.buffer.slice_to(self.buflen));
170-
self.buflen = 0;
163+
if self.write_len > 0 {
164+
self.wrapped.write(self.write_buffer.slice_to(self.write_len));
165+
self.write_len = 0;
171166
}
172167
if self.call_wrapped_flush {
173168
self.wrapped.flush();

src/libhttp/client/request.rs

+3-5
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use std::rt;
55
use std::rt::io::Writer;
66
use std::rt::io::net::ip::SocketAddr;
77
use std::rt::io::net::tcp::TcpStream;
8+
use buffer::{BufTcpStream, BufferedStream};
89
use headers::Headers;
910
use headers::host::Host;
1011

@@ -29,8 +30,7 @@ use client::response::ResponseReader;
2930

3031
pub struct RequestWriter {
3132
// The place to write to (typically a TCP stream, rt::io::net::tcp::TcpStream)
32-
//priv writer: Option<BufferedWriter<'self, TcpStream>>,
33-
priv stream: Option<TcpStream>,
33+
priv stream: Option<BufTcpStream>,
3434
priv headers_written: bool,
3535

3636
/// The originating IP address of the request.
@@ -99,13 +99,11 @@ impl RequestWriter {
9999

100100
self.stream = match self.remote_addr {
101101
Some(addr) => match TcpStream::connect(addr) {
102-
Some(stream) => Some(stream),
102+
Some(stream) => Some(BufferedStream::new(stream, false)),
103103
None => return false,
104104
},
105105
None => fail!("connect() called before remote_addr was set"),
106106
};
107-
// Desired: BufferedWriter::new(stream, false), but lifetime woes make that not possible
108-
// with how it's structured at present.
109107
true
110108
}
111109

src/libhttp/client/response.rs

+3-4
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use extra::treemap::TreeMap;
22
use std::rt::io::Reader;
33
use std::rt::io::extensions::ReaderUtil;
4-
use std::rt::io::net::tcp::TcpStream;
54
use std::rt::io::{io_error, OtherIoError, IoError};
65
use std::rt;
76
use client::request::RequestWriter;
@@ -10,10 +9,11 @@ use common::read_http_version;
109
use headers::{Headers, normalise_header_name};
1110
use status::Status;
1211

12+
use buffer::BufTcpStream;
1313
use server::request::{RequestBuffer, EndOfFile, EndOfHeaders, MalformedHeader};
1414

1515
struct ResponseReader {
16-
priv stream: TcpStream,
16+
priv stream: BufTcpStream,
1717

1818
/// The request which this is a response to
1919
request: RequestWriter,
@@ -38,7 +38,7 @@ fn bad_response_err() -> IoError {
3838
}
3939

4040
impl ResponseReader {
41-
pub fn construct(mut stream: TcpStream, request: RequestWriter)
41+
pub fn construct(mut stream: BufTcpStream, request: RequestWriter)
4242
-> Result<ResponseReader, RequestWriter> {
4343
// TODO: raise condition at the points where Err is returned
4444
//let mut b = [0u8, ..4096];
@@ -106,7 +106,6 @@ impl ResponseReader {
106106
// between a request and response.
107107
let headers = {
108108
let mut buffer = RequestBuffer::new(&mut stream);
109-
110109
let mut headers = ~TreeMap::new();
111110
loop {
112111
match buffer.read_header_line() {

src/libhttp/common.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::rt::io::extensions::ReaderUtil;
44

55
#[inline]
66
pub fn read_http_version<T: Reader>(reader: &mut T, next_u8: u8) -> Option<(uint, uint)> {
7-
// XXX: by doing this, I've stopped the more efficient BufferedReader.read_byte from being used
7+
// XXX: by doing this, I've stopped the more efficient BufferedStream.read_byte from being used
88
// HTTP/%u.%u
99
match reader.read_byte() {
1010
Some(b) if b == 'h' as u8 || b == 'H' as u8 => (),

src/libhttp/server/mod.rs

+7-4
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ use extra::time::precise_time_ns;
1010

1111
use std::rt::io::net::tcp::TcpListener;
1212

13+
use buffer::BufferedStream;
14+
1315
pub use self::request::{RequestBuffer, Request};
1416
pub use self::response::ResponseWriter;
1517

@@ -80,13 +82,14 @@ impl<T: Send + Clone + Server> ServerUtil for T {
8082
let child_self = self.clone();
8183
do spawn_supervised {
8284
let mut time_start = time_start;
83-
let mut stream = ~stream.take();
85+
let mut stream = BufferedStream::new(stream.take(),
86+
/* TcpStream.flush() fails! */ false);
8487
debug!("accepted connection, got %?", stream);
8588
loop { // A keep-alive loop, condition at end
8689
let time_spawned = precise_time_ns();
87-
let (request, err_status) = Request::get(~RequestBuffer::new(stream));
90+
let (request, err_status) = Request::load(&mut stream);
8891
let time_request_made = precise_time_ns();
89-
let mut response = ~ResponseWriter::new(stream, request);
92+
let mut response = ~ResponseWriter::new(&mut stream, request);
9093
let time_response_made = precise_time_ns();
9194
match err_status {
9295
Ok(()) => {
@@ -104,7 +107,7 @@ impl<T: Send + Clone + Server> ServerUtil for T {
104107
},
105108
}
106109
// This should not be necessary, but is, because of the Drop bug
107-
// apparent in BufferedWriter. When that is fixed up, then it *may* be
110+
// apparent in BufferedStream. When that is fixed up, then it *may* be
108111
// suitable to remove flush() from here. I say "may" as it would mean
109112
// that time_finished might not include writing all the response (a
110113
// non-trivial time interval).

0 commit comments

Comments
 (0)