Reputation: 727
I intend to model a trivial game-play (HTTPReq/HTTPResp) using Akka Streams. In a round, the player is challenged by the server to guess a number. The server checks the player's response, and if the number the server holds and the number the player guesses are the same, the player is given a point.
A typical flow is like this:
... so on. Nothing extraordinary.
import akka.{Done, NotUsed}
import akka.stream.scaladsl.{Flow, Keep, Source}
import java.util.Random
import akka.stream.scaladsl.Sink
import scala.concurrent.Future
import scala.util.{Failure, Success}
sealed trait GuessingGameMessageToAndFro
case class StartARound(sessionID: String) extends GuessingGameMessageToAndFro
case class RoundStarted(sessionID: String, roundID: Int) extends GuessingGameMessageToAndFro
case class NumberGuessed(sessionID: String, roundID: Int, guessedNo: Int) extends GuessingGameMessageToAndFro
case class CorrectNumberGuessed(sessionID: String, nextRoundID: Int) extends GuessingGameMessageToAndFro
case class FinalRoundScore(sessionID: String, finalScore: Int) extends GuessingGameMessageToAndFro
case class MissingSession(sessionID: String) extends GuessingGameMessageToAndFro
case class IncorrectNumberGuessed(sessionID: String, clientGuessed: Int, serverChose: Int) extends GuessingGameMessageToAndFro
object SessionService {
  def exists(m: StartARound) = if (m.sessionID.startsWith("1")) m else MissingSession(m.sessionID)
}
object NumberGenerator {
  def numberToOfferToPlayer(m: GuessingGameMessageToAndFro) = {
    m match {
      case StartARound(s) => RoundStarted(s, new Random().nextInt())
      case MissingSession(s) => m
      case _ => throw new RuntimeException("Not yet implemented")
    }
  }
}
val sessionExistenceChecker: Flow[StartARound,GuessingGameMessageToAndFro,NotUsed]
  = Flow.fromFunction(m => SessionService.exists(m))
val guessNumberPreparator: Flow[GuessingGameMessageToAndFro,GuessingGameMessageToAndFro,_]
  = Flow.fromFunction(m => NumberGenerator.numberToOfferToPlayer(m))
val s1 = StartARound("123")
val k =
  Source
    .single(s1)
    .via(sessionExistenceChecker)
    .via(guessNumberPreparator)
    .toMat(Sink.head)(Keep.right)
val finallyObtained = k.run
finallyObtained.onComplete(v => {
  v match {
    case Success(x) => // Prepare proper HTTP Response
    case Failure(ex) => // Prepare proper HTTP Response
  }
})
The reason I am going through a long pattern-matching block in numberToOfferToPlayer() (I have shown 2 cases here, but obviously its size will increase with every type that can flow) is that if an operator like sessionExistenceChecker generates a MissingSession (which is an error condition), it has to travel through the rest of the stream, unchanged, till it reaches the final stage, where the Future completes. In fact, the problem is more general: at any stage, a proper transformation should result in either an acceptable type or an error type (mutually exclusive). If I follow this approach, the pattern-matching blocks will proliferate, at the cost of unnecessary repetition, if not ugliness.
I am feeling uncomfortable with this solution of mine. It is becoming verbose and ungainly.
Needless to say, I have not shown the Akka-HTTP facing part here (including the Routes). The code above can be easily stitched, with the route handlers. So, I have skipped it.
My question is: what is a right idiom for such streams? Conceptually speaking, if everything is fine, the elements should keep moving along the stream. However, whenever an error occurs, the (error) element should shoot off to the final stage, directly, skipping all other stages in between. What is the accepted way to model this?
I have gone through a number of Stack Overflow posts which demonstrate that, for similar situations, one should go the partition/merge way. I understand how I can adopt that approach, but for simple cases like mine, that seems to be unnecessary work. Or am I completely off the mark here?
Any hint, snippet or rap on the knuckles, will be appreciated.
Upvotes: 1
Views: 120
Reputation: 17923
Use a PartialFunction
For this particular use case I would generally agree that a partition & merge setup is "unnecessary work". The other Stack Overflow posts referred to in the question cover the use case where you only have Flow values to combine, without the ability to manipulate the underlying logic within the Flow.
When you are able to modify the underlying logic then a simpler solution exists. But the solution does not strictly lie within akka's domain. Instead, you can utilize functional programming constructs available in scala itself.
If you rewrite the numberToOfferToPlayer function to be a PartialFunction:
object NumberGenerator {
  val numberToOfferToPlayer : PartialFunction[GuessingGameMessageToAndFro, GuessingGameMessageToAndFro] = {
    case s : StartARound => RoundStarted(s.sessionID, new Random().nextInt())
  }
}
Then this PartialFunction can be lifted into a regular function which will apply the logic if the message is of type StartARound or just forward the message if it is any other type.
This lifting is done with the applyOrElse method of PartialFunction in conjunction with the predefined identity function in scala which returns the input as the output (i.e. "forwards" the input):
import NumberGenerator.numberToOfferToPlayer
// Forwards any message the PartialFunction is not defined for.
val messageForwarder : GuessingGameMessageToAndFro => GuessingGameMessageToAndFro =
  identity[GuessingGameMessageToAndFro]
val guessNumberPreparator: Flow[GuessingGameMessageToAndFro,GuessingGameMessageToAndFro,_] =
  Flow.fromFunction(m => numberToOfferToPlayer.applyOrElse(m, messageForwarder))
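To see the forwarding behaviour in isolation, here is a minimal sketch in plain Scala with no Akka dependency. The `Msg` hierarchy and the `ForwardingSketch` object are simplified, hypothetical stand-ins for the question's types, and the round ID is fixed rather than random so the outcome is predictable.

```scala
// Simplified stand-ins for the question's message types (hypothetical names).
sealed trait Msg
final case class Start(sessionID: String) extends Msg
final case class Started(sessionID: String, roundID: Int) extends Msg
final case class Missing(sessionID: String) extends Msg

object ForwardingSketch {
  // Defined only for Start; every other message is outside its domain.
  val startRound: PartialFunction[Msg, Msg] = {
    case Start(id) => Started(id, 1) // fixed round ID, for illustration only
  }

  // The fallback simply returns its input, i.e. it "forwards" the message.
  val forward: Msg => Msg = identity

  // applyOrElse runs the partial function where it is defined
  // and calls the fallback everywhere else.
  val lifted: Msg => Msg = m => startRound.applyOrElse(m, forward)

  def main(args: Array[String]): Unit = {
    println(lifted(Start("123")))   // the Start message is transformed
    println(lifted(Missing("999"))) // the error message passes through unchanged
  }
}
```

Because `lifted` is an ordinary function, it can be handed to `Flow.fromFunction` as is, or tested without any stream machinery.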
Higher Level Of Abstraction
If you have several of these PartialFunctions which you would like to add forwarding logic to:
val foo : PartialFunction[GuessingGameMessageToAndFro, GuessingGameMessageToAndFro] = {
  case r : RoundStarted => ???
}
val bar : PartialFunction[GuessingGameMessageToAndFro, GuessingGameMessageToAndFro] = {
  case n : NumberGuessed => ???
}
Then you can write a general lifter that will abstract away the regular function creation:
val applyOrForward : PartialFunction[GuessingGameMessageToAndFro, GuessingGameMessageToAndFro] => GuessingGameMessageToAndFro => GuessingGameMessageToAndFro =
  partial => message => partial.applyOrElse(message, messageForwarder)
This lifter will clean up your code nicely:
val offerFlow = Flow fromFunction applyOrForward(numberToOfferToPlayer)
val fooFlow = Flow fromFunction applyOrForward(foo)
val barFlow = Flow fromFunction applyOrForward(bar)
These Flows can then be combined in the manner that the question describes:
val combinedFlow = offerFlow via fooFlow via barFlow
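As a rough end-to-end illustration, the sketch below runs a success message and an error message through two chained Flows. It assumes Akka 2.6 or later on the classpath (where an implicit ActorSystem is enough to materialize a stream). The `Msg` types, the stage logic, and the `CombinedFlowSketch` object are hypothetical simplifications, not the question's actual code.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Simplified stand-ins for the question's message types (hypothetical names).
sealed trait Msg
final case class Start(sessionID: String) extends Msg
final case class Started(sessionID: String, roundID: Int) extends Msg
final case class Offered(sessionID: String, roundID: Int) extends Msg
final case class Missing(sessionID: String) extends Msg

object CombinedFlowSketch {
  // Apply the partial function where it is defined, otherwise forward the input.
  def applyOrForward(pf: PartialFunction[Msg, Msg]): Msg => Msg =
    m => pf.applyOrElse(m, (other: Msg) => other)

  val startFlow = Flow.fromFunction(applyOrForward { case Start(id) => Started(id, 1) })
  val offerFlow = Flow.fromFunction(applyOrForward { case Started(id, r) => Offered(id, r) })
  val combinedFlow = startFlow.via(offerFlow)

  // Runs the inputs through the combined flow and collects the results.
  def run(inputs: List[Msg]): Seq[Msg] = {
    implicit val system: ActorSystem = ActorSystem("combined-flow-sketch")
    try Await.result(Source(inputs).via(combinedFlow).runWith(Sink.seq), 5.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    // Start is transformed by both stages; Missing skips both untouched.
    run(List(Start("123"), Missing("999"))).foreach(println)
}
```

Blocking with `Await` is only for the demonstration; inside a route handler you would normally work with the resulting Future directly.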
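Alternatively, you could combine the PartialFunctions first and then create a single Flow from the combination. Note that orElse applies only the first matching PartialFunction to each element, whereas chained Flows pass every stage's output on to the next stage, so the two approaches agree for messages that at most one stage handles (such as forwarded errors). Combining first would be useful for unit testing outside of akka: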
val combinedPartial = numberToOfferToPlayer orElse foo orElse bar
//no akka test kit necessary
assert {
  val testError = MissingSession("testId")
  applyOrForward(combinedPartial)(testError) == testError
}
//nothing much to test
val otherCombinedFlow = Flow fromFunction applyOrForward(combinedPartial)
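One thing to keep in mind when choosing between chaining Flows and combining PartialFunctions: `orElse` picks the first partial function that matches and stops there, while chained stages feed each output into the next stage. The sketch below shows the difference in plain Scala, using `andThen` on ordinary functions as a stand-in for `via`. All type and value names are hypothetical.

```scala
// Simplified stand-ins for the question's message types (hypothetical names).
sealed trait Msg
final case class Start(sessionID: String) extends Msg
final case class Started(sessionID: String, roundID: Int) extends Msg
final case class Offered(sessionID: String, roundID: Int) extends Msg
final case class Missing(sessionID: String) extends Msg

object ChainVersusOrElse {
  type Stage = PartialFunction[Msg, Msg]

  val startRound: Stage = { case Start(id) => Started(id, 1) }
  val offerNumber: Stage = { case Started(id, round) => Offered(id, round) }

  // Apply the stage where it is defined, otherwise forward the input.
  def applyOrForward(pf: Stage): Msg => Msg =
    m => pf.applyOrElse(m, (other: Msg) => other)

  // Stage by stage: the output of startRound is fed into offerNumber.
  val chained: Msg => Msg =
    applyOrForward(startRound).andThen(applyOrForward(offerNumber))

  // Combined first: only the first matching stage runs for each input.
  val combined: Msg => Msg = applyOrForward(startRound orElse offerNumber)

  def main(args: Array[String]): Unit = {
    println(chained(Start("123")))    // went through both stages
    println(combined(Start("123")))   // went through startRound only
    println(chained(Missing("999")))  // forwarded unchanged
    println(combined(Missing("999"))) // forwarded unchanged
  }
}
```

Both variants forward an error such as `Missing` untouched, so for the error-bypass behaviour the question asks about they are interchangeable. They diverge only when one stage emits a message that a later stage is defined for.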
Upvotes: 1