Alexey Raga

Reputation: 7525

How to compress the output when writing to a file?

I have a computation that, among other things, generates a lot of data that I want to write to a file.

The way the code is structured now is (simplified):

writeRecord :: Handle -> Record -> IO ()
writeRecord h r = hPutStrLn h (toByteString r)

This function is then called periodically during a bigger computation. It is almost like a log, and in fact, multiple files are being written simultaneously.

Now I want the output file to be compressed with Gzip. In languages like Java I would do something like:

outStream = new GZIPOutputStream(new FileOutputStream(path))

and then would just write into that wrapped output stream.

What is the way of doing it in Haskell? I think writing something like

writeRecord h r = hPut h ((compressed . toByteString) r)

is not correct because compressing each small bit individually isn't efficient (I even tried it and the size of the compressed file is bigger than uncompressed this way).
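A minimal sketch of that experiment, assuming the zlib package's Codec.Compression.GZip module and a made-up list of short records (the names records, perRecordSize, wholeStreamSize and rawSize are only for illustration). Every per-record call to compress emits its own gzip header and trailer, which is where the overhead comes from:

```haskell
import qualified Codec.Compression.GZip as GZip
import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString.Lazy.Char8 as BLC

-- Hypothetical stand-in for the real records: many short, similar lines.
records :: [BL.ByteString]
records = [ BLC.pack ("record " ++ show n ++ "\n") | n <- [1 .. 10000 :: Int] ]

-- Size of the data with no compression at all.
rawSize :: Integer
rawSize = fromIntegral (BL.length (BL.concat records))

-- Each record is compressed on its own, so each one pays for a
-- separate gzip header and trailer.
perRecordSize :: Integer
perRecordSize = sum (map (fromIntegral . BL.length . GZip.compress) records)

-- All records go through one gzip stream.
wholeStreamSize :: Integer
wholeStreamSize = fromIntegral (BL.length (GZip.compress (BL.concat records)))

main :: IO ()
main = do
  putStrLn ("uncompressed:          " ++ show rawSize)
  putStrLn ("compressed per record: " ++ show perRecordSize)
  putStrLn ("compressed as a whole: " ++ show wholeStreamSize)
```

The whole-stream variant is only there for comparison; it builds everything in memory, which is exactly what should be avoided for the real data.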

I also don't think that I can just produce a lazy ByteString (or even a list of chunks) and then write it with compressed . fromChunks because this will require my "generator" building the full thing in memory. And the fact that more than one file is produced at the same time makes it even more complicated.

So what would be a way to solve this in Haskell, that is, to write to one or more files and have them gzipped?

Upvotes: 6

Views: 1468

Answers (5)

danidiaz

Reputation: 27756

This solution is similar to Michael Snoyman's EDIT 2, but uses the streaming, streaming-bytestring, pipes and pipes-zlib packages.

{-# language OverloadedStrings #-}
module Main where

-- cabal install bytestring streaming streaming-bytestring pipes pipes-zlib 
import Data.ByteString
import qualified Data.ByteString.Streaming as B
import Streaming
import qualified Streaming.Prelude as S
import Pipes (next)
import qualified Pipes.Prelude 
import Pipes.Zlib (compress,defaultCompression,defaultWindowBits)
import System.IO

type Tag = String

producer :: Monad m => Stream (Of (Tag,ByteString)) m ()
producer = do
    S.yield ("foo","This is going to Foo")
    S.yield ("bar","This is going to Bar")

-- I couldn't find a streaming-zlib on Hackage, took a pipes detour
compress' :: MonadIO m 
          => Stream (Of ByteString) m r -> Stream (Of ByteString) m r 
compress' = S.unfoldr Pipes.next
          . compress defaultCompression defaultWindowBits
          . Pipes.Prelude.unfoldr S.next     

keepTag :: Monad m 
        => Tag -> Stream (Of (Tag,ByteString)) m r -> Stream (Of ByteString) m r 
keepTag tag = S.map snd . S.filter ((tag==) . fst)

main :: IO ()
main = runResourceT 
     . B.writeFile "foo.txt" . B.fromChunks . compress' .  keepTag "foo"  
     . B.writeFile "bar.txt"  . B.fromChunks . compress' . keepTag "bar"  
     $ S.copy producer

I make use of the copy function from Streaming.Prelude, which allows you to "Duplicate the content of stream, so that it can be acted on twice in different ways, but without breaking streaming."

Upvotes: 1

Michael

Reputation: 2909

All the streaming libraries support compression. If I understand the particular problem and the way you are thinking about it, io-streams might be the simplest for your purposes. Here I alternate between writing to the trump and clinton output streams, which are written as compressed files. After that, I show the pipes equivalent of Michael Snoyman's conduit program.

#!/usr/bin/env stack
-- stack --resolver lts-6.21 --install-ghc runghc --package io-streams
{-# LANGUAGE OverloadedStrings #-}

import qualified System.IO.Streams as IOS
import qualified System.IO as IO
import Data.ByteString (ByteString)

analyzer :: IOS.OutputStream ByteString -> IOS.OutputStream ByteString -> IO ()
analyzer clinton trump = do 
  IOS.write (Just "This is a string\n") clinton
  IOS.write (Just "This is a string\n") trump
  IOS.write (Just "Clinton string\n") clinton
  IOS.write (Just "Trump string\n") trump   
  IOS.write (Just "Another Clinton string\n") clinton
  IOS.write (Just "Another Trump string\n") trump   
  IOS.write Nothing clinton
  IOS.write Nothing trump

main :: IO ()
main = 
  IOS.withFileAsOutput "some-file-clinton.txt.gz" $ \clinton_compressed ->
  IOS.withFileAsOutput "some-file-trump.txt.gz" $ \trump_compressed -> do
     clinton <- IOS.gzip IOS.defaultCompressionLevel clinton_compressed
     trump <- IOS.gzip IOS.defaultCompressionLevel trump_compressed
     analyzer clinton trump

Obviously you can mix all kinds of IO in analyzer between the writes to the two output streams; I'm just showing the writes, so to speak. In particular, if analyzer is understood as depending on an input stream, the writes can depend on reads from the input stream. Here's a (slightly!) more complicated program that does that. If I run the program above I see:

$ stack gzip_so.hs  
$ gunzip some-file-clinton.txt.gz 
$ gunzip some-file-trump.txt.gz 
$ cat some-file-clinton.txt 
This is a string
Clinton string
Another Clinton string
$ cat some-file-trump.txt 
This is a string
Trump string
Another Trump string
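For the input-dependent case mentioned above, a minimal sketch might look like the following. It is not the linked program: the router function, the "c:" prefix convention and the output file names are all made up for illustration. It only uses io-streams calls that already appear above, plus IOS.read and IOS.fromList.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import qualified System.IO.Streams as IOS
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as BC

-- Read lines from the input stream and route each one to one of the
-- two output streams. Lines starting with "c:" go to clinton, all
-- others to trump (a convention invented for this sketch).
router :: IOS.InputStream ByteString
       -> IOS.OutputStream ByteString
       -> IOS.OutputStream ByteString
       -> IO ()
router input clinton trump = loop
  where
    loop = do
      mline <- IOS.read input
      case mline of
        Nothing -> do
          -- End of input: signal end-of-stream to both outputs.
          IOS.write Nothing clinton
          IOS.write Nothing trump
        Just line -> do
          let out = if "c:" `BC.isPrefixOf` line then clinton else trump
          IOS.write (Just (BC.snoc line '\n')) out
          loop

main :: IO ()
main =
  IOS.withFileAsOutput "routed-clinton.txt.gz" $ \clintonFile ->
  IOS.withFileAsOutput "routed-trump.txt.gz" $ \trumpFile -> do
    clinton <- IOS.gzip IOS.defaultCompressionLevel clintonFile
    trump   <- IOS.gzip IOS.defaultCompressionLevel trumpFile
    -- A fixed list stands in for whatever really produces the input.
    input   <- IOS.fromList ["c: first", "t: second", "c: third"]
    router input clinton trump
```

Because router only sees plain output streams, it does not need to know that the data ends up gzipped.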

With pipes and conduit there are various ways of achieving the above effect, with a higher level of decomposition of parts. Writing to separate files will, however, be a little more subtle. Here, in any case, is the pipes equivalent of Michael S's conduit program:

#!/usr/bin/env stack
-- stack --resolver lts-6.21 --install-ghc runghc --package pipes-zlib --package pipes-bytestring
{-# LANGUAGE OverloadedStrings #-}
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.ByteString (ByteString, hPutStr)
import System.IO  (IOMode(..), withFile, Handle)
import Pipes  
import qualified Pipes.ByteString as PB
import qualified Pipes.GZip as P

-- Some helper function you may have
someAction :: IO ByteString
someAction = return "This is a string\n"

-- Original version
producerHandle :: Handle -> IO ()
producerHandle h = do
    str <- someAction
    hPutStr h str

producerPipe :: MonadIO m => Producer ByteString m ()
producerPipe = do
    str <- liftIO someAction
    yield str

main :: IO ()
main =  withFile "some-file-pipes.txt.gz"  WriteMode $ \h -> 
     runEffect $ P.compress P.defaultCompression producerPipe  >-> PB.toHandle h 

-- Edit

Here, for what it's worth, is yet another way of superimposing several producers on a single thread with pipes or conduit, to add to the different approaches Michael S and danidiaz mentioned:

#!/usr/bin/env stack
-- stack --resolver lts-6.21 --install-ghc runghc --package pipes-zlib --package pipes-bytestring
{-# LANGUAGE OverloadedStrings #-}
import Pipes
import Pipes.GZip
import qualified Pipes.Prelude as P
import qualified Pipes.ByteString as Bytes
import System.IO
import Control.Monad (replicateM_)

producer = replicateM_ 50000 $ do
    marie  "This is going to Marie\n"  -- arbitrary IO can be interspersed here
    arthur "This is going to Arthur\n" -- with liftIO
    sylvia "This is going to Sylvia\n" 
  where 
    marie = yield; arthur = lift . yield; sylvia = lift . lift . yield

sinkHelper h p = runEffect (compress bestSpeed p >-> Bytes.toHandle h)

main :: IO ()
main =  
   withFile "marie.txt.gz" WriteMode $ \marie ->
   withFile "arthur.txt.gz"  WriteMode $ \arthur -> 
   withFile "sylvia.txt.gz"  WriteMode $ \sylvia ->
      sinkHelper sylvia
      $ sinkHelper arthur
      $ sinkHelper marie
      $ producer

It is quite simple and fast, and can be written in conduit with the obvious alterations - but finding it natural involves a higher level of buy-in with the 'monad transformer stack' point of view. It would be the most natural way of writing such a program from the point of view of something like the streaming library.

Upvotes: 5

danidiaz

Reputation: 27756

This solution is similar to Michael Snoyman's EDIT 2, but uses the foldl, pipes, pipes-zlib and streaming-eversion packages.

{-# language OverloadedStrings #-}
module Main where

-- cabal install bytestring foldl pipes pipes-zlib streaming-eversion
import Data.Foldable
import Data.ByteString
import qualified Control.Foldl as L 
import Pipes 
import qualified Pipes.Prelude
import Pipes.Zlib (compress,defaultCompression,defaultWindowBits)
import Streaming.Eversion.Pipes (transvertMIO)
import System.IO

type Tag = String

producer :: Monad m => Producer (Tag,ByteString) m ()
producer = do
    yield $ ("foo","This is going to Foo")
    yield $ ("bar","This is going to Bar")

foldForTag :: Handle -> Tag -> L.FoldM IO (Tag,ByteString) ()
foldForTag handle tag = 
      L.premapM (\(tag',bytes) -> if tag' == tag then Just bytes else Nothing)
    . L.handlesM L.folded
    . transvertMIO (compress defaultCompression defaultWindowBits)
    $ L.mapM_ (Data.ByteString.hPut handle)

main :: IO ()
main = do
    withFile "foo.txt" WriteMode $ \h1 ->
        withFile "bar.txt" WriteMode $ \h2 ->
            let multifold = traverse_ (uncurry foldForTag) [(h1,"foo"),(h2,"bar")] 
            in  L.impurely Pipes.Prelude.foldM multifold producer

Upvotes: 1

Michael Snoyman

Reputation: 31315

Doing this with conduit is fairly straightforward, though you'd need to adjust your code a bit. I've put together an example of before and after code to demonstrate it. The basic idea is:

  • Replace hPutStr h with yield
  • Add some liftIO wrappers
  • Instead of using withBinaryFile or the like, use runConduitRes, gzip, and sinkFile

Here's the example:

#!/usr/bin/env stack
-- stack --resolver lts-6.21 --install-ghc runghc --package conduit-extra
{-# LANGUAGE OverloadedStrings #-}
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.ByteString (ByteString, hPutStr)
import Data.Conduit (ConduitM, (.|), yield, runConduitRes)
import Data.Conduit.Binary (sinkFile)
import Data.Conduit.Zlib (gzip)
import System.IO (Handle)

-- Some helper function you may have
someAction :: IO ByteString
someAction = return "This is a string\n"

-- Original version
producerHandle :: Handle -> IO ()
producerHandle h = do
    str <- someAction
    hPutStr h str

-- Conduit version
producerConduit :: MonadIO m => ConduitM i ByteString m ()
producerConduit = do
    str <- liftIO someAction
    yield str

main :: IO ()
main = runConduitRes $ producerConduit
                    .| gzip
                    .| sinkFile "some-file.txt.gz"

You can learn more about conduit in the conduit tutorial.

Your Java idea is interesting, give me a few more minutes, I'll add an answer that looks more like that.

EDIT

Here's a version that's closer to your Java style approach. It relies on a SinkFunc.hs module which is available as a Gist at: https://gist.github.com/snoyberg/283154123d30ff9e201ea4436a5dd22d

#!/usr/bin/env stack
-- stack --resolver lts-6.21 --install-ghc runghc --package conduit-extra
{-# LANGUAGE OverloadedStrings #-}
{-# OPTIONS_GHC -Wall -Werror #-}
import Data.ByteString (ByteString)
import Data.Conduit ((.|))
import Data.Conduit.Binary (sinkHandle)
import Data.Conduit.Zlib (gzip)
import System.IO (withBinaryFile, IOMode (WriteMode))
import SinkFunc (withSinkFunc)

-- Some helper function you may have
someAction :: IO ByteString
someAction = return "This is a string\n"

producerFunc :: (ByteString -> IO ()) -> IO ()
producerFunc write = do
    str <- someAction
    write str

main :: IO ()
main = withBinaryFile "some-file.txt.gz" WriteMode $ \h -> do
    let sink = gzip .| sinkHandle h
    withSinkFunc sink $ \write -> producerFunc write

EDIT 2 One more for good measure, actually using ZipSink to stream the data to multiple different files. There are lots of different ways of slicing this, but this is one way that works:

#!/usr/bin/env stack
-- stack --resolver lts-6.21 --install-ghc runghc --package conduit-extra
{-# LANGUAGE OverloadedStrings #-}
import Control.Monad.Trans.Resource (MonadResource)
import Data.ByteString (ByteString)
import Data.Conduit (ConduitM, (.|), yield, runConduitRes, ZipSink (..))
import Data.Conduit.Binary (sinkFile)
import qualified Data.Conduit.List as CL
import Data.Conduit.Zlib (gzip)

data Output = Foo ByteString | Bar ByteString

fromFoo :: Output -> Maybe ByteString
fromFoo (Foo bs) = Just bs
fromFoo _ = Nothing

fromBar :: Output -> Maybe ByteString
fromBar (Bar bs) = Just bs
fromBar _ = Nothing

producer :: Monad m => ConduitM i Output m ()
producer = do
    yield $ Foo "This is going to Foo"
    yield $ Bar "This is going to Bar"

sinkHelper :: MonadResource m
           => FilePath
           -> (Output -> Maybe ByteString)
           -> ConduitM Output o m ()
sinkHelper fp f
    = CL.mapMaybe f
   .| gzip
   .| sinkFile fp

main :: IO ()
main = runConduitRes
     $ producer
    .| getZipSink
            (ZipSink (sinkHelper "foo.txt.gz" fromFoo) *>
             ZipSink (sinkHelper "bar.txt.gz" fromBar))

Upvotes: 4

ryachza

Reputation: 4540

For incremental compression, I think you could make use of compressIO/foldCompressStream in Codec.Compression.Zlib.Internal.

If you're able to represent your producer action as an IO (Maybe a) (such as an MVar take or InputStream/Chan read) where Nothing signifies end of input, something like this should work:

import System.IO (Handle)
import qualified Data.ByteString as BS
import qualified Codec.Compression.Zlib.Internal as ZLib

compressedWriter :: Handle -> (IO (Maybe BS.ByteString)) -> IO ()
compressedWriter handle source =
  ZLib.foldCompressStream
    (\next -> source >>= maybe (next BS.empty) next)
    (\chunk next -> BS.hPut handle chunk >> next)
    (return ())
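    -- gzipFormat adds the gzip header and trailer, so gunzip can read the output;
    -- rawFormat would emit a bare DEFLATE stream instead.
    (ZLib.compressIO ZLib.gzipFormat ZLib.defaultCompressParams)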
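As a usage sketch, here is a self-contained variant wired to a simple source. The names gzipWriter and listSource and the file name records.txt.gz are made up for illustration; listSource stands in for whatever MVar, Chan or InputStream read the real program would use. The variant selects ZLib.gzipFormat so that gunzip can read the result.

```haskell
import Data.IORef (newIORef, atomicModifyIORef')
import System.IO (Handle, IOMode (WriteMode), withBinaryFile)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as BC
import qualified Codec.Compression.Zlib.Internal as ZLib

-- Pull chunks from the source until it returns Nothing, compress them
-- incrementally, and write each compressed chunk to the handle.
gzipWriter :: Handle -> IO (Maybe BS.ByteString) -> IO ()
gzipWriter handle source =
  ZLib.foldCompressStream
    -- The stream wants input: an empty chunk signals end of input.
    (\next -> source >>= maybe (next BS.empty) next)
    -- The stream has output: write it and continue.
    (\chunk next -> BS.hPut handle chunk >> next)
    (return ())
    (ZLib.compressIO ZLib.gzipFormat ZLib.defaultCompressParams)

-- Hypothetical source: hands out the given chunks one at a time,
-- then Nothing forever.
listSource :: [BS.ByteString] -> IO (IO (Maybe BS.ByteString))
listSource chunks = do
  ref <- newIORef chunks
  return $ atomicModifyIORef' ref $ \remaining ->
    case remaining of
      []       -> ([], Nothing)
      (c : cs) -> (cs, Just c)

main :: IO ()
main = do
  source <- listSource (map BC.pack ["first record\n", "second record\n"])
  withBinaryFile "records.txt.gz" WriteMode $ \h -> gzipWriter h source
```

One caveat of this shape: an empty chunk from the source is indistinguishable from end of input, so a real source should skip empty records.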

Upvotes: 3
