Bharath Kumar
Bharath Kumar

Reputation: 105

How to implement a concurrent processing in akka?

I have a method in which there are multiple calls to db. As I have not implemented any concurrent processing, a 2nd db call has to wait until the 1st db call gets completed, 3rd has to wait until the 2nd gets completed and so on.

All db calls are independent of each other. I want to make this in such a way that all DB calls run concurrently.

I am new to Akka framework.

Can someone please help me with small sample or references would help. Application is developed in Scala Lang.

Upvotes: 0

Views: 387

Answers (2)

Mehdi Mousavi
Mehdi Mousavi

Reputation: 22

If you want to querying database, you should use something like slick which is a modern database query and access library for Scala.

quick example of slick:

case class User(id: Option[Int], first: String, last: String)

class Users(tag: Tag) extends Table[User](tag, "users") {
  def id = column[Int]("id", O.PrimaryKey, O.AutoInc)
  def first = column[String]("first")
  def last = column[String]("last")
  def * = (id.?, first, last) <> (User.tupled, User.unapply)
}
val users = TableQuery[Users]

then your need to create configuration for your db:

mydb = {
  dataSourceClass = "org.postgresql.ds.PGSimpleDataSource"
  properties = {
    databaseName = "mydb"
    user = "myuser"
    password = "secret"
  }
  numThreads = 10
}

and in your code you load configuration:

val db = Database.forConfig("mydb")

then run your query with db.run method which gives you future as result, for example you can get all rows by calling method result

val allRows: Future[Seq[User]] = db.run(users.result)

this query run without blocking current thread.

If you have task which take long time to execute or calling to another service, you should use futures.

Example of that is simple HTTP call to external service. you can find example in here

If you have task which take long time to execute and for doing so, you have to keep mutable states, in this case the best option is using Akka Actors which encapsulate your state inside an actor which solve problem of concurrency and thread safety as simple as possible.Example of suck tasks are:

import akka.actor.Actor

import scala.concurrent.Future

case class RegisterEndpoint(endpoint: String)

case class NewUpdate(update: String)

class UpdateConsumer extends Actor {
  val endpoints = scala.collection.mutable.Set.empty[String]

  override def receive: Receive = {

    case RegisterEndpoint(endpoint) =>
      endpoints += endpoint

    case NewUpdate(update) =>
      endpoints.foreach { endpoint =>
        deliverUpdate(endpoint, update)
      }
  }

  def deliverUpdate(endpoint: String, update: String): Future[Unit] = {
    Future.successful(Unit)
  }

}

If you want to process huge amount of live data, or websocket connection, processing CSV file which is growing over time, ... or etc, the best option is Akka stream. For example reading data from kafka topic using Alpakka:Alpakka kafka connector

Upvotes: 0

There are three primary ways that you could achieve concurrency for the given example needs.

Futures

For the particular use case that is asked about in the question I would recommend Futures before any akka construct.

Suppose we are given the database calls as functions:

type Data = ???

val dbcall1 : () => Data = ???

val dbcall2 : () => Data = ???

val dbcall3 : () => Data = ???

Concurrency can be easily applied, and then the results can be collected, using Futures:

val f1 = Future { dbcall1() }
val f2 = Future { dbcall2() }
val f3 = Future { dbcall3() }

for {
  v1 <- f1
  v2 <- f2
  v3 <- f3
} {
  println(s"All data collected: ${v1}, ${v2}, ${v3}")
}

Akka Streams

There is a similar stack answer which demonstrates how to use the akka-stream library to do concurrent db querying.

Akka Actors

It is also possible to write an Actor to do the querying:

object MakeQuery

class DBActor(dbCall : () => Data) extends Actor {
  override def receive = {
    case _ : MakeQuery => sender ! dbCall()
  }
}

val dbcall1ActorRef = system.actorOf(Props(classOf[DBActor], dbcall1)) 

However, in this use case Actors are less helpful because you still need to collect all of the data together.

You can either use the same technique as the "Futures" section:

val f1 : Future[Data] = (dbcall1ActorRef ? MakeQuery).mapTo[Data]

for {
  v1 <- f1
  ...

Or, you would have to wire the Actors together by hand through the constructor and handle all of the callback logic for waiting on the other Actor:

class WaitingDBActor(dbCall : () => Data, previousActor : ActorRef) {
  override def receive = {
    case _ : MakeQuery => previousActor forward MakeQuery

    case previousData : Data => sender ! (dbCall(), previousData)
  }
}

Upvotes: 1

Related Questions