Circular dependency between Akka actors

Here is the code:

package vu.co.kaiyin.calculus.stewart

import org.json4s.JsonAST.JValue
import org.json4s._
import org.json4s.native.JsonMethods._
import akka.actor._
import akka.routing.RoundRobinPool

import scalaj.http._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}
import akka.actor._
import com.mongodb.casbah.Imports._

import scalaj.http._
import scala.concurrent.Future

/** Companion for the Fetcher actor: its message type and Props factory. */
object Fetcher {
  // message definitions
  /** Asks a Fetcher to download the follower list of the GitHub user `login`. */
  case class Fetch(val login:String)

  // Props factory definitions
  /** Builds Props for a Fetcher.
    *
    * @param token               optional GitHub API token; when present the Fetcher sends it
    *                            as an `Authorization: token ...` header
    * @param responseInterpreter actor that will receive each HTTP response. The reference is
    *                            stored in the Props as-is, so it must already be non-null
    *                            when this is called; a `null` passed here is handed to every
    *                            Fetcher created from these Props.
    */
  def props(
             token:Option[String], responseInterpreter:ActorRef
           ):Props = Props(classOf[Fetcher], token, responseInterpreter)

/** Worker that downloads one user's follower list per `Fetch` message and forwards the
  * HTTP response to `responseInterpreter`.
  *
  * @param token               optional GitHub API token
  * @param responseInterpreter receiver of `ResponseInterpreter.InterpretResponse` messages.
  *                            It is only dereferenced when a response arrives, so a `null`
  *                            here surfaces later as a NullPointerException inside the
  *                            `onComplete` callback, not when the actor is constructed.
  */
class Fetcher(val token:Option[String], val responseInterpreter:ActorRef)
  extends Actor with ActorLogging {

  import Fetcher._ // import message definition

  def receive = {
    case Fetch(login) => fetchFollowers(login)

  /** Sends the (optionally authenticated) request asynchronously and reports the outcome. */
  private def fetchFollowers(login:String) {
    // NOTE(review): the request URL argument to Http( is missing from this paste.
    val unauthorizedRequest = Http(
    // Add the Authorization header only when a token was supplied.
    val authorizedRequest = token.map { t =>
      unauthorizedRequest.header("Authorization", s"token $t")

    val request = authorizedRequest.getOrElse(unauthorizedRequest)
    // `asString` executes the HTTP call; wrapping it in a Future keeps it off the actor's
    // message-processing thread. NOTE(review): this is blocking I/O on the global
    // ExecutionContext; consider a dedicated dispatcher or `blocking { ... }`.
    val response = Future { request.asString }

    // Wrap the response in an InterpretResponse message and
    // forward it to the interpreter.
    response.onComplete {
      case Success(r) => responseInterpreter ! ResponseInterpreter.InterpretResponse(login, r)
      case Failure(e) => log.warning(s"Failed to fetch $login: $e")

/** Parses a fetched follower-list body as JSON and hands the array to `extractor`.
  *
  * @param extractor receiver of `FollowerExtractor.Extract` messages
  */
class ResponseInterpreter(extractor: ActorRef) extends Actor with ActorLogging {
  def receive = {
    case ResponseInterpreter.InterpretResponse(login, httpResponse) => {
      val body = httpResponse.body
      // NOTE(review): the cast throws ClassCastException whenever the body is not a JSON
      // array (e.g. presumably an error object returned by the API). Checking the response
      // status and pattern matching on `JArray` would be safer than `asInstanceOf`.
      val followers = parse(body).asInstanceOf[JArray]
      extractor ! FollowerExtractor.Extract(login, followers)

/** Message protocol for the ResponseInterpreter actor. */
object ResponseInterpreter {
  /** Raw HTTP response for the followers of `login`, as sent by a Fetcher. */
  case class InterpretResponse(login: String, followersResponse: HttpResponse[String])

/** Message protocol and Props factory for the FollowerExtractor actor. */
object FollowerExtractor {

  // Messages
  /** Parsed follower JSON array fetched for `login`. */
  case class Extract(val login:String, val jsonResponse:JArray)

  // Props factory method
  // NOTE(review): FollowerExtractor's constructor takes `manager: ActorRef`, so
  // `Props[FollowerExtractor]` (no constructor arguments) has no matching constructor and
  // will fail as soon as it is used. GraphDemo avoids this factory and calls
  // `Props(classOf[FollowerExtractor], manager)`; this method should probably take `manager`.
  def props = Props[FollowerExtractor]

/** Pulls follower logins out of a JSON array, records each (followed, follower) pair in the
  * MongoDB collection `github.graph`, and asks `manager` to queue each follower.
  *
  * @param manager receiver of `FetcherManager.AddToQueue` messages
  */
class FollowerExtractor(manager: ActorRef) extends Actor with ActorLogging {
  import FollowerExtractor._

  // NOTE(review): InsertUsers is not shown here; presumably it returns a Casbah MongoClient.
  val mongoClient = InsertUsers.getClient
  val db = mongoClient("github")
  val coll = db("graph")
  def receive = {
    case Extract(login, followerArray) => {
      val followers = extractFollowers(followerArray)
      followers.foreach { follower  =>
        // Insert asynchronously. The Future is discarded with no callback, so a failed
        // insert is silently lost.
        val _ = Future {
          coll += DBObject("followed" -> login, "follower" -> follower)
        manager ! FetcherManager.AddToQueue(follower)

  /** Collects the string `login` field of the JSON objects in `followerArray`; values that
    * do not match the patterns are skipped.
    * NOTE(review): json4s for-comprehensions over a JValue also visit nested objects;
    * confirm that is acceptable for this payload.
    */
  def extractFollowers(followerArray:JArray): Seq[String] = for {
    JObject(follower) <- followerArray
    JField("login", JString(login)) <- follower
  } yield login


/** Deduplicates users and sends each newly seen one to the fetcher `router`.
  *
  * @param router pool of Fetchers that receives `Fetcher.Fetch` messages
  */
class FetcherManager(router: ActorRef) extends Actor {
  // Logins already dispatched to the router; updated in receive.
  val added = collection.mutable.Set.empty[String]
  def receive = {
    case FetcherManager.AddToQueue(user) => {
      if(! added.contains(user)) {
        router ! Fetcher.Fetch(user)
        added += user

/** Message protocol for the FetcherManager actor. */
object FetcherManager {
  /** Requests that `user` be fetched unless it has already been dispatched. */
  case class AddToQueue(user: String)

/** Wires the actor cycle FetcherManager -> router (4 Fetchers) -> ResponseInterpreter ->
  * FollowerExtractor -> FetcherManager and starts crawling from the user "odersky".
  */
object GraphDemo extends App {
  val system = ActorSystem("fetch_graph")
  // BUG(review): `router` is lazy, so it is first evaluated when `manager` is created
  // (it is an argument to FetcherManager's Props). At that moment `interpreter`, declared
  // further down, has not been assigned and is still null, so every Fetcher is given a null
  // responseInterpreter. `lazy` only delays evaluation; it does not resolve the forward
  // reference. Create `interpreter` before `router`, and hand the router to the manager
  // afterwards with a message (two-phase initialization).
  lazy val router: ActorRef = system.actorOf(RoundRobinPool(4).props(
    Fetcher.props(Some("yourtokenhere"), interpreter)
  val manager: ActorRef = system.actorOf(Props(classOf[FetcherManager], router))
  val extractor: ActorRef = system.actorOf(Props(classOf[FollowerExtractor], manager))
  val interpreter: ActorRef = system.actorOf(Props(classOf[ResponseInterpreter], extractor))
  manager ! FetcherManager.AddToQueue("odersky")
  // NOTE(review): the scheduled action's body is cut off in this paste.
  system.scheduler.scheduleOnce(5.minutes) {

The program fetches GitHub users and their followers and stores the resulting follower graph in MongoDB. The actors are organized as follows:

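(Diagram not reproduced. From the code, the topology is: `FetcherManager` → router (`RoundRobinPool` of 4 `Fetcher`s) → `ResponseInterpreter` → `FollowerExtractor` → back to `FetcherManager`. The actors form a cycle.)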

I got this error:

Jun 27, 2016 7:59:21 PM com.mongodb.diagnostics.logging.JULLogger log
INFO: Cluster created with settings {hosts=[localhost:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500}
Jun 27, 2016 7:59:21 PM com.mongodb.diagnostics.logging.JULLogger log
INFO: No server chosen by PrimaryServerSelector from cluster description ClusterDescription{type=UNKNOWN, connectionMode=SINGLE, all=[ServerDescription{address=localhost:27017, type=UNKNOWN, state=CONNECTING}]}. Waiting for 30000 ms before timing out
Jun 27, 2016 7:59:21 PM com.mongodb.diagnostics.logging.JULLogger log
INFO: Opened connection [connectionId{localValue:1, serverValue:9}] to localhost:27017
Jun 27, 2016 7:59:21 PM com.mongodb.diagnostics.logging.JULLogger log
INFO: Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 2, 7]}, minWireVersion=0, maxWireVersion=4, electionId=null, maxDocumentSize=16777216, roundTripTimeNanos=415354}
Jun 27, 2016 7:59:21 PM com.mongodb.diagnostics.logging.JULLogger log
INFO: Opened connection [connectionId{localValue:2, serverValue:10}] to localhost:27017
    at vu.co.kaiyin.calculus.stewart.Fetcher$$anonfun$vu$co$kaiyin$calculus$stewart$Fetcher$$fetchFollowers$1.apply(GitHubGraph.scala:52)
    at vu.co.kaiyin.calculus.stewart.Fetcher$$anonfun$vu$co$kaiyin$calculus$stewart$Fetcher$$fetchFollowers$1.apply(GitHubGraph.scala:51)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

After debugging, I found that `responseInterpreter` was never initialized: the `Fetcher`s received `null` for it.

How can this be solved?

Answer by Justin du Coeur:
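Well, the error is pretty clearly correct: the interpreter hasn't been created yet when you set up the router, so the Fetchers get a null pointer. Declaring `router` as a `lazy val` doesn't help. It is forced as soon as `manager` is created (it is passed to `FetcherManager`'s `Props`), which happens before `interpreter` has been assigned. Forward references are always dangerous.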

There are several ways to handle it. The most obvious is probably to broadcast a message to all of the Fetchers under the Router after creating the interpreter, giving them a reference to it. But it may be easier to rearrange the creation order: create the interpreter first, and at the end of setup send it a message with a reference to the FetcherManager. (This avoids the need for a broadcast message.)

Either approach boils down to two-phase initialization, which is not unusual for a cycle like this.

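Modified code. The `FetcherManager` is now created first, without a router, and receives the router's `ActorRef` in a message. The `interpreter` is created before the router, so the Fetchers get a non-null reference: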

import org.json4s.JsonAST.JValue
import org.json4s._
import org.json4s.native.JsonMethods._
import akka.actor._
import akka.routing.RoundRobinPool

import scalaj.http._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}
import akka.actor._
import com.mongodb.casbah.Imports._

import scalaj.http._
import scala.concurrent.Future

/** Companion for the Fetcher actor: its message type and Props factory. */
object Fetcher {
  // message definitions
  /** Asks a Fetcher to download the follower list of the GitHub user `login`. */
  case class Fetch(val login:String)

  // Props factory definitions
  /** Builds Props for a Fetcher.
    *
    * @param token               optional GitHub API token; when present the Fetcher sends it
    *                            as an `Authorization: token ...` header
    * @param responseInterpreter actor that will receive each HTTP response. The reference is
    *                            stored in the Props as-is, so it must already be non-null
    *                            when this is called.
    */
  def props(
             token:Option[String], responseInterpreter:ActorRef
           ):Props = Props(classOf[Fetcher], token, responseInterpreter)

/** Worker that downloads one user's follower list per `Fetch` message and forwards the
  * HTTP response to `responseInterpreter`.
  *
  * @param token               optional GitHub API token
  * @param responseInterpreter receiver of `ResponseInterpreter.InterpretResponse` messages;
  *                            only dereferenced inside the `onComplete` callback
  */
class Fetcher(val token:Option[String], val responseInterpreter:ActorRef)
  extends Actor with ActorLogging {

  import Fetcher._ // import message definition

  def receive = {
    case Fetch(login) => fetchFollowers(login)

  /** Sends the (optionally authenticated) request asynchronously and reports the outcome. */
  private def fetchFollowers(login:String) {
    // NOTE(review): the request URL argument to Http( is missing from this paste.
    val unauthorizedRequest = Http(
    // Add the Authorization header only when a token was supplied.
    val authorizedRequest = token.map { t =>
      unauthorizedRequest.header("Authorization", s"token $t")

    val request = authorizedRequest.getOrElse(unauthorizedRequest)
    // `asString` executes the HTTP call; wrapping it in a Future keeps it off the actor's
    // message-processing thread. NOTE(review): this is blocking I/O on the global
    // ExecutionContext; consider a dedicated dispatcher or `blocking { ... }`.
    val response = Future { request.asString }

    // Wrap the response in an InterpretResponse message and
    // forward it to the interpreter.
    response.onComplete {
      case Success(r) => responseInterpreter ! ResponseInterpreter.InterpretResponse(login, r)
      case Failure(e) => log.warning(s"Failed to fetch $login: $e")

/** Parses a fetched follower-list body as JSON and hands the array to `extractor`.
  *
  * @param extractor receiver of `FollowerExtractor.Extract` messages
  */
class ResponseInterpreter(extractor: ActorRef) extends Actor with ActorLogging {
  def receive = {
    case ResponseInterpreter.InterpretResponse(login, httpResponse) => {
      val body = httpResponse.body
      // NOTE(review): the cast throws ClassCastException whenever the body is not a JSON
      // array (e.g. presumably an error object returned by the API). Checking the response
      // status and pattern matching on `JArray` would be safer than `asInstanceOf`.
      val followers = parse(body).asInstanceOf[JArray]
      extractor ! FollowerExtractor.Extract(login, followers)

/** Message protocol for the ResponseInterpreter actor. */
object ResponseInterpreter {
  /** Raw HTTP response for the followers of `login`, as sent by a Fetcher. */
  case class InterpretResponse(login: String, followersResponse: HttpResponse[String])

/** Message protocol and Props factory for the FollowerExtractor actor. */
object FollowerExtractor {

  // Messages
  /** Parsed follower JSON array fetched for `login`. */
  case class Extract(val login:String, val jsonResponse:JArray)

  // Props factory method
  // NOTE(review): FollowerExtractor's constructor takes `manager: ActorRef`, so
  // `Props[FollowerExtractor]` (no constructor arguments) has no matching constructor and
  // will fail as soon as it is used. GraphDemo avoids this factory and calls
  // `Props(classOf[FollowerExtractor], manager)`; this method should probably take `manager`.
  def props = Props[FollowerExtractor]

/** Pulls follower logins out of a JSON array, records each (followed, follower) pair in the
  * MongoDB collection `github.graph`, and asks `manager` to queue each follower.
  *
  * @param manager receiver of `FetcherManager.AddToQueue` messages
  */
class FollowerExtractor(manager: ActorRef) extends Actor with ActorLogging {
  import FollowerExtractor._

  // NOTE(review): InsertUsers is not shown here; presumably it returns a Casbah MongoClient.
  val mongoClient = InsertUsers.getClient
  val db = mongoClient("github")
  val coll = db("graph")
  def receive = {
    case Extract(login, followerArray) => {
      val followers = extractFollowers(followerArray)
      followers.foreach { follower  =>
        // Insert asynchronously. The Future is discarded with no callback, so a failed
        // insert is silently lost.
        val _ = Future {
          coll += DBObject("followed" -> login, "follower" -> follower)
        manager ! FetcherManager.AddToQueue(follower)

  /** Collects the string `login` field of the JSON objects in `followerArray`; values that
    * do not match the patterns are skipped.
    * NOTE(review): json4s for-comprehensions over a JValue also visit nested objects;
    * confirm that is acceptable for this payload.
    */
  def extractFollowers(followerArray:JArray): Seq[String] = for {
    JObject(follower) <- followerArray
    JField("login", JString(login)) <- follower
  } yield login


/** Deduplicates users and sends each newly seen one to the fetcher router.
  *
  * Two-phase initialization: the router is not a constructor argument because it does not
  * exist yet when this actor is created. It arrives later as a plain `ActorRef` message.
  */
class FetcherManager extends Actor {
  // Assigned when an ActorRef message arrives and null until then, so an AddToQueue
  // processed before that point would throw a NullPointerException at `router ! ...`.
  private var router: ActorRef = _
  // Logins already dispatched to the router; updated in receive.
  val added = collection.mutable.Set.empty[String]
  def receive = {
    // NOTE(review): any ActorRef sent to this actor replaces the router. A dedicated message
    // type (e.g. a `SetRouter(ref)` case class), or switching behavior with
    // `context.become`, would be less fragile than matching on bare ActorRef.
    case ref: ActorRef => router = ref
    case FetcherManager.AddToQueue(user) => {
      if(! added.contains(user)) {
        router ! Fetcher.Fetch(user)
        added += user

/** Message protocol for the FetcherManager actor. */
object FetcherManager {
  /** Requests that `user` be fetched unless it has already been dispatched. */
  case class AddToQueue(user: String)

/** Builds the actor cycle in dependency order, so no reference is null when it is captured,
  * then closes the one back-edge (manager -> router) with a message and starts crawling
  * from the user "odersky".
  */
object GraphDemo extends App {
  val system = ActorSystem("fetch_graph")
  // Created first, without a router; it is sent the router reference below.
  val manager: ActorRef = system.actorOf(Props(classOf[FetcherManager]))
  val extractor: ActorRef = system.actorOf(Props(classOf[FollowerExtractor], manager))
  val interpreter: ActorRef = system.actorOf(Props(classOf[ResponseInterpreter], extractor))
  // `interpreter` is already assigned at this point, so every Fetcher gets a non-null ref.
  val router: ActorRef = system.actorOf(RoundRobinPool(4).props(
    Fetcher.props(Some("yourtoken"), interpreter)
  // Both messages are sent in sequence from this thread to the same local actor, so the
  // manager handles the router message before AddToQueue.
  manager ! router
  manager ! FetcherManager.AddToQueue("odersky")
  // NOTE(review): the scheduled action's body is cut off in this paste.
  system.scheduler.scheduleOnce(15.seconds) {

Upvotes: 1

