
Reputation: 134260

How do I replace a program written as a sequenced stream of state transitions with scalaz-stream?

I'm trying to understand how to reorganize a program which I would previously have written as a sequence of state transitions.

I have some business logic:

// Domain model: inputs are numbers, the state counts how often each one was seen,
// and the output of a step is the updated count for the number just read.
type In = Long
type Count = Int
type Out = Count
// Keyed by In rather than Int: `transition` looks the state up with an `In`
// (a Long), which does not typecheck against an Int-keyed map without a lossy
// narrowing conversion.
type S = Map[In, Count]

// Parse one line of input; anything that is not a valid Long yields None.
// scala.util.Try only catches non-fatal errors, so OutOfMemoryError,
// InterruptedException and friends still propagate instead of being swallowed.
val inputToIn: String => Option[In]
  = s => scala.util.Try(s.toLong).toOption

// State transition for a single input: bump the count stored under `in` and
// return the updated state together with the new count.
def transition(in: In): S => (S, Out)
  = s => {
    // Compute the incremented count up front: written inline, `in -> n+1`
    // parses as `(in -> n) + 1` because `->` and `+` share the same precedence.
    val updated = s.getOrElse(in, 0) + 1
    (s + (in -> updated), updated)
  }

val ZeroOut: Out = 0
val InitialState: S = Map.empty

With these I wish to construct a program to pass in some initial State (an empty Map), read input from stdin, convert it to In, run the state transition and output the current state S and the output Out to stdout.

Previously, I would have done something like this:

val runOnce = StateT[IO, S, Out](s => IO.readLn.map(inputToIn) flatMap { 
  case None     => IO((s, ZeroOut))
  case Some(in) => val (t, o) = transition(in)(s)
                   IO.putStrLn(t.toString) |+| IO.putStrLn(o.toString) >| IO((t, o))   


However, I'm really struggling to see how to connect this approach (a stream of state transitions) with scalaz-stream. I started with this:

// A transition maps the current state to the next state plus an output.
type Transition = S => (S, Out)
// Transition that leaves the state untouched and emits the shared ZeroOut
// constant instead of repeating the literal 0.
val NoTransition: Transition = s => (s, ZeroOut)


This is of type: Process[Task, Transition]. I don't really know where to go from there.

  1. How do I "pass in" my InitialState and run the program, threading in the output S at each step as the input S to the next one?
  2. How do I get the values of S and Out at each step and print them to stdout (assuming I can convert them to strings)?

In trying to use a single for-comprehension, I get similarly stuck:

for {
  i <- Process.eval(Task.now(InitialState))
  l <- io.stdInLines.map(inputToIn)

Any help is greatly appreciated!

I've got a bit further now.

// Each stream element pairs a state with the (possibly unparseable) input.
type In_ = (S, Option[In])
type Out_ = (S, Out)

// Explicit flatMap/map form: the single emitted InitialState is flat-mapped
// over every parsed stdin line, so each line is paired with that same value.
val input: Process[Task, In_] =
  Process.emit(InitialState).flatMap { initial =>
    io.stdInLines.map(inputToIn).map(parsed => (initial, parsed))
  }

val prog =
  input.pipe(process1.collect[In_, Out_]) {
    case (s, Some(in)) => transition(in)(s)



It doesn't work: It seems like the state is not being threaded through the stream. Rather, at each stage, the initial state is being passed in.

Paul Chiusano suggested using the approach of process1.scan. So now I do this:

// Stream element type and accumulator type for the scan-based version.
type In_  = In
type Out_ = (S, Out)

// Seed value: the empty state paired with the zero output.
val InitialOut_ : Out_ = InitialState -> ZeroOut

val program =
    process1.scan[In_, Out_](InitialOut_) {
      case ((s, _), in) => transition(in)(s)

There's a problem here: In this specific example, my Out type is a monoid, so my initial state can be created using its identity but this may not generally be the case. What would I do then? (I guess I could use Option but this seems like it's unnecessary.)

Upvotes: 81

Views: 1460

Answers (1)


Reputation: 11

import io.FilePath

import scalaz.stream._
import Process._
import scalaz.concurrent.Task
import Task._
import scalaz.{Show, Reducer, Monoid}
import scalaz.std.list._
import scalaz.syntax.foldable._
import scalaz.syntax.bind._
import scalaz.stream._
import io._
import scalaz.stream.text._
import Processes._
import process1.lift
import control.Functions._

/**
 * A Fold[T] can be used to pass over a Process[Task, T].
 *
 * It has:
 *  - accumulation, with an initial state, of type S, a fold action and an action to perform with the last state
 *  - side-effects with a Sink[Task, (T, S)] to write to a file for example, using the current element in the Process
 *    and the current accumulated state
 *
 * This covers many of the needs of iterating over a Scalaz stream and is composable because there is a Monoid
 * instance for Folds
 */
trait Fold[T] {
  type S

  def prepare: Task[Unit]
  def sink: Sink[Task, (T, S)]
  def fold: (T, S) => S
  def init: S
  def last(s: S): Task[Unit]

  /** create a Process1 returning the state values */
  def foldState1: Process1[T, S] =

  /** create a Process1 returning the folded elements and the state values */
  def zipWithState1: Process1[T, (T, S)] =


/**
 * Fold functions and typeclasses
 */
object Fold {

  /**
   * Create a Fold from a Sink with no accumulation
   */
  // The result type is left inferred so callers keep the `S = Unit` refinement.
  def fromSink[T](aSink: Sink[Task, T]) = new Fold[T] {
    // No accumulation: the state is Unit and is handed back unchanged by `fold`.
    type S = Unit
    // The wrapped sink only takes elements; `extend` adapts it to (element, state)
    // pairs -- presumably ignoring the state, confirm against its definition.
    lazy val sink: Sink[Task, (T, S)] = aSink.extend[S]

    def prepare = Task.now(())
    def fold = (t: T, u: Unit) => u
    def init = ()
    def last(u: Unit) = Task.now(u)
  } // closes the anonymous Fold; this brace was missing

  /**
   * Transform a simple sink where the written value doesn't depend on the
   * current state into a sink where the current state is passed all the time
   * (and actually ignored)
   */

  /**
   * Create a Fold from a State function
   */
  // The result type is left inferred so callers keep the `S = S1` refinement.
  def fromState[T, S1](state: (T, S1) => S1)(initial: S1) = new Fold[T] {
    type S = S1
    // Accumulation only: the sink discards every (element, state) pair.
    lazy val sink: Sink[Task, (T, S)] = unitSink[T, S]

    def prepare = Task.now(())
    // The supplied state function is the fold step; `initial` seeds it.
    def fold = state
    def init = initial
    // Nothing is done with the final state.
    def last(s: S) = Task.now(())
  } // closes the anonymous Fold; this brace was missing

  /**
   * Create a Fold from a side-effecting function
   */
  def fromFunction[T](f: T => Task[Unit]): Fold[T] =

  /**
   * Create a Fold from a Reducer
   */
  def fromReducer[T, S1](reducer: Reducer[T, S1]): Fold[T] = new Fold[T] {
    type S = S1
    // Accumulation only: the sink discards every (element, state) pair.
    lazy val sink: Sink[Task, (T, S)] = unitSink[T, S]

    def prepare = Task.now(())
    // NOTE(review): `cons` combines the new element on the left of the
    // accumulated value; for a non-commutative monoid confirm this is the
    // intended order rather than `snoc`.
    def fold = reducer.cons
    def init = reducer.monoid.zero
    // Nothing is done with the final state.
    def last(s: S) = Task.now(())
  } // closes the anonymous Fold; this brace was missing

  /**
   * Create a Fold from a Reducer and a last action
   */
  def fromReducerAndLast[T, S1](reducer: Reducer[T, S1], lastTask: S1 => Task[Unit]): Fold[T] = new Fold[T] {
    type S = S1
    // Accumulation only: the sink discards every (element, state) pair.
    lazy val sink: Sink[Task, (T, S)] = unitSink[T, S]

    def prepare = Task.now(())
    // The reducer supplies both the fold step and the starting value.
    def fold = reducer.cons
    def init = reducer.monoid.zero
    // The caller-supplied action receives the final accumulated state.
    def last(s: S) = lastTask(s)
  } // closes the anonymous Fold; this brace was missing

  /**
   * This Sink doesn't do anything
   * It can be used to build a Fold that does accumulation only
   */
  def unitSink[T, S]: Sink[Task, (T, S)] = {
    // Accept any (element, state) pair and complete immediately with Unit.
    val discard: ((T, S)) => Task[Unit] = _ => Task.now(())
    channel(discard)
  }

  /**
   * Unit Fold with no side-effect or accumulation
   */
  def unit[T] = {
    // A per-element action that completes immediately without doing anything.
    val noop: T => Task[Unit] = _ => Task.now(())
    fromSink(channel(noop))
  }

  /**
   * Unit fold function
   */
  def unitFoldFunction[T]: (T, Unit) => Unit =
    // The element is ignored; the Unit state is handed back as-is.
    (_, state) => state

  /**
   * create a fold sink to output lines to a file
   *
   * Each element is rendered with its Show instance, utf8-encoded and written
   * through io.fileChunkW at path.path; the resulting sink is then extended
   * with the state type S so it accepts (element, state) pairs.
   *
   * NOTE(review): nothing here appends a line separator after each shown
   * element -- confirm whether "lines" relies on Show[T] providing one.
   */
  def showToFilePath[T : Show, S](path: FilePath): Sink[Task, (T, S)] =
    io.fileChunkW(path.path).pipeIn(lift(Show[T].shows) |> utf8Encode).extend[S]

  implicit class FoldOps[T](val fold: Fold[T]) {

  /**
   * Monoid for Folds, where effects are sequenced
   */
  implicit def foldMonoid[T]: Monoid[Fold[T]] = new Monoid[Fold[T]] {
    // Appending combines both folds with `>>`.
    def append(f1: Fold[T], f2: =>Fold[T]): Fold[T] = f1 >> f2
    // The identity element is the Fold with no side-effect or accumulation.
    lazy val zero = Fold.unit[T]
  } // closes the Monoid instance; this brace was missing

  /**
   * create a new Fold sequencing the effects of 2 Folds
   */
  implicit class sequenceFolds[T](val fold1: Fold[T]) {
    def >>(fold2: Fold[T]) = new Fold[T] {
      // The combined state is the pair of both folds' states.
      type S = (fold1.S, fold2.S)

      // fold1 is prepared before fold2.
      def prepare = fold1.prepare >> fold2.prepare

      def sink = fold1.sink.zipWith(fold2.sink) { (f1: ((T, fold1.S)) => Task[Unit], f2: ((T, fold2.S)) => Task[Unit]) =>
        (ts: (T, S)) => {
          // Split the paired state so each sink only receives its own half.
          val (t, (s1, s2)) = ts
          (f1((t, s1)) |@| f2((t, s2)))((_,_))
        }
      }

      // Each fold advances its own component of the paired state.
      def fold = (t : T, s12: (fold1.S, fold2.S)) => (fold1.fold(t, s12._1), fold2.fold(t, s12._2))
      def last(s12: (fold1.S, fold2.S)) = (fold1.last(s12._1) |@| fold2.last(s12._2))((_,_))
      def init = (fold1.init, fold2.init)
    } // closes the anonymous Fold; this brace was missing
  } // closes sequenceFolds; this brace was missing

  /**
   * Run a fold and return the last value
   */
  def runFoldLast[T](process: Process[Task, T], fold: Fold[T]): Task[fold.S] = {
    // Pipe the elements through the fold's Process1 of (element, state) pairs.
    val withState = process |> fold.zipWithState1
    // After `prepare`, drain the pairs into the fold's sink, keep only the
    // state component and use the initial state when there is no last value.
    fold.prepare >> logged(withState).drainW(fold.sink).map(_._2).runLastOr(fold.init)
  }

  /**
   * Run a Fold and let it perform a last action with the accumulated state
   */
  def runFold[T](process: Process[Task, T], fold: Fold[T]): Task[Unit] =
    for {
      finalState <- runFoldLast(process, fold) // last accumulated state
      _          <- fold.last(finalState)      // the fold's closing action
    } yield ()

  /**
   * Run a list of Folds, sequenced with the Fold Monoid
   */
  def runFolds[T](process: Process[Task, T], folds: List[Fold[T]]): Task[Unit] = {
    // Collapse the list into a single Fold via the Monoid[Fold[T]] instance.
    val combined = folds.suml
    runFold(process, combined)
  }


Upvotes: 1

Related Questions