Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrated to async completely #9

Merged
merged 1 commit into from
Aug 4, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion par.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ executable par
, process
, stm
, string-class
, slave-thread
, text
hs-source-dirs: src
default-language: Haskell2010
100 changes: 44 additions & 56 deletions src/Main.hs
Original file line number Diff line number Diff line change
@@ -1,27 +1,24 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Main where
module Main
( main
) where

import Control.Concurrent (ThreadId)
import Control.Concurrent.Async
import Control.Concurrent.MVar
import Control.Concurrent.STM
import Control.Exception (finally)
import Control.Exception.Enclosed (handleAny)
import Control.Monad (when)
import Control.Monad (unless, void, when)
import Data.ByteString (ByteString)
import qualified Data.ByteString as B
import Data.Foldable
import qualified Data.List as L
import qualified Data.List.NonEmpty as NL
import Data.Maybe
import Data.Semigroup ((<>))
import Data.String.Class (toStrictByteString)
import Options.Applicative
import Prelude hiding (mapM, mapM_)
import SlaveThread (fork, forkFinally)
import System.Exit
import System.IO
import System.Process
Expand Down Expand Up @@ -65,53 +62,36 @@ work opts = do
let numCmds = length (optCommands opts)
case optMasterProcess opts of
Nothing -> do
(_, w1) <- forkW (runOutqueueFlusher outQ stdout numCmds)
(_, w2) <- forkW (runOutqueueFlusher errQ stderr numCmds)
results <- mapConcurrently (runSingle debug outQ errQ) (optCommands opts)
results <-
waitingPipeHandlers
(runOutqueueFlusher outQ stdout numCmds)
(runOutqueueFlusher errQ stderr numCmds)
(mapConcurrently (runSingle debug outQ errQ) (optCommands opts))
let cmdAndRes = zip (optCommands opts) results
waitSignal w1 >> waitSignal w2
maybe
(exitWith ExitSuccess)
(\rs -> do
let (c, r) = NL.head rs
hPutStrLn stderr ("Failed command:\n" <> c)
exitWith r)
(NL.nonEmpty (filter ((/= ExitSuccess) . snd) cmdAndRes))
case filter ((/= ExitSuccess) . snd) cmdAndRes of
[] -> exitSuccess
(c,r):_ -> do
hPutStrLn stderr $ "Failed command:\n" <> c
exitWith r
Just masterProcNum -> do
outQMain <- newTBQueueIO 1024
errQMain <- newTBQueueIO 1024
_ <- fork (runOutqueueFlusher outQ stdout numCmds)
_ <- fork (runOutqueueFlusher errQ stderr numCmds)
let (xs, (m:ys)) = splitAt masterProcNum (optCommands opts)
(master, rest) = (m, xs ++ ys)
mapM_ (fork . runSingle debug outQ errQ) rest
(_, w1) <- forkW (forwardWaiting outQMain outQ)
(_, w2) <- forkW (forwardWaiting errQMain errQ)
status <- runSingle debug outQMain errQMain master
debug $ "Master process " <> show m <> " exited with status " <> show status
waitSignal w1 >> waitSignal w2
debug $ "Pipes drained, exiting"
exitWith status
withAsync (runOutqueueFlusher outQ stdout numCmds) $ \_ ->
withAsync (runOutqueueFlusher errQ stderr numCmds) $ \_ -> do
let (xs, m:ys) = splitAt masterProcNum (optCommands opts)
(master, rest) = (m, xs ++ ys)
mapM_ (async . runSingle debug outQ errQ) rest
status <- waitingPipeHandlers (forwardWaiting outQMain outQ)
(forwardWaiting errQMain errQ) $
runSingle debug outQMain errQMain master
exitWith status
where
forwardWaiting from to = go
where
go = do
v <- atomically (readTBQueue from)
when (isJust v) (atomically (writeTBQueue to v) >> go)

newtype WaitSignal =
WaitSignal (MVar Bool)

waitSignal :: WaitSignal -> IO ()
waitSignal (WaitSignal mv) = takeMVar mv >> return ()

-- | Fork with a wait-signal ability
forkW :: IO a -> IO (ThreadId, WaitSignal)
forkW f = do
ws <- newEmptyMVar
tid <- forkFinally (putMVar ws True) f
return (tid, WaitSignal ws)

runSingle ::
(String -> IO ())
-> TBQueue (Maybe ByteString)
Expand All @@ -122,13 +102,14 @@ runSingle debug outQ errQ cmdBig = do
debug $ "Starting process " <> show cmd <> ", output prefix " <> show cmdPrefix
(_, Just hout, Just herr, ph) <-
createProcess (shell cmd) {std_out = CreatePipe, std_err = CreatePipe}
(_, w1) <- forkW (forwardHandler hout outQ prefixer)
(_, w2) <- forkW (forwardHandler herr errQ prefixer)
res <- waitForProcess ph
waitSignal w1 >> waitSignal w2
return res
-- TODO: rewrite via Parsec or regex-applicative
s <- waitingPipeHandlers
(forwardPrefixing hout outQ)
(forwardPrefixing herr errQ)
(waitForProcess ph)
debug $ "Process " <> show cmdBig <> " exited with status " <> show s
return s
where
-- TODO: rewrite via Parsec or regex-applicative
(cmd, cmdPrefix) =
if parprefix `L.isPrefixOf` cmdBig
then let (pref, rest) = break (== ' ') (drop (length parprefix) cmdBig)
Expand All @@ -137,6 +118,15 @@ runSingle debug outQ errQ cmdBig = do
parprefix = "PARPREFIX="
toBs = toStrictByteString
prefixer chunk = [toBs cmdPrefix <> chunk]
forwardPrefixing from to = forwardHandler from to prefixer

waitingPipeHandlers :: IO a -> IO b -> IO c -> IO c
waitingPipeHandlers outH errH inner =
withAsync outH $ \out ->
withAsync errH $ \err -> do
res <- inner
void $ waitBoth out err
return res

forwardHandler ::
Handle
Expand All @@ -147,13 +137,11 @@ forwardHandler from to f = fin (hndl go)
where
go = do
eof <- hIsEOF from
if eof
then return ()
else do
line <- B.hGetLine from
atomically
(writeTBQueue to (Just (B.concat (map (<> "\n") (f line)))))
go
unless eof $ do
line <- B.hGetLine from
atomically
(writeTBQueue to (Just (B.concat (map (<> "\n") (f line)))))
go
hndl = handleAny (const (return ()))
fin f' = finally f' (atomically (writeTBQueue to Nothing))

Expand Down