Reputation: 1314
What I want to achieve is serializable and composable process descriptors. Basically, I am going to create some primitive `Processor`s
(serializable), and then I would like to be able to compose them into higher-order `Processor`s,
with the whole thing remaining automatically serializable. Here is my current implementation; however, I suspect that there is a more elegant way to do this with some cats type classes/data structures. I feel dumb that I can't think of a way to take advantage of powerful tools such as Free, Kleisli or State. My challenge was that the type of my state, i.e. the `data` field in `DataWithContext`,
keeps changing.
But there must be a way to overcome that, mustn't there?
/**
 * Composable, serializable process descriptors.
 *
 * A [[Test.Processor]] turns a piece of data (plus a fixed context) into an
 * asynchronous result that either fails with a [[Test.Cause]] or succeeds.
 * Processors are combined with `>>` (sequencing) and `zip` (same input, paired
 * outputs); the combinators are case classes, so a composed processor is
 * serializable whenever its parts are.
 */
object Test {
  import cats.implicits._
  import cats.data.XorT
  import scala.concurrent.Future
  // cats only provides Functor/Monad instances for Future when an implicit
  // ExecutionContext is in scope; `flatMap` and `XorT.pure` below need them.
  // The context is imported here instead of being stored in the processors,
  // so Con/Zip/Count keep no ExecutionContext field.
  import scala.concurrent.ExecutionContext.Implicits.global

  /** Description of why a processing step failed. */
  type Cause = String

  /**
   * A value travelling through the pipeline together with its context.
   *
   * @param data    the payload; its type changes from step to step
   * @param context the context, carried along unchanged by the combinators
   */
  case class DataWithContext[+A](data: A, context: List[String]) // context never needs to change

  /**
   * A serializable processing step from `A` to `B`.
   *
   * @tparam A type of the input payload (contravariant)
   * @tparam B type of the successful result
   */
  trait Processor[-A, B] extends Serializable {

    /** The function run by this step: fails with a `Cause` or yields a `B`. */
    def process: DataWithContext[A] => XorT[Future, Cause, B]
  }

  /** Composition syntax for processors. */
  implicit class ProcessorOps[A, B](self: Processor[A, B]) {

    /** Runs `self`, then feeds its result (with the same context) to `that`. */
    def >>[C](that: Processor[B, C]): Con[A, C, B] = Con(self, that)

    /** Runs `self` and `that` on the same input and pairs their results. */
    def zip[C](that: Processor[A, C]): Zip[A, B, C] = Zip(self, that)
  }

  /**
   * Concatenation of two processors: the output of `a` becomes the input
   * payload of `b`, while the context of the original input is kept.
   */
  case class Con[A, B, C](a: Processor[A, C], b: Processor[C, B]) extends Processor[A, B] {
    def process: DataWithContext[A] => XorT[Future, Cause, B] =
      (pc: DataWithContext[A]) =>
        a.process(pc).flatMap { c =>
          // Swap the payload only; the context is passed through untouched.
          b.process(pc.copy(data = c))
        }
  }

  /**
   * Zip of two processors: both receive the same input and the results are
   * returned as a tuple. `p2` is only invoked once `p1` has succeeded.
   */
  case class Zip[A, B, C](p1: Processor[A, B], p2: Processor[A, C])
      extends Processor[A, (B, C)] {
    def process: DataWithContext[A] => XorT[Future, Cause, (B, C)] =
      (pc: DataWithContext[A]) =>
        for {
          b <- p1.process(pc)
          c <- p2.process(pc)
        } yield (b, c)
  }

  /** An example of a primitive processor: the length of the input string. */
  case object Count extends Processor[String, Int] {
    def process: DataWithContext[String] => XorT[Future, Cause, Int] =
      (dc: DataWithContext[String]) =>
        XorT.pure[Future, Cause, Int](dc.data.length)
  }
}
Upvotes: 2
Views: 814
Reputation: 9820
I have:
- made `Processor.process` a `Kleisli`
- moved the `.zip` and `>>` methods to the `Processor` trait itself.

Which results in:
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import cats.implicits._
import cats.data.{XorT, Kleisli}
import cats.Apply
// Failure description; used as the left (error) side of Result.
type Cause = String
// Outcome of a step: a Future that holds either a Cause or an A.
type Result[A] = XorT[Future, Cause, A]
// A step as a Kleisli arrow: takes a DataWithContext[A], produces a B inside Result.
type Process[A, B] = Kleisli[Result, DataWithContext[A], B]
/**
 * A payload paired with the context it is processed in.
 *
 * @param data    the value being processed
 * @param context the list of context strings accompanying the value
 * @tparam A type of the payload (covariant)
 */
case class DataWithContext[+A](
  data: A,
  context: List[String]
)
/** Syntax for pairing any value with a context. */
implicit class ContextOps[A](a: A) {

  /**
   * Wraps the receiver in a `DataWithContext`.
   *
   * @param ctx the context to attach
   * @return the receiver as `data`, together with `ctx` as `context`
   */
  def withContext(ctx: List[String]): DataWithContext[A] = {
    DataWithContext(data = a, context = ctx)
  }
}
/**
 * A serializable processing step from `A` to `B`, backed by a `Process`
 * (a `Kleisli` arrow over `Result`).
 *
 * @tparam A type of the input payload
 * @tparam B type of the successful result
 */
trait Processor[A, B] extends Serializable { self =>

  /** The arrow executed by this processor. */
  val process: Process[A, B]

  /**
   * Sequential composition: runs this processor, re-attaches the context of
   * the original input to its result, and hands that to `that`.
   */
  def andThen[C](that: Processor[B, C]): Processor[A, C] =
    Processor.instance(Kleisli { input =>
      // `that` expects a DataWithContext, so wrap the intermediate result
      // with the context of the incoming value before chaining.
      val firstStep = self.process.map(_.withContext(input.context))
      firstStep.andThen(that.process).run(input)
    })

  /** Alias for [[andThen]]. */
  def >>[C](that: Processor[B, C]): Processor[A, C] = self.andThen(that)

  /**
   * Runs this processor and `that` on the same input and pairs the results.
   */
  def zip[C](that: Processor[A, C]): Processor[A, (B, C)] =
    Processor.instance(Kleisli { input =>
      val left = self.process.run(input)
      val right = that.process.run(input)
      Apply[Result].tuple2(left, right)
    })
}
/** Factory for `Processor` values. */
object Processor {

  /**
   * Creates a `Processor` from a `Process`.
   *
   * @param p the arrow the new processor should run
   * @return a processor whose `process` is `p`
   */
  def instance[A, B](p: Process[A, B]): Processor[A, B] =
    new Processor[A, B] {
      val process: Process[A, B] = p
    }
}
Which can be used as:
/** Primitive processor that yields the length of the input string. */
object Count extends Processor[String, Int] {
  val process: Process[String, Int] =
    Kleisli[Result, DataWithContext[String], Int] { input =>
      val length = input.data.length
      XorT.pure[Future, Cause, Int](length)
    }
}
/** Processor that doubles its integer input. */
val times2: Processor[Int, Int] =
  Processor.instance(
    Kleisli[Result, DataWithContext[Int], Int] { input =>
      val doubled = input.data * 2
      XorT.pure[Future, Cause, Int](doubled)
    }
  )
// Run Count twice on the same input and print the paired results.
Count.zip(Count).process.run("hello".withContext(List("Context"))).map(println)
// (5,5)
// Count the characters, then double the count, and print the result.
(Count >> times2).process.run("hello".withContext(List("Context"))).map(println)
// 10
Upvotes: 3