Reputation: 3116
I've been working with an Akka app for a while. 95% of the code is written with pure actors. Now I'm going to move some parts of the app to Akka Streams. Give me an idea of how the following logic could look in terms of Akka Streams:
+------------+
| CreateUser |
+------------+
      |
      |
+------------+     +-------------------+
| CheckEmail |-----|EmailIsAlreadyInUse|
+------------+     +-------------------+
      |
      |
+------------+     +-------------------+
|3rdPartyCall|-----|NoUserInInternalDB |
+------------+     +-------------------+
      |
      |
+------------+     +-------------------+
|  SaveUser  |-----|    UserDBError    |
+------------+     +-------------------+
      |
      |
+------------+
| UserSaved  |
+------------+
In the current implementation, all of the blocks are messages which I send to an appropriate actor. If the message flow completes successfully, I send a UserSaved message back to the sender. Otherwise I send back one of the validation messages: EmailIsAlreadyInUse, NoUserInInternalDB, or UserDBError.
Here is the set of messages:
case class CreateUser(email: String)
case class CheckEmailUniqueness(email: String)
case class ExternalServiceValidation(email: String)
case class SaveUser(email: String)
sealed trait CreateUserResult
sealed trait CreateUserError
case class UserCreated(email: String) extends CreateUserResult
case class EmailIsAlreadyInUse(email: String) extends CreateUserResult with CreateUserError
case class NoUserInExternalDB(email: String) extends CreateUserResult with CreateUserError
case class UserDBError(email: String) extends CreateUserResult with CreateUserError
How can I migrate this logic to Akka Streams?
Upvotes: 0
Views: 151
Reputation: 17923
Message Structure
Because data in an Akka Streams graph flows in one direction, from source to sink, there is no "send back to the sender" functionality. Your only choice is to keep forwarding the message to the next step.
Therefore, I think you just need to add some extra structure around your messages. The Either construct seems useful for this. Let's suppose your CreateUser actor has a self-contained function:
def createUserFunction(createUser : CreateUser) : UserCreated = ???
This could then be followed by a function for the CheckEmail step:
val existingEmails : Set[String] = ???
def checkEmailUniqueness(userCreated : UserCreated) : Either[CreateUserError, UserCreated] =
  if(existingEmails contains userCreated.email)
    Left(EmailIsAlreadyInUse(userCreated.email))
  else
    Right(userCreated)
Similarly, 3rdPartyCall would also return an Either:
def thirdPartyLibraryFunction(userCreated : UserCreated) : Boolean = ???
def thirdPartyCall(userCreated : UserCreated) : Either[CreateUserError, UserCreated] =
  if(!thirdPartyLibraryFunction(userCreated))
    Left(NoUserInExternalDB(userCreated.email))
  else
    Right(userCreated)
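As a rough illustration of why Either fits here, the sketch below chains the two checks with flatMap outside of any stream. The sample contents of existingEmails and the externalUsers set (a stand-in for thirdPartyLibraryFunction) are made up for the example, and it assumes Scala 2.12 or later, where Either is right-biased.

```scala
// Sketch only: stub data stands in for the real email store and third-party service.
object EitherPipeline {
  final case class UserCreated(email: String)

  sealed trait CreateUserError
  final case class EmailIsAlreadyInUse(email: String) extends CreateUserError
  final case class NoUserInExternalDB(email: String) extends CreateUserError

  // Hypothetical sample data, not part of the original question.
  val existingEmails: Set[String] = Set("taken@example.com")
  val externalUsers: Set[String] = Set("known@example.com", "taken@example.com")

  def checkEmailUniqueness(u: UserCreated): Either[CreateUserError, UserCreated] =
    if (existingEmails.contains(u.email)) Left(EmailIsAlreadyInUse(u.email))
    else Right(u)

  def thirdPartyCall(u: UserCreated): Either[CreateUserError, UserCreated] =
    if (!externalUsers.contains(u.email)) Left(NoUserInExternalDB(u.email))
    else Right(u)

  // flatMap runs the second check only when the first one returned a Right;
  // a Left from the first check is passed through untouched.
  def validate(u: UserCreated): Either[CreateUserError, UserCreated] =
    checkEmailUniqueness(u).flatMap(thirdPartyCall)

  def main(args: Array[String]): Unit = {
    println(validate(UserCreated("known@example.com")))  // Right(UserCreated(known@example.com))
    println(validate(UserCreated("taken@example.com")))  // Left(EmailIsAlreadyInUse(taken@example.com))
    println(validate(UserCreated("nobody@example.com"))) // Left(NoUserInExternalDB(nobody@example.com))
  }
}
```

The first failing step decides the error that reaches the end of the chain, which mirrors the branches in the diagram from the question.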
Akka Stream Construction
With this structured messaging you can now create a stream that only moves in a single direction. We first make a Flow that does the user creation:
val createUserFlow : Flow[CreateUser, UserCreated, _] =
Flow[CreateUser] map (createUserFunction)
Then an email check Flow:
val emailFlow : Flow[UserCreated, Either[CreateUserError, UserCreated],_] =
Flow[UserCreated] map (checkEmailUniqueness)
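Now make a Flow that does the third-party call. Its input is the Either produced by the email check, so flatMap skips the call when an earlier step has already failed:
val thirdPartyFlow : Flow[Either[CreateUserError, UserCreated], Either[CreateUserError, UserCreated], _] =
  Flow[Either[CreateUserError, UserCreated]] map (_ flatMap thirdPartyCall)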
These Flows can now form the foundation of a stream along with a Source and a Sink:
val userSource : Source[CreateUser, _] = ???
val userSink : Sink[Either[CreateUserError, UserCreated], _] =
  Sink.foreach[Either[CreateUserError, UserCreated]] {
    case Left(error) =>
      // CreateUserError declares no email member, so print the whole error
      System.err.println("Error with user creation: " + error)
    case Right(userCreated) =>
      System.out.println("User Created: " + userCreated.email)
  }
//create the full stream (run() needs an implicit Materializer or ActorSystem in scope)
userSource
  .via(createUserFlow)
  .via(emailFlow)
  .via(thirdPartyFlow)
  .to(userSink)
  .run()
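Since a stream cannot reply to a sender, one possible way to hand a result back to a caller is to run the stream for a single request and materialize a Future. The following is a minimal sketch, assuming Akka 2.6 or later (where an implicit ActorSystem supplies the materializer). The createUser helper, the pipeline name, and the sample email set are hypothetical, and only the email check is included to keep it short.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object CreateUserStream {
  final case class CreateUser(email: String)
  final case class UserCreated(email: String)
  sealed trait CreateUserError
  final case class EmailIsAlreadyInUse(email: String) extends CreateUserError

  type Result = Either[CreateUserError, UserCreated]

  // Hypothetical sample data standing in for the real email store.
  val existingEmails: Set[String] = Set("taken@example.com")

  def checkEmail(u: UserCreated): Result =
    if (existingEmails.contains(u.email)) Left(EmailIsAlreadyInUse(u.email))
    else Right(u)

  // The reusable part: a Flow from request to Either result.
  val pipeline: Flow[CreateUser, Result, NotUsed] =
    Flow[CreateUser].map(c => UserCreated(c.email)).map(checkEmail)

  // Runs the pipeline for one request; Sink.head materializes a Future
  // that completes with the single element emitted by the flow.
  def createUser(request: CreateUser)(implicit system: ActorSystem): Future[Result] =
    Source.single(request).via(pipeline).runWith(Sink.head)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("create-user")
    try {
      println(Await.result(createUser(CreateUser("new@example.com")), 5.seconds))
      println(Await.result(createUser(CreateUser("taken@example.com")), 5.seconds))
    } finally {
      system.terminate()
    }
  }
}
```

Materializing a stream per request has overhead, so this shape suits occasional calls better than high-throughput paths; blocking with Await is only for the demonstration.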
Upvotes: 3