Reputation: 1949
I'm trying to calculate rolling hash values (buzzhash) for a big file using pipes.
Currently I have this, but I don't know how to write a pipe that maintains state.
import qualified Data.ByteString.Lazy as L
import Data.Word
import Data.Bits(xor,rotate)
import Data.Array
import Pipes
import Control.Monad.State.Strict
import Control.Monad(forever)
produceFromList (x:xs) = do
    yield x
    produceFromList xs

buzzHash = do
    x <- await
    h <- lift $ get -- pull out previous value
    let h' = rotate h 1 `xor` (hashArrW8!x) -- calculate new value
    lift $ put h' -- save new value
    yield h'

stdoutLn :: Consumer Word64 IO ()
stdoutLn = do
    a <- await
    lift $ print a

main = do
    bs <- L.unpack `fmap` L.getContents
    runEffect $ produceFromList bs >-> buzzHash >-> stdoutLn

hashArrW8 :: Array Word8 Word64
How do I make buzzHash save the previous value and use it to calculate the next value? The initial state value should be 0.
Upvotes: 1
Views: 280
Reputation: 2909
You were almost there; you just need to run the state.
main = do
    bs <- L.unpack `fmap` L.getContents
    flip evalStateT 0 $ runEffect $ produceFromList bs >-> buzzHash >-> hoist lift stdoutLn
I assume you don't want to recover the final state, so I use evalStateT rather than runStateT.
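For reference, here is a minimal sketch of how the three StateT runners differ in what they hand back. The `step` action is a made-up example and is not part of the question's code.

```haskell
import Control.Monad.State.Strict (StateT, evalStateT, execStateT, modify', runStateT)

-- Hypothetical stateful action: bump an Int counter and return a label.
step :: StateT Int IO String
step = do
    modify' (+ 1)
    return "done"

main :: IO ()
main = do
    runStateT step 0 >>= print   -- result and final state: ("done",1)
    evalStateT step 0 >>= print  -- result only: "done"
    execStateT step 0 >>= print  -- final state only: 1
```

In the pipeline the result of runEffect is just (), so evalStateT leaves main with the usual IO () type.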
The only curiosity here is that stdoutLn was marked as Consumer Word64 IO (). So I use hoist lift to make it Consumer Word64 (StateT Word64 IO) (). Everything in the series a >-> b >-> c must agree in the underlying monad and return type.
Here are a few further comments that might save you time. First, produceFromList is just each from Pipes.
Moreover, you could have avoided the hoist lift by giving your stdoutLn a more general type:
stdoutLn :: MonadIO m => Consumer Word64 m ()
stdoutLn = do
    a <- await
    liftIO $ print a
But here there is some trouble: you are not repeating the action. This should pretty clearly be a loop:
stdoutLn :: MonadIO m => Consumer Word64 m ()
stdoutLn = do
    a <- await
    liftIO $ print a
    stdoutLn
In fact this is already available as P.print, so we can write
import qualified Pipes.Prelude as P
main = do
    bs <- L.unpack `fmap` L.getContents
    flip evalStateT 0 $ runEffect $ each bs >-> buzzHash >-> P.print
If I understand you, buzzHash is meant to be repeated indefinitely too:
buzzHash = do
    x <- await
    h <- lift $ get -- pull out previous value
    let h' = rotate h 1 `xor` (hashArrW8!x) -- calculate new value
    lift $ put h' -- save new value
    yield h'
    buzzHash
(this is forever buzzHash, where we use your buzzHash)
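To see why the loop matters, here is a small sketch comparing a one-shot pipe with the same pipe wrapped in forever. The `doubleOnce` pipe is a hypothetical stand-in for buzzHash, and P.toList is used only to collect the output purely.

```haskell
import Control.Monad (forever)
import Pipes
import qualified Pipes.Prelude as P

-- Hypothetical one-shot pipe: handles a single element, then returns.
doubleOnce :: Monad m => Pipe Int Int m ()
doubleOnce = do
    x <- await
    yield (x * 2)

main :: IO ()
main = do
    -- The pipeline stops as soon as any stage returns.
    print (P.toList (each [1, 2, 3] >-> doubleOnce))          -- [2]
    -- Looping keeps the stage alive until upstream is exhausted.
    print (P.toList (each [1, 2, 3] >-> forever doubleOnce))  -- [2,4,6]
```

The same thing happens with the original buzzHash and stdoutLn: without the loop, only the first byte would be hashed and printed.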
Finally, if you
import qualified Pipes.ByteString as PB
import Control.Lens (view) -- (or Lens.Micro.MTL or Lens.Simple)
we see we don't need the lazy bytestring IO, which doesn't stream properly anyway.
Pipes.ByteString already has the unpack we want, packaged as a lens, so that we use view PB.unpack where elsewhere we would use B.unpack. So in the end we can write
main = flip evalStateT 0 $ runEffect $ view PB.unpack PB.stdin >-> buzzHash >-> P.print
Once it is in this form we see we aren't using the underlying state of the pipeline except in buzzHash, so we can localize this:
import Pipes.Lift (evalStateP)
main = runEffect $ view PB.unpack PB.stdin >-> evalStateP 0 buzzHash >-> P.print
Or, if you like, you can rewrite buzzHash so that it takes the initial state as an argument and hides the state entirely:
buzzHash' :: Monad m => Word64 -> Pipe Word8 Word64 m r
buzzHash' n = evalStateP n $ forever $ do
    x <- await
    h <- lift $ get -- pull out previous value
    let h' = rotate h 1 `xor` (hashArrW8!x) -- calculate new value
    lift $ put h' -- save new value
    yield h'
Then you would write
main = runEffect $ view PB.unpack PB.stdin >-> buzzHash' 0 >-> P.print
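Putting the pieces together, the following is a self-contained sketch that can be compiled with only the pipes package (plus mtl and array). Two things here are assumptions and not from the question: the contents of hashArrW8, which the question declares but never defines, and the fixed in-memory input, which replaces PB.stdin so the result can be checked against a pure scanl version.

```haskell
import Control.Monad (forever)
import Control.Monad.State.Strict (get, put)
import Data.Array (Array, listArray, (!))
import Data.Bits (rotate, xor)
import Data.Word (Word64, Word8)
import Pipes
import Pipes.Lift (evalStateP)
import qualified Pipes.Prelude as P

-- HYPOTHETICAL table: a real buzzhash would use 256 fixed random 64-bit
-- values. This multiplicative fill is only a placeholder.
hashArrW8 :: Array Word8 Word64
hashArrW8 = listArray (minBound, maxBound)
    [ fromIntegral i * 0x9E3779B97F4A7C15 + 1 | i <- [0 .. 255 :: Int] ]

-- Stateful pipe; the state is local to this stage thanks to evalStateP.
buzzHash' :: Monad m => Word64 -> Pipe Word8 Word64 m r
buzzHash' n = evalStateP n $ forever $ do
    x <- await
    h <- lift get                              -- previous value
    let h' = rotate h 1 `xor` (hashArrW8 ! x)  -- new value
    lift (put h')                              -- save it
    yield h'

-- Pure reference: the same recurrence as a left scan, dropping the seed.
buzzHashPure :: [Word8] -> [Word64]
buzzHashPure = drop 1 . scanl (\h x -> rotate h 1 `xor` (hashArrW8 ! x)) 0

main :: IO ()
main = do
    let input = [104, 101, 108, 108, 111] :: [Word8]
    -- Both versions should produce the same hashes.
    print (P.toList (each input >-> buzzHash' 0) == buzzHashPure input)
```

Since the recurrence is a plain left scan, the pure version makes a convenient cross-check for the streaming one on small inputs.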
Upvotes: 3