rubik

Reputation: 9104

Forking the streaming flow in haskell-pipes

I'm having trouble directing flow through a pipeline with haskell-pipes. Basically, I analyze a bunch of files and then I have to either

  1. print results to the terminal in a human-friendly way
  2. encode results to JSON

The chosen path depends upon a command line option.
In the second case, I have to output an opening bracket, then the incoming values separated by commas, and then a closing bracket. Currently insertCommas never terminates, so the closing bracket is never output.

import Pipes
import qualified Pipes.Prelude as P
import qualified Data.ByteString.Lazy as B
import Data.Aeson (encode)

insertCommas :: Consumer B.ByteString IO ()
insertCommas = do
    first <- await
    lift $ B.putStr first
    for cat $ \obj -> lift $ do
        putStr ","
        B.putStr obj

jsonExporter :: Consumer (FilePath, AnalysisResult) IO ()
jsonExporter = do
    lift $ putStr "["
    P.map encode >-> insertCommas
    lift $ putStr "]"

exportStream :: Config -> Consumer (FilePath, AnalysisResult) IO ()
exportStream conf =
    case outputMode conf of
      JSON -> jsonExporter
      _    -> P.map (export conf) >-> P.stdoutLn

main :: IO ()
main = do
    -- The first two lines are Docopt stuff, not relevant
    args <- parseArgsOrExit patterns =<< getArgs
    ins  <- allFiles $ args `getAllArgs` argument "paths"
    let conf = readConfig args
    runEffect $ each ins
             >-> P.mapM analyze
             >-> P.map (filterResults conf)
             >-> P.filter filterNulls
             >-> exportStream conf

Upvotes: 1

Views: 184

Answers (2)

Michael

Reputation: 2909

I think you should 'commify' with pipes-group. It has intercalates but no intersperse, though one is easy to write on top of it. For this sort of problem I would stay away from the Consumer end, since a Consumer is never told when its upstream has finished.

{-# LANGUAGE OverloadedStrings #-}
import Pipes
import qualified Pipes.Prelude as P
import qualified Data.ByteString.Lazy.Char8 as B
import Pipes.Group
import Lens.Simple  -- or Control.Lens or Lens.Micro or anything with view/^.
import System.Environment

intersperse_ :: Monad m => a -> Producer a m r -> Producer a m r
intersperse_ a producer = intercalates (yield a) (producer ^. chunksOf 1) 

main = do 
  args <- getArgs
  let op prod = case args of 
        "json":_ -> yield "[" *> intersperse_ "," prod <* yield "]"
        _        -> intersperse_ " " prod
  runEffect $ op producer >-> P.mapM_ B.putStr
  putStrLn ""
  where 
    producer = mapM_ yield (B.words "this is a test")

which gives me this:

    >>> :main json
    [this,is,a,test]
    >>> :main ---
    this is a test
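
If pulling in pipes-group and a lens library feels heavy, the same interspersing can probably be written directly with next from Pipes. This is only a sketch: intersperseP and jsonArray are names I made up for illustration, and it uses plain String rather than lazy ByteString to avoid OverloadedStrings.

```haskell
import Pipes
import qualified Pipes.Prelude as P

-- | Yield the separator between consecutive elements of a Producer.
-- (Hypothetical helper, not part of pipes or pipes-group.)
intersperseP :: Monad m => a -> Producer a m r -> Producer a m r
intersperseP sep producer = do
  step <- lift (next producer)          -- pull the first element, if any
  case step of
    Left r          -> return r         -- empty stream: nothing to separate
    Right (x, rest) -> do
      yield x                           -- first element, no leading separator
      for rest (\y -> yield sep >> yield y)

-- | Wrap a stream of already-encoded values in JSON array brackets.
jsonArray :: Monad m => Producer String m r -> Producer String m r
jsonArray producer = do
  yield "["
  r <- intersperseP "," producer
  yield "]"                             -- runs because the Producer side sees the end
  return r
```

Because the brackets are emitted on the Producer side, the closing bracket is reached as soon as the upstream returns. In the question's pipeline this would sit in front of a plain printing consumer, for example runEffect $ jsonArray encodedResults >-> P.mapM_ putStr, where encodedResults is a stand-in for the analysis stream.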

Upvotes: 1

ErikR

Reputation: 52039

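AFAIK a Consumer cannot detect the end of a stream. To do that you need a Parser from Pipes.Parse (the pipes-parse package), which inverts the control: the parser pulls values with draw and gets Nothing back once the stream is exhausted.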

Here is a Parser which inserts commas between String elements:

import Pipes
import qualified Pipes.Prelude as P
import Pipes.Parse (draw, evalStateT)

commify = do
  lift $ putStrLn "["
  m1 <- draw
  case m1 of
    Nothing -> lift $ putStrLn "]"
    Just x1 -> do
      lift $ putStrLn x1
      let loop = do mx <- draw
                    case mx of
                      Nothing -> lift $ putStrLn "]"
                      Just x  -> lift (putStr "," >> putStrLn x) >> loop
      loop

test1 = evalStateT commify ( mapM_ yield (words "this is a test") )
test2 = evalStateT commify P.stdinLn

To handle the different output formats I would probably make both formats a Parser:

exportParser = do
  mx <- draw
  case mx of
    Nothing -> return ()
    Just x  -> (lift $ putStrLn $ export x) >> exportParser

and then:

let parser = case outputMode conf of
               JSON -> commify
               _    -> exportParser
-- evalStateT needs a complete Producer, so the source (each ins) is included
evalStateT parser (each ins
                      >-> P.mapM analyze
                      >-> P.map (filterResults conf)
                      >-> P.filter filterNulls)

There is probably a slicker way to write exportParser in terms of foldAllM. You can also use the MaybeT transformer to more succinctly write the commify parser. I've written both out explicitly to make them easier to understand.
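
For what it's worth, here is roughly what a foldAllM version of the comma-inserting parser might look like. Treat it as a sketch: commifyWith and renderList are hypothetical names, the output action is passed in as a parameter so the result can be captured, and the fold state is just a Bool that records whether an element has been written yet.

```haskell
{-# LANGUAGE RankNTypes #-}
import Control.Monad (unless)
import Data.IORef (modifyIORef', newIORef, readIORef)
import Pipes
import Pipes.Parse (Parser, evalStateT, foldAllM)

-- | Stream a bracketed, comma-separated list, calling 'emit' for every
-- fragment. The fold state is True until the first element is written.
commifyWith :: Monad m => (String -> m ()) -> Parser String m ()
commifyWith emit = do
  lift (emit "[")
  _ <- foldAllM step (return True) return
  lift (emit "]")                       -- reached once the input is exhausted
  where
    step isFirst x = do
      unless isFirst (emit ",")         -- separator before all but the first
      emit x
      return False

-- | Hypothetical helper: run the parser over a list and capture the output.
renderList :: [String] -> IO String
renderList xs = do
  ref <- newIORef []
  evalStateT (commifyWith (\s -> modifyIORef' ref (s :))) (each xs)
  fragments <- readIORef ref
  return (concat (reverse fragments))
```

Passing putStr as the action, as in evalStateT (commifyWith putStr) producer, should give the streaming behaviour of commify above, minus the newlines.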

Upvotes: 3
