Nirmalya
Nirmalya

Reputation: 727

Preparing proper HTTP Response from an Akka-stream that can produce error situations

I intend to model a trivial game-play (HTTPReq/HTTPResp) using Akka Streams. In a round, the player is challenged to guess a number by the server. Server checks the player's response and if the what server holds and what the player guesses are the same, then the player is given a point.

A typical flow is like this:

... so on. Nothing extraordinary.

This is a rough arrangement of types and the flows:

import akka.{Done, NotUsed}
import akka.stream.scaladsl.{Flow, Keep, Source}
import java.util.Random

import akka.stream.scaladsl.Sink

import scala.concurrent.Future
import scala.util.{Failure, Success}

sealed trait GuessingGameMessageToAndFro

case class StartARound(sessionID: String) extends GuessingGameMessageToAndFro

case class RoundStarted(sessionID: String, roundID: Int) extends GuessingGameMessageToAndFro
case class NumberGuessed(sessionID: String, roundID: Int, guessedNo: Int) extends GuessingGameMessageToAndFro
case class CorrectNumberGuessed(sessionID: String, nextRoundID: Int) extends GuessingGameMessageToAndFro
case class FinalRoundScore(sessionID: String, finalScore: Int) extends GuessingGameMessageToAndFro

case class MissingSession(sessionID: String) extends GuessingGameMessageToAndFro
case class IncorrectNumberGuessed(sessionID: String, clientGuessed: Int, serverChose: Int) extends GuessingGameMessageToAndFro

object SessionService {
  def exists(m: StartARound) = if (m.sessionID.startsWith("1")) m else MissingSession(m.sessionID)
}

object NumberGenerator {
  def numberToOfferToPlayer(m: GuessingGameMessageToAndFro) = {
    m match {

      case StartARound(s)     =>  RoundStarted(s, new Random().nextInt())
      case MissingSession(s)  =>  m
      case _                  =>  throw new RuntimeException("Not yet implemented")
    }
  }
}

val sessionExistenceChecker: Flow[StartARound,GuessingGameMessageToAndFro,NotUsed]
      = Flow.fromFunction(m => SessionService.exists(m))

val guessNumberPreparator: Flow[GuessingGameMessageToAndFro,GuessingGameMessageToAndFro,_]
      = Flow.fromFunction(m => NumberGenerator.numberToOfferToPlayer(m))

val s1 = StartARound("123")

val k =
  Source
    .single(s1)
    .via(sessionExistenceChecker)
    .via(guessNumberPreparator)
    .toMat(Sink.head)(Keep.right)

val finallyObtained = k.run

finallyObtained.onComplete(v => {
  v match {
    case Success(x)    => //   Prepare proper HTTP Response
    case Failure(ex)   => //   Prepare proper HTTP Response
  }
})

The reason I am going through a long pattern matching block in numberToOfferToPlayer() (I have shown 2 here, but obviously its size will increase with every type that can flow) is because if the operator like sessionExistenceChecker generates a MissingSession (which is an error condition), it has to travel through the rest of stream, unchanged till it reaches the Future[Done] stage. In fact, the problem is more general: at any stage, a proper transformation should result into an acceptable type or an error type (mutually exclusive). If I follow this approach, the pattern-matching blocks will proliferate, at the cost of unnecessary repetition, if not ugliness perhaps.

I am feeling uncomfortable with this solution of mine. It is becoming verbose and ungainly.

Needless to say, I have not shown the Akka-HTTP facing part here (including the Routes). The code above can be easily stitched, with the route handlers. So, I have skipped it.

My question is: what is a right idiom for such streams? Conceptually speaking, if everything is fine, the elements should keep moving along the stream. However, whenever an error occurs, the (error) element should shoot off to the final stage, directly, skipping all other stages in between. What is the accepted way to model this?

I have gone through a number of Stackoverflow posts, which demonstrate that for similar situations, one should go the partition/merge way. I understand how I can adopt that approach, but for simple cases like mine, that seems to be unnecessary work. Or, am I completely off the mark here?

Any hint, snippet or rap on the knuckles, will be appreciated.

Upvotes: 1

Views: 120

Answers (1)

Use a PartialFunction

For this particular use case I would generally agree that a partition & merge setup is "unnecessary work". The other stack posts, referred to in the question, are for the use case where you only have Flow values to combine without the ability to manipulate the underlying logic within the Flow.

When you are able to modify the underlying logic then a simpler solution exists. But the solution does not strictly lie within akka's domain. Instead, you can utilize functional programming constructs available in scala itself.

If you rewrite the numberTofferToPlayer function to be a PartialFunction:

object NumberGenerator {
  val numberToOfferToPlayer : PartialFunction[GuessingGameMessageToAndFro, GuessingGameMessageToAndFro] = {
    case s : StartARound  =>  RoundStarted(s.sessionID, new Random().nextInt())
  }
}

Then this PartialFunction can be lifted into a regular function which will apply the logic if the message is of type StartARound or just forward the message if it is any other type.

This lifting is done with the applyOrElse method of PartialFunction in conjunction with the predefined identity function in scala which returns the input as the output (i.e. "forwards" the input):

import NumberGenerator.numberToOfferToPlayer

val messageForwarder : GuessingGameMessageToAndFro => GuessingGameMessageToAndFro = 
  identity[GuessingGameMessageToAndFro]

val guessNumberPreparator: Flow[GuessingGameMessageToAndFro,GuessingGameMessageToAndFro,_] =
  Flow fromFunction (numberToOfferToPlayer applyOrElse (_, messageForwarder))

Higher Level Of Abstraction

If you have several of these PartialFunctions which you would like to add forwarding logic to:

val foo : PartialFunction[GuessingGameMessageToAndFro, GuessingGameMessageToAndFro] = {
  case r : RoundStarted => ???
}

val bar : PartialFunction[GuessingGameMessageToAndFro, GuessingGameMessageToAndFro] = {
  case n : NumberGuessed => ???
}

Then you can write a general lifter that will abstract away the regular function creation:

val applyOrForward : PartialFunction[GuessingGameMessageToAndFro, GuessingGameMessageToAndFro] => GuessingGameMessageToAndFro => GuessingGameMessageToAndFro =
  ((_ : PartialFunction[Int, Int]) applyOrElse ((_ : GuessingGameMessageToAndFro), messageForwader).curried

This lifter will clean up your code nicely:

val offerFlow = Flow fromFunction applyOrForward(numberToOfferToPlayer)

val fooFlow = Flow fromFunction applyOrForward(foo)

val barFlow = Flow fromFunction applyOrForward(bar)

These Flows can then be combined in the manner that the question describes:

val combinedFlow = offerFlow via fooFlow via barFlow

Similarly, you could get the same result by combining the PartialFunctions first and then creating a single Flow from the combination. This would be useful for unit testing outside of akka:

val combinedPartial = numberToOfferToPlayer orElse foo orElse bar

//no akka test kit necessary
assert {
  val testError = MissingSession("testId")

  applyOrForward(combinedPartial)(testError) equals testError
}     

//nothing much to test
val otherCombinedFlow = Flow fromFunction applyOrForward(combinedPartial)

Upvotes: 1

Related Questions