Jacob Wang
Jacob Wang

Reputation: 4794

Conduit - Splitting a ByteString source to chunks of bytes

With sourceFile we get a ByteString stream.

With reference to my other question "Combining multiple Sources/Producers into one", I'm able to get a source of (StdGen, ByteString) using ZipSink, sourceFile and a custom source that produces an infinite stream of StdGen.

What I'm trying to achieve is to pair each StdGen with one single byte of ByteString, but with my current implementation, I'm getting one StdGen paired with the whole content of the input file from sourceFile.

I have looked into Conduit.Binary's isolate function, but it doesn't seem to be working for me when I use as follows:

{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE OverloadedStrings #-}

import System.Random (StdGen(..), split, newStdGen, randomR)
import ClassyPrelude.Conduit as Prelude
import Control.Monad.Trans.Resource (runResourceT, ResourceT(..))
import qualified Data.ByteString as BS
import Data.Conduit.Binary (isolate)

-- generate a infinite source of random number seeds
sourceStdGen :: MonadIO m => Source m StdGen
sourceStdGen = do
    g <- liftIO newStdGen
    loop g
    where loop gin = do
            let g' = fst (split gin)
            yield gin
            loop g'

-- combine the sources into one
sourceInput :: (MonadResource m, MonadIO m) => FilePath -> Source m (StdGen, ByteString)
sourceInput fp = getZipSource $ (,)
    <$> ZipSource sourceStdGen
    <*> ZipSource (sourceFile fp $= isolate 1)

-- a simple conduit, which generates a random number from provide StdGen
-- and append the byte value to the provided ByteString
simpleConduit :: Conduit (StdGen, ByteString) (ResourceT IO) ByteString
simpleConduit = mapC process 

process :: (StdGen, ByteString) -> ByteString
process (g, bs) =
    let rnd = fst $ randomR (40,50) g
    in bs ++ pack [rnd]

main :: IO ()
main = do
    runResourceT $ sourceInput "test.txt" $$ simpleConduit =$ sinkFile "output.txt"

In Conduit terms, I thought isolate will do an await, yield the head of the incoming ByteString stream, and leftOver the rest (put it back to the incoming stream's queue). Basically, what I'm trying to do is chopping the incoming ByteString stream into blocks of bytes.

Am I using it correctly? If isolate is not the function I should be using, then can anyone provide another function that splits it into arbitrary byte chunks?

Upvotes: 3

Views: 1173

Answers (2)

Jacob Wang
Jacob Wang

Reputation: 4794

I managed to write a conduit myself (condWord) which splits incoming ByteString into Word8 chunks. I'm not sure whether I'm reinventing the wheel here though.

To get my intended behavior, I simply bolt condWord onto sourceFile.

{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE OverloadedStrings #-}

import System.Random (StdGen(..), split, newStdGen, randomR)
import ClassyPrelude.Conduit as Prelude
import Control.Monad.Trans.Resource (runResourceT, ResourceT(..))
import qualified Data.ByteString as BS
import Data.Conduit.Binary (isolate)
import Data.Maybe (fromJust)

-- generate a infinite source of random number seeds
sourceStdGen :: MonadIO m => Source m StdGen
sourceStdGen = do
    g <- liftIO newStdGen
    loop g
    where loop gin = do
            let g' = fst (split gin)
            yield gin
            loop g'

-- combine the sources into one
sourceInput :: (MonadResource m, MonadIO m) => FilePath -> Source m (StdGen, Word8)
sourceInput fp = getZipSource $ (,)
    <$> ZipSource sourceStdGen
    <*> ZipSource (sourceFile fp $= condWord)

-- a simple conduit, which generates a random number from provide StdGen
-- and append the byte value to the provided ByteString
simpleConduit :: Conduit (StdGen, Word8) (ResourceT IO) ByteString
simpleConduit = mapC process 

process :: (StdGen, Word8) -> ByteString
process (g, ch) =
    let rnd = fst $ randomR (97,122) g
    in pack [fromIntegral ch, rnd]

condWord :: (Monad m) => Conduit ByteString m Word8
condWord = do
    bs <- await
    case bs of
        Just bs' -> do
            if (null bs')
                then return ()
                else do
                    let (h, t) = fromJust $ BS.uncons bs'
                    yield h
                    leftover t 
                    condWord
        _ -> return ()

main :: IO ()
main = do
    runResourceT $ sourceInput "test.txt" $$ simpleConduit =$ sinkFile "output.txt"

Upvotes: 0

user2407038
user2407038

Reputation: 14578

If I understand correctly, you want something like this:

import System.Random (StdGen, split, newStdGen, randomR)
import qualified Data.ByteString as BS
import Data.Conduit 
import Data.ByteString (ByteString, pack, unpack, singleton)
import Control.Monad.Trans (MonadIO (..))
import Data.List (unfoldr)
import qualified Data.Conduit.List as L
import Data.Monoid ((<>))

input :: MonadIO m => FilePath -> Source m (StdGen, ByteString)
input path = do 
  gs <- unfoldr (Just . split) `fmap` liftIO newStdGen 
  bs <- (map singleton . unpack) `fmap` liftIO (BS.readFile path)
  mapM_ yield (zip gs bs)

output :: Monad m => Sink (StdGen, ByteString) m ByteString
output = L.foldMap (\(g, bs) -> let rnd = fst $ randomR (97,122) g in bs <> pack [rnd])

main :: IO ()
main = (input "in.txt" $$ output) >>=  BS.writeFile "out.txt"

It is probably more efficient to omit map singleton, you may as well use the Word8s directly and convert back to ByteString at the end.

Upvotes: 2

Related Questions