Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ocsigen_http_com: avoid O(response) allocation by using Lwt_io, 2X speedup #84

Merged
merged 1 commit into from
Aug 16, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion opam
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ depends: [
"base-threads"
"react"
"ssl"
"lwt" {>= "2.4.7"}
"lwt" {>= "2.5.0"}
"ocamlnet" {>= "4.0.2"}
"pcre"
"cryptokit"
Expand Down
64 changes: 33 additions & 31 deletions src/http/ocsigen_http_com.ml
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ let create_waiter block =
type connection =
{ id : int;
fd : Lwt_ssl.socket;
chan : Lwt_chan.out_channel;
chan : Lwt_io.output_channel;
timeout : Lwt_timeout.t;
r_mode : mode;
closed : unit Lwt.t * unit Lwt.u;
Expand Down Expand Up @@ -127,11 +127,13 @@ let create_receiver timeout mode fd =
{ id = new_id ();
fd = fd;
chan =
Lwt_chan.make_out_channel
Lwt_io.make
~mode:Lwt_io.output
~buffer:(Lwt_bytes.create buffer_size)
(fun buf pos len ->
Lwt_timeout.start timeout;
Lwt.try_bind
(fun () -> Lwt_ssl.write fd buf pos len)
(fun () -> Lwt_ssl.write_bytes fd buf pos len)
(fun l -> Lwt_timeout.stop timeout; Lwt.return l)
(fun e -> Lwt_timeout.stop timeout;
Lwt.fail (convert_io_error e)));
Expand Down Expand Up @@ -505,7 +507,7 @@ let get_http_frame ?(head = false) receiver =

type slot =
{ sl_waiter : waiter;
sl_chan : Lwt_chan.out_channel;
sl_chan : Lwt_io.output_channel;
sl_ssl : bool (* for secure cookies only *)}

let create_slot conn =
Expand Down Expand Up @@ -534,7 +536,7 @@ let start_processing conn f =
(*XXX It would be clearer to put this code at the end of the sender function,
but we don't have access to [next_slot] there *)
if not next_waiter.w_did_wait then
Lwt_chan.flush conn.chan
Lwt_io.flush conn.chan
else
Lwt.return ()))
(fun () ->
Expand Down Expand Up @@ -565,7 +567,7 @@ let wait_all_senders conn =
(*XXX Do we need a flush here? Are we properly flushing in case of an error? *)
(fun () ->
conn.senders.w_wait >>= fun () ->
Lwt_chan.flush conn.chan)
Lwt_io.flush conn.chan)
(fun e -> match e with Aborted -> Lwt.return () | _ -> Lwt.fail e))
(fun () ->
Lwt_timeout.stop conn.timeout;
Expand Down Expand Up @@ -623,16 +625,16 @@ let default_sender = create_sender ~server_name:Ocsigen_config.server_name ()
Ocsigen_stream.next stream >>= fun e ->
match e with
Ocsigen_stream.Finished _ ->
Lwt_chan.output_string out_ch "0\r\n\r\n"
Lwt_io.write out_ch "0\r\n\r\n"
| Ocsigen_stream.Cont (s, next) ->
let l = String.length s in
begin if l = 0 then
(* It is incorrect to send an empty chunk *)
Lwt.return ()
else begin
Lwt_chan.output_string out_ch (Format.sprintf "%x\r\n" l) >>= fun () ->
Lwt_chan.output_string out_ch s >>= fun () ->
Lwt_chan.output_string out_ch "\r\n"
Lwt_io.write out_ch (Format.sprintf "%x\r\n" l) >>= fun () ->
Lwt_io.write out_ch s >>= fun () ->
Lwt_io.write out_ch "\r\n"
end end >>= fun () ->
write_stream_chunked out_ch next
*)
Expand All @@ -643,7 +645,7 @@ let default_sender = create_sender ~server_name:Ocsigen_config.server_name ()
We bufferise them before creating a thunk.
Benchmarks cannot prove that it is better, but at least the network stream
is readable ...
It is then buffered again by Lwt_chan.
It is then buffered again by Lwt_io.
Is there a way to have only one buffer?
*)
let write_stream_chunked out_ch stream =
Expand All @@ -656,38 +658,38 @@ let write_stream_chunked out_ch stream =
| Ocsigen_stream.Finished _ ->
(if len > 0 then begin
(* It is incorrect to send an empty chunk *)
Lwt_chan.output_string
Lwt_io.write
out_ch (Format.sprintf "%x\r\n" len) >>= fun () ->
Lwt_chan.output out_ch buffer 0 len >>= fun () ->
Lwt_chan.output_string out_ch "\r\n"
Lwt_io.write_from_exactly out_ch buffer 0 len >>= fun () ->
Lwt_io.write out_ch "\r\n"
end else
Lwt.return ()) >>= fun () ->
Lwt_chan.output_string out_ch "0\r\n\r\n"
Lwt_io.write out_ch "0\r\n\r\n"
| Ocsigen_stream.Cont (s, next) ->
let l = String.length s in
if l = 0 then
aux next len
else
if l >= size_for_not_buffering then begin
(if len > 0 then begin
Lwt_chan.output_string
Lwt_io.write
out_ch (Format.sprintf "%x\r\n" len) >>= fun () ->
Lwt_chan.output out_ch buffer 0 len >>= fun () ->
Lwt_chan.output_string out_ch "\r\n"
Lwt_io.write_from_exactly out_ch buffer 0 len >>= fun () ->
Lwt_io.write out_ch "\r\n"
end else Lwt.return ()) >>= fun () ->
Lwt_chan.output_string
Lwt_io.write
out_ch (Format.sprintf "%x\r\n" l) >>= fun () ->
Lwt_chan.output out_ch s 0 l >>= fun () ->
Lwt_chan.output_string out_ch "\r\n" >>= fun () ->
Lwt_io.write_from_exactly out_ch s 0 l >>= fun () ->
Lwt_io.write out_ch "\r\n" >>= fun () ->
aux next 0
end else (* Will not work if l is very large: *)
let available = buf_size - len in
if l > available then begin
Lwt_chan.output_string
Lwt_io.write
out_ch (Format.sprintf "%x\r\n" buf_size) >>= fun () ->
Lwt_chan.output out_ch buffer 0 len >>= fun () ->
Lwt_chan.output out_ch s 0 available >>= fun () ->
Lwt_chan.output_string out_ch "\r\n" >>= fun () ->
Lwt_io.write_from_exactly out_ch buffer 0 len >>= fun () ->
Lwt_io.write_from_exactly out_ch s 0 available >>= fun () ->
Lwt_io.write out_ch "\r\n" >>= fun () ->
let newlen = l - available in
String.blit s available buffer 0 newlen;
aux next newlen
Expand All @@ -705,7 +707,7 @@ let rec write_stream_raw out_ch stream =
| Ocsigen_stream.Finished _ ->
Lwt.return ()
| Ocsigen_stream.Cont (s, next) ->
Lwt_chan.output_string out_ch s >>= fun () ->
Lwt_io.write out_ch s >>= fun () ->
write_stream_raw out_ch next

(*XXX We should check the length of the stream:
Expand Down Expand Up @@ -742,7 +744,7 @@ let send_100_continue slot =
} in
Lwt_log.ign_info ~section "writing 100-continue";
Lwt_log.ign_info ~section hh;
Lwt_chan.output_string out_ch hh
Lwt_io.write out_ch hh

(** Sends the HTTP frame.
* The headers are merged with those of the sender, the priority
Expand Down Expand Up @@ -842,13 +844,13 @@ let send
let hh = Framepp.string_of_header hd in
Lwt_log.ign_info_f ~section "writing header\n%s" hh;
observe_result hd hh >>= fun () ->
Lwt_chan.output_string out_ch hh >>= fun () ->
Lwt_io.write out_ch hh >>= fun () ->
(if reopen <> None then
(* If we want to give a possibility to reopen if
it fails, we must detect the failure before
beginning to read the stream
*)
Lwt_chan.flush out_ch
Lwt_io.flush out_ch
else Lwt.return ())
)
(fun e -> (* *** If we are doing a request,
Expand Down Expand Up @@ -879,8 +881,8 @@ let send
Lwt_log.ign_info ~section "writing body";
write_stream ~chunked out_ch (fst (Result.stream res))
end) >>= fun () ->
Lwt_chan.flush out_ch (* Vincent: I add this otherwise HEAD answers
are not flushed by the reverse proxy *)
Lwt_io.flush out_ch (* Vincent: I add this otherwise HEAD answers
are not flushed by the reverse proxy *)
>>= fun () ->
Ocsigen_stream.finalize (fst (Result.stream res)) `Success
)
Expand Down