

Is there a smarter way to do this with cats?

What I want to achieve is serializable and composable process descriptors. Basically, I am going to create some primitive Processors (serializable), and then I would like to compose them into higher-order Processors such that the whole thing remains automatically serializable. Here is my current implementation; however, I suspect there is a more elegant way to do this with some cats type classes or data structures. I feel dumb that I can't think of a way to take advantage of powerful tools such as Free, Kleisli or State. My challenge is that the type of my state, i.e. the data field in DataWithContext, keeps changing.

But surely there is a way to overcome that?

object Test {
  import cats.implicits._
  import cats.data.XorT
  import scala.concurrent.Future
  // the cats instances for Future need an implicit ExecutionContext
  import scala.concurrent.ExecutionContext.Implicits.global

  type Cause = String

  case class DataWithContext[+A](data: A, context: List[String]) // context never needs to change

  trait Processor[-A, B] extends Serializable {
    def process: DataWithContext[A] ⇒ XorT[Future, Cause, B]
  }

  implicit class ProcessorOps[A, B](self: Processor[A, B]) {
    def >>[C](that: Processor[B, C]) = Con(self, that)
    def zip[C](that: Processor[A, C]) = Zip(self, that)
  }

  // concatenate two processors
  case class Con[A, B, C](a: Processor[A, C], b: Processor[C, B]) extends Processor[A, B] {
    def process: DataWithContext[A] ⇒ XorT[Future, Cause, B] = (pc: DataWithContext[A]) ⇒
      a.process(pc).flatMap { c ⇒
        b.process(pc.copy(data = c))
      }
  }

  // zip two processors
  case class Zip[A, B, C](p1: Processor[A, B], p2: Processor[A, C])
    extends Processor[A, (B, C)] {
    def process: DataWithContext[A] ⇒ XorT[Future, Cause, (B, C)] =
      (pc: DataWithContext[A]) ⇒
        for {
          b ← p1.process(pc)
          c ← p2.process(pc)
        } yield (b, c)
  }

  // an example of a primitive Processor
  case object Count extends Processor[String, Int] {
    def process: DataWithContext[String] ⇒ XorT[Future, Cause, Int] =
      (dc: DataWithContext[String]) ⇒
        XorT.pure[Future, Cause, Int](dc.data.length)
  }
}


Upvotes: 2

Views: 815

Answers (1)

Peter Neyens


I have :

  • made Processor.process a Kleisli.
  • moved the zip and >> methods to the Processor trait itself.
  • introduced some type aliases.

Which results in :

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import cats.implicits._
import cats.data.{XorT, Kleisli}
import cats.Apply

type Cause = String
type Result[A] = XorT[Future, Cause, A]
type Process[A, B] = Kleisli[Result, DataWithContext[A], B]

case class DataWithContext[+A](data: A, context: List[String]) 

implicit class ContextOps[A](a: A) {
  def withContext(ctx: List[String]) = DataWithContext(a, ctx)
}

trait Processor[A, B] extends Serializable { self => 
  val process: Process[A, B]

  // run this processor, re-attach the original context, then run `that`
  def andThen[C](that: Processor[B, C]): Processor[A, C] = 
    Processor.instance(Kleisli { dc => 
      (process.map(_.withContext(dc.context)) andThen that.process).run(dc)
    })

  // alias for andThen
  def >>[C](that: Processor[B, C]) = this andThen that

  // run both processors on the same input and pair up the results
  def zip[C](that: Processor[A, C]): Processor[A, (B, C)] = 
    Processor.instance(Kleisli { dc => 
      Apply[Result].tuple2(self.process.run(dc), that.process.run(dc))
    })
}

object Processor {
  // create a Processor from a Process 
  def instance[A, B](p: Process[A, B]) = new Processor[A, B] {
    val process = p
  }
}

Which can be used as :

object Count extends Processor[String, Int] {
  val process: Process[String, Int] =
    Kleisli[Result, DataWithContext[String], Int] {
      dc => XorT.pure[Future, Cause, Int](dc.data.length)
    }
}

val times2: Processor[Int, Int] = Processor.instance(
  Kleisli[Result, DataWithContext[Int], Int] ( 
    dc => XorT.pure[Future, Cause, Int](dc.data * 2)))

(Count zip Count).process.run("hello".withContext(List("Context"))) map println
// (5,5)
(Count >> times2).process.run("hello".withContext(List("Context"))) map println
// 10
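
Since the question asks that composed processors stay serializable, it may be worth checking that with a round trip through Java serialization. The following is only a minimal sketch using the standard library: Proc is a simplified, synchronous stand-in for Processor (plain Either instead of XorT[Future, Cause, ?], no context), and the names SerializationCheck, Proc, Times2 and roundTrip are hypothetical, not part of cats or of the code above. Whether the Kleisli-based version serializes depends on the Scala and cats versions in use and on what each closure captures, which this sketch does not verify.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object SerializationCheck {
  type Cause = String

  // Simplified stand-in for Processor (assumption: synchronous, no context)
  trait Proc[A, B] extends Serializable { self =>
    def run(a: A): Either[Cause, B]

    // The composed processor only holds references to `self` and `that`,
    // so it is serializable as long as both of them are.
    def andThen[C](that: Proc[B, C]): Proc[A, C] = new Proc[A, C] {
      def run(a: A): Either[Cause, C] = self.run(a) match {
        case Right(b)    => that.run(b)
        case Left(cause) => Left(cause)
      }
    }
  }

  case object Count extends Proc[String, Int] {
    def run(s: String): Either[Cause, Int] = Right(s.length)
  }

  case object Times2 extends Proc[Int, Int] {
    def run(i: Int): Either[Cause, Int] = Right(i * 2)
  }

  // Write a value to a byte array and read it back again.
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

With these definitions, SerializationCheck.roundTrip(Count andThen Times2).run("hello") should give Right(10), the same result as the processor that was never serialized. A java.io.NotSerializableException at this point would name the captured value that breaks the chain.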

Upvotes: 3
