Reputation: 315
I have to write a command-line tool that glues together some components for an experiment, and I need help with a code design that meets my requirements.
At the top level I work with samples, each generated by a call to another program via "System.Process.readProcessWithExitCode" that is expensive in both runtime and memory consumption. So imagine you have an (expensive) function "genSample :: IO a" and you need n return values of that function.
My requirements are:
1. Let p be the number of processors; at most p samples (i.e. calls to genSample) should be computed in parallel.
2. It should be possible to set a timeout that aborts the generation of the samples.
3. If the computation of all the samples times out, the processes started within a genSample call should be stopped.
My current solution meets requirements 1 and 2. For the third one I currently help myself by executing a killall command. That seems like a dirty hack to me. Perhaps someone has a better idea?
Here is the central part of my current solution:
    import qualified Control.Monad.Par.Class as ParIO
    import qualified Control.Monad.Par.IO as ParIO
    …
    -- | @parRepeatM t n a@ performs action @a@ @n@ times in parallel with timeout @t@
    parRepeatM :: ParIO.NFData a =>
                  Integer -- ^ timeout in seconds
               -> Integer -- ^ number of duplicates (here: number of req. samples)
               -> IO a    -- ^ action to perform (here: genSample)
               -> IO (Maybe [a])
    parRepeatM t n a = timeout t $ ParIO.runParIO $ do
        let tasks = genericReplicate n $ liftIO a -- :: [ParIO a]
        ivars <- mapM ParIO.spawn tasks
        mapM ParIO.get ivars
A central problem at the moment is that after an abort due to a timeout, the command called within genSample continues its execution, in the worst case until the whole Haskell glue program ends.
Upvotes: 2
Views: 345
Reputation: 27756
In Haskell, cancellation is usually handled through asynchronous exceptions. That's what timeout seems to use.
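As a minimal sketch of that behaviour (the name timeoutRunsHandler is made up for illustration), the following shows that an exception handler attached to a timed-out action does get a chance to run. It assumes only System.Timeout.timeout from base, whose limit is given in microseconds:

```haskell
import Control.Concurrent (threadDelay)
import Control.Exception (onException)
import Data.IORef (newIORef, readIORef, writeIORef)
import System.Timeout (timeout)

-- | Hypothetical demo: returns the result of 'timeout' together with a flag
-- telling whether the cleanup handler ran.
timeoutRunsHandler :: IO (Maybe (), Bool)
timeoutRunsHandler = do
  cleanedUp <- newIORef False
  result <- timeout 100000 $                  -- limit: 0.1 s (microseconds)
    threadDelay 2000000                       -- stand-in for 2 s of work
      `onException` writeIORef cleanedUp True -- runs when the timeout fires
  ran <- readIORef cleanedUp
  pure (result, ran)
```

The same hook is where a call to terminateProcess can be placed.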
So, we can try to install an exception handler in the code that executes the external process. The handler will invoke terminateProcess whenever an exception (asynchronous or not) crops up. Because terminateProcess requires a reference to the process handle, we'll have to use createProcess instead of the higher-level readProcessWithExitCode.
First, some imports and auxiliary functions (I'm using the async package):
    {-# LANGUAGE ScopedTypeVariables #-}
    import Control.Applicative
    import Control.Exception
    import Control.Concurrent (threadDelay, MVar, newEmptyMVar, putMVar, takeMVar)
    import Control.Concurrent.Async (race_, Concurrently(..), waitEither, withAsync)
    import System.Process
    import System.Exit
    import System.IO
    import qualified Data.ByteString as B

    -- Executes two actions concurrently and returns the one that finishes first.
    -- If an asynchronous exception is thrown, the second action is terminated
    -- first.
    race' :: IO a -> IO a -> IO a
    race' left right =
        withAsync left $ \a ->
        withAsync right $ \b ->
        fmap (either id id) (waitEither a b)

    -- terminate external process on exception, ignore if already dead.
    terminateCarefully :: ProcessHandle -> IO ()
    terminateCarefully pHandle =
        catch (terminateProcess pHandle) (\(e::IOException) -> return ())
This function launches an external process and returns its stdout and exit code, terminating the process if the thread is cancelled:
    safeExec :: CreateProcess -> IO (B.ByteString, ExitCode)
    safeExec cp =
        bracketOnError
            (createProcess cp {std_out = CreatePipe})
            (\(_,_ ,_,pHandle) -> terminateCarefully pHandle)
            (\(_,Just hOut,_,pHandle) -> do
                -- Workaround for a Windows issue.
                latch <- newEmptyMVar
                race'
                   (do -- IO actions are uninterruptible on Windows :(
                      takeMVar latch
                      contents <- B.hGetContents hOut
                      ec <- waitForProcess pHandle
                      pure (contents,ec))
                   -- Dummy interruptible action that
                   -- receives asynchronous exceptions first
                   -- and helps to end the other action.
                   (onException
                       (do
                          putMVar latch ()
                          -- runs forever unless interrupted
                          runConcurrently empty)
                       (terminateCarefully pHandle)))
About the implementation:
bracketOnError is used to ensure that the external process is killed if an exception happens.
On Windows, I/O operations like reading from a Handle are uninterruptible (see https://ghc.haskell.org/trac/ghc/ticket/7353). This means they are impervious to asynchronous exceptions. As a workaround, I create a "dummy" thread that waits forever (runConcurrently empty) and can be interrupted by exceptions. When it is interrupted, it terminates the external process, causing the reads in the companion thread to finish, which makes the companion thread vulnerable to asynchronous exceptions again.
The "latch" is used to prevent any uninterruptible operation on the handle until the inner exception handler is installed.
It's a bit convoluted but it seems to work, at least tested with:
    main :: IO ()
    main = do
        race_ (safeExec $ proc "calc" [])
              (threadDelay (3*10^6))
The calc app is killed after three seconds. Here's the whole gist.
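To connect this back to the three requirements in the question, here is a rough sketch of a driver that runs an action n times with at most p copies in flight and an overall time limit. It is not a drop-in replacement for parRepeatM: it swaps ParIO for the async package's replicateConcurrently and a QSem, on the assumption that cancellation then reaches each worker as an asynchronous exception. The name boundedRepeat and its argument order are made up for illustration.

```haskell
import Control.Concurrent.Async (replicateConcurrently)
import Control.Concurrent.QSem (newQSem, signalQSem, waitQSem)
import Control.Exception (bracket_)
import System.Timeout (timeout)

-- | Hypothetical helper: run @action@ @n@ times, at most @p@ at once,
-- giving up after @seconds@ seconds. Returns Nothing on timeout.
boundedRepeat :: Int -> Int -> Int -> IO a -> IO (Maybe [a])
boundedRepeat seconds p n action = do
  sem <- newQSem p                       -- p permits, one per running sample
  timeout (seconds * 1000000) $          -- 'timeout' expects microseconds
    replicateConcurrently n $
      -- take a permit before running, always give it back afterwards
      bracket_ (waitQSem sem) (signalQSem sem) action
```

If the action passed in is built on safeExec, a timeout should cancel the workers, and each worker's handler then terminates its external process. For p, the result of getNumCapabilities is one possible choice when the program is built with -threaded.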
Remember also that:
on Windows, if the process was a shell command created by createProcess with shell, or created by runCommand or runInteractiveCommand, then terminateProcess will only terminate the shell, not the command itself.
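One way to guard against that caveat is to check how a CreateProcess value was built before handing it to safeExec. The sketch below is only an illustration: runsViaShell is a made-up helper, and "sampler" with its arguments is a placeholder program name, not something from the question.

```haskell
import System.Process (CmdSpec (..), CreateProcess (..), proc, shell)

-- | Hypothetical check: True when the process would be launched through a
-- shell, in which case terminateProcess on Windows may only stop the shell.
runsViaShell :: CreateProcess -> Bool
runsViaShell cp = case cmdspec cp of
  ShellCommand _ -> True
  RawCommand _ _ -> False

-- Built with 'proc': the program is started directly.
direct :: CreateProcess
direct = proc "sampler" ["--seed", "42"]   -- placeholder program and arguments

-- Built with 'shell': the program runs as a child of the shell.
viaShell :: CreateProcess
viaShell = shell "sampler --seed 42"       -- same placeholder, via the shell
```

Preferring the proc form for genSample keeps the handle returned by createProcess pointing at the actual command.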
Upvotes: 2