Dulguun Otgon
Dulguun Otgon

Reputation: 1949

Pipe that maintains state

I'm trying to calculate rolling hash values (buzzhash) for a big file using pipes.

Currently I have this. But don't know how to write a pipe that maintains a state.

import qualified Data.ByteString.Lazy as L
import Data.Word
import Data.Bits(xor,rotate)
import Data.Array
import Pipes
import Control.Monad.State.Strict
import Control.Monad(forever)

produceFromList (x:xs) = do 
  yield x
  produceFromList xs

buzzHash = do
  x <- await
  h <- lift $ get -- pull out previous value
  let h' = rotate h 1 `xor` (hashArrW8!x) -- calculate new value
  lift $ put h' -- save new value 
  yield h'

stdoutLn :: Consumer Word64 IO ()
stdoutLn = do 
  a <- await 
  lift $ print a

main = do 
  bs <- L.unpack `fmap` L.getContents
  runEffect $ produceFromList bs >-> buzzHash >-> stdoutLn

hashArrW8 :: Array Word8 Word64

How do I make buzzHash save previous value and use it for the calculation of next value? Initial state value should be 0.

Upvotes: 1

Views: 280

Answers (1)

Michael
Michael

Reputation: 2909

You were almost there; you just need to run the state.

main = do
  bs <- L.unpack `fmap` L.getContents
  flip execStateT 0 $ runEffect $ produceList bs >-> buzzHash >-> hoist lift stdoutLn

I assume you don't want to recover the state, so I use execStateT rather than runStateT.

The only curiosity here is that stdoutLn was marked as Consumer Word64 IO () . So I use hoist lift to make it Consumer Word64 (StateT Word64 IO) () Everything in the series a >-> b >-> c must agree in the underlying monad and return type.

Here are a few further comments that might save you time. First produceFromList is each.

Moreover, you could have avoided the hoist lift by relabeling your stdoutLn:

stdoutLn :: MonadIO m => Consumer Word64 m ()
stdoutLn = do 
   a <- await 
   liftIO $ print a

But here there is some trouble: you are not repeating the action. This should pretty clearly be a loop:

stdoutLn :: MonadIO m => Consumer Word64 m ()
stdoutLn = do 
   a <- await 
   liftIO $ print a
   stdoutLn

in fact this is already available as P.print, so we can write

import qualified Pipes.Prelude as P
main = do
  bs <- L.unpack `fmap` L.getContents
  flip execStateT 0 $ runEffect $ each bs >-> buzzHash >-> P.print

If I understand you, buzzHash is meant to be repeated indefinitely too:

buzzHash = do
  x <- await
  h <- lift $ get -- pull out previous value
  let h' = rotate h 1 `xor` (hashArrW8!x) -- calculate new value
  lift $ put h' -- save new value 
  yield h'
  buzzHash

(this is forever buzzHash, where we use your buzzHash)

Finally, if you

 import qualified Pipes.ByteString as PB
 import Control.Lens (view) -- (or Lens.Micro.MTL or Lens.Simple)

we see we don't need the lazy bytestring IO, which doesn't stream properly anyway. Pipes.ByteString already has the unpack we want, packaged as a lens, so that we use view PB.unpack where elsewhere we would use B.unpack. So in the end we can write

main = flip evalStateT 0 $ runEffect $ view PB.unpack PB.stdin >-> buzzHash >-> P.print

Once it is in this form we see we aren't using the underlying state of the pipeline except in buzzHash, so we can localize this

import Pipes.Lift (evalStateP) 
main =  runEffect $ view PB.unpack PB.stdin >-> evalStateP 0 buzzHash >-> P.print

or, if you like you can rewrite

buzzHash' :: Monad m => Word64 -> Pipe Word8 Word64 m r
buzzHash' n = evalStateP n $ forever $ do
    x <- await
    h <- lift $ get -- pull out previous value
    let h' = rotate h 1 `xor` (hashArrW8!x) -- calculate new value
    lift $ put h' -- save new value 
    yield h'

Then you would write

main =  runEffect $ view PB.unpack PB.stdin >-> buzzHash' 0 >-> P.print

Upvotes: 3

Related Questions