Alex Fruzenshtein
Alex Fruzenshtein

Reputation: 3116

How to migrate Actor-based logic to Akka Streams?

I've been working with an Akka app for a while. 95% of the code is written with pure actors. Now I'm going to move some parts of the app to Akka Streams. Give me an idea of how the following logic could look in terms of Akka Streams:

+------------+                                             
| CreateUser |                                             
+------------+                                             
      |                                                    
      |                                                    
+------------+     +-------------------+                   
| CheckEmail |-----|EmailIsAlreadyInUse|                   
+------------+     +-------------------+                   
      |                                                    
      |                                                    
+------------+     +-------------------+                   
|3rdPartyCall|-----|NoUserInInternalDB |                   
+------------+     +-------------------+                   
      |                                                    
      |                                                    
+------------+     +-------------------+                   
|  SaveUser  |-----|    UserDBError    |                   
+------------+     +-------------------+                   
      |                                                    
      |                                                    
+------------+                                             
| UserSaved  |                                             
+------------+   

In the current implementation, all of the blocks are messages which I send to an appropriate actor. If the message flow goes successfully, I send back to the sender a UserSaved message. Otherwise I send back one of the validation messages to the sender: EmailIsAlreadyInUse or NoUserInInternalDB or UserDBError.

Here is the set of messages:

case class CreateUser(email: String)
case class CheckEmailUniqueness(email: String)
case class ExternalServiceValidation(email: String)
case class SaveUser(email: String)

sealed trait CreateUserResult
sealed trait CreateUserError
case class UserCreated(email: String) extends CreateUserResult
case class EmailIsAlreadyInUse(email: String) extends CreateUserResult with CreateUserError
case class NoUserInExternalDB(email: String) extends CreateUserResult with CreateUserError
case class UserDBError(email: String) extends CreateUserResult with CreateUserError

How can I migrate this logic to Akka Streams?

Upvotes: 0

Views: 151

Answers (1)

Message Structure

Because akka-stream data is messaged in one direction, from source to sink, there is no "send back to the sender" functionality. Your only choice is to continuously forward the message to the next step.

Therefore, I think you just need to add some extra structure around your message. The Either construct seems useful for this. Lets suppose your CreateUser Actor has a self-contained function:

def createUserFunction(createUser : CreateUser) : UserCreated = ???

This could then be followed by a function to CheckEmail:

val Set[String] existingEmails = ???

def checkEmailUniqueness(userCreated : UserCreated) : Either[CreateUserError, UserCreated] =
  if(existingEmails contains userCreated.email)
    Left(EmailIsAlreadyInUse(userCreated.email))
  else
    Right(createUser)

Similarly, 3rdPartyCall woul also return an Either:

 def thirdPartyLibraryFunction(userCreated : UserCreated) : Boolean = ???

 def thirdPartyCall(userCreated : UserCreated) : Either[CreateUserError, UserCreated] = 
   if(!thirdPartyLibraryFunction(userCreated))
     Left(NoUserInExternalDB(userCreated.email))
   else
     Right(userCreated)

Akka Stream Construction

With this structured messaging you can now create a stream that only moves in a single direction. We first make a Flow that does the user creation:

 val createUserFlow : Flow[CreateUser, UserCreated, _] = 
   Flow[CreateUser] map (createUserFunction)

Then an email check Flow:

 val emailFlow : Flow[UserCreated, Either[CreateUserError, UserCreated],_] = 
   Flow[UserCreated] map (checkEmailUniqueness)

And now make Flow that does the third party call:

 val thirdPartyFlow : Flow[UserCreated, Either[CreateUserError, UserCreated],_] = 
   Flow[UserCreated] map (_ flatMap thirdPartyCall)

These Flows can now form the foundation of a stream along with a Source and a Sink:

 val userSource : Source[CreateUser, _] = ???

 val userSink : Sink[Either[CreateUserError, UserCreated], _] = 
   Sink[Either[CreateUserError, UserCreated]] foreach {
     case Left(error) =>
       System.err.println("Error with user creation : " error.email)
     case Right(userCreated) =>
       System.out.println("User Created: " userCreated.email)
   }

 //create the full stream
 userSource
   .via(createUserFlow)
   .via(emailFlow)
   .via(thirdPartyFlow)
   .to(userSink)
   .run()

Upvotes: 3

Related Questions