From b90f71fa599b90cd8b71cedbaf5ebe852fbb2888 Mon Sep 17 00:00:00 2001 From: Edsko de Vries Date: Fri, 7 Jun 2024 08:02:14 +0200 Subject: [PATCH] Update for latest `http-semantics` --- Network/HTTP2/Client/Run.hs | 23 +++++++++++++++-------- Network/HTTP2/H2/Sender.hs | 2 +- Network/HTTP2/Server/Worker.hs | 2 +- 3 files changed, 17 insertions(+), 10 deletions(-) diff --git a/Network/HTTP2/Client/Run.hs b/Network/HTTP2/Client/Run.hs index 2ca7c2bf..692b10c0 100644 --- a/Network/HTTP2/Client/Run.hs +++ b/Network/HTTP2/Client/Run.hs @@ -6,7 +6,6 @@ module Network.HTTP2.Client.Run where import Control.Concurrent.STM (check) -import Data.ByteString.Builder (Builder) import qualified Data.ByteString.UTF8 as UTF8 import Data.IORef import Network.Control (RxFlow (..), defaultMaxData) @@ -182,8 +181,8 @@ sendRequest ctx@Context{..} mgr scheme auth (Request req) = do (sid, newstrm) <- openOddStreamWait ctx case outObjBody req of OutBodyStreaming strmbdy -> - sendStreaming ctx mgr req' sid newstrm $ \unmask push flush -> - unmask $ strmbdy push flush + sendStreaming ctx mgr req' sid newstrm $ \iface -> + outBodyUnmask iface $ strmbdy (outBodyPush iface) (outBodyFlush iface) OutBodyStreamingUnmask strmbdy -> sendStreaming ctx mgr req' sid newstrm strmbdy _ -> atomically $ do @@ -199,16 +198,24 @@ sendStreaming -> OutObj -> StreamId -> Stream - -> ((forall x. IO x -> IO x) -> (Builder -> IO ()) -> IO () -> IO ()) + -> (OutBodyIface -> IO ()) -> IO () sendStreaming Context{..} mgr req sid newstrm strmbdy = do tbq <- newTBQueueIO 10 -- fixme: hard coding: 10 forkManagedUnmask mgr $ \unmask -> do - let push b = atomically $ writeTBQueue tbq (StreamingBuilder b) - flush = atomically $ writeTBQueue tbq StreamingFlush - finished = atomically $ writeTBQueue tbq $ StreamingFinished (decCounter mgr) + decrementedCounter <- newIORef False + let decCounterOnce = do + alreadyDecremented <- atomicModifyIORef decrementedCounter $ \b -> (True, b) + unless alreadyDecremented $ decCounter mgr + let iface = OutBodyIface { + outBodyUnmask = unmask + , outBodyPush = \b -> atomically $ writeTBQueue tbq (StreamingBuilder b Nothing) + , outBodyPushFinal = \b -> atomically $ writeTBQueue tbq (StreamingBuilder b (Just decCounterOnce)) + , outBodyFlush = atomically $ writeTBQueue tbq StreamingFlush + } + finished = atomically $ writeTBQueue tbq $ StreamingFinished decCounterOnce incCounter mgr - strmbdy unmask push flush `finally` finished + strmbdy iface `finally` finished atomically $ do sidOK <- readTVar outputQStreamID check (sidOK == sid) diff --git a/Network/HTTP2/H2/Sender.hs b/Network/HTTP2/H2/Sender.hs index c9189e68..348600c9 100644 --- a/Network/HTTP2/H2/Sender.hs +++ b/Network/HTTP2/H2/Sender.hs @@ -157,7 +157,7 @@ frameSender let payloadOff = off0 + frameHeaderLength datBuf = confWriteBuffer `plusPtr` payloadOff datBufSiz = buflim - payloadOff - Next datPayloadLen reqflush mnext <- curr datBuf datBufSiz lim -- checkme + Next datPayloadLen reqflush mnext <- curr datBuf (min datBufSiz lim) NextTrailersMaker tlrmkr' <- runTrailersMaker tlrmkr datBuf datPayloadLen fillDataHeaderEnqueueNext strm diff --git a/Network/HTTP2/Server/Worker.hs b/Network/HTTP2/Server/Worker.hs index fe5cda29..949f813a 100644 --- a/Network/HTTP2/Server/Worker.hs +++ b/Network/HTTP2/Server/Worker.hs @@ -150,7 +150,7 @@ response wc@WorkerConf{..} mgr th tconf strm (Request req) (Response rsp) pps = writeOutputQ $ Output strm rsp otyp (Just tbq) (return ()) let push b = do T.pause th - atomically $ writeTBQueue tbq (StreamingBuilder b) + atomically $ writeTBQueue tbq (StreamingBuilder b Nothing) T.resume th flush = atomically $ writeTBQueue tbq StreamingFlush finished = atomically $ writeTBQueue tbq $ StreamingFinished (decCounter mgr)