Reputation: 23114
Here is the code:
package vu.co.kaiyin.calculus.stewart
import org.json4s.JsonAST.JValue
import org.json4s._
import org.json4s.native.JsonMethods._
import akka.actor._
import akka.routing.RoundRobinPool
import scalaj.http._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}
import akka.actor._
import com.mongodb.casbah.Imports._
import scalaj.http._
import scala.concurrent.Future
object Fetcher {
// message definitions
case class Fetch(val login:String)
// Props factory definitions
def props(
token:Option[String], responseInterpreter:ActorRef
):Props = Props(classOf[Fetcher], token, responseInterpreter)
}
class Fetcher(val token:Option[String], val responseInterpreter:ActorRef)
extends Actor with ActorLogging {
import Fetcher._ // import message definition
def receive = {
case Fetch(login) => fetchFollowers(login)
}
private def fetchFollowers(login:String) {
val unauthorizedRequest = Http(
s"https://api.github.com/users/$login/followers")
val authorizedRequest = token.map { t =>
unauthorizedRequest.header("Authorization", s"token $t")
}
val request = authorizedRequest.getOrElse(unauthorizedRequest)
val response = Future { request.asString }
// Wrap the response in an InterpretResponse message and
// forward it to the interpreter.
response.onComplete {
case Success(r) => responseInterpreter ! ResponseInterpreter.InterpretResponse(login, r)
case Failure(e) => log.warning(s"Failed to fetch $login: $e")
}
}
}
class ResponseInterpreter(extractor: ActorRef) extends Actor with ActorLogging {
def receive = {
case ResponseInterpreter.InterpretResponse(login, httpResponse) => {
val body = httpResponse.body
val followers = parse(body).asInstanceOf[JArray]
extractor ! FollowerExtractor.Extract(login, followers)
}
}
}
object ResponseInterpreter {
case class InterpretResponse(login: String, followersResponse: HttpResponse[String])
}
object FollowerExtractor {
// Messages
case class Extract(val login:String, val jsonResponse:JArray)
// Props factory method
def props = Props[FollowerExtractor]
}
class FollowerExtractor(manager: ActorRef) extends Actor with ActorLogging {
import FollowerExtractor._
val mongoClient = InsertUsers.getClient
val db = mongoClient("github")
val coll = db("graph")
coll.drop()
def receive = {
case Extract(login, followerArray) => {
val followers = extractFollowers(followerArray)
followers.foreach { follower =>
val _ = Future {
coll += DBObject("followed" -> login, "follower" -> follower)
}
manager ! FetcherManager.AddToQueue(follower)
}
}
}
def extractFollowers(followerArray:JArray): Seq[String] = for {
JObject(follower) <- followerArray
JField("login", JString(login)) <- follower
} yield login
}
class FetcherManager(router: ActorRef) extends Actor {
val added = collection.mutable.Set.empty[String]
def receive = {
case FetcherManager.AddToQueue(user) => {
if(! added.contains(user)) {
router ! Fetcher.Fetch(user)
added += user
}
}
}
}
object FetcherManager {
case class AddToQueue(user: String)
}
object GraphDemo extends App {
val system = ActorSystem("fetch_graph")
lazy val router: ActorRef = system.actorOf(RoundRobinPool(4).props(
Fetcher.props(Some("yourtokenhere"), interpreter)
))
val manager: ActorRef = system.actorOf(Props(classOf[FetcherManager], router))
val extractor: ActorRef = system.actorOf(Props(classOf[FollowerExtractor], manager))
val interpreter: ActorRef = system.actorOf(Props(classOf[ResponseInterpreter], extractor))
manager ! FetcherManager.AddToQueue("odersky")
system.scheduler.scheduleOnce(5.minutes) {
system.terminate()
}
}
It's trying to fetch users and followers from github and store the graph in mongodb. The organization of actors is like this:
I got this error:
Jun 27, 2016 7:59:21 PM com.mongodb.diagnostics.logging.JULLogger log
INFO: Cluster created with settings {hosts=[localhost:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500}
Jun 27, 2016 7:59:21 PM com.mongodb.diagnostics.logging.JULLogger log
INFO: No server chosen by PrimaryServerSelector from cluster description ClusterDescription{type=UNKNOWN, connectionMode=SINGLE, all=[ServerDescription{address=localhost:27017, type=UNKNOWN, state=CONNECTING}]}. Waiting for 30000 ms before timing out
Jun 27, 2016 7:59:21 PM com.mongodb.diagnostics.logging.JULLogger log
INFO: Opened connection [connectionId{localValue:1, serverValue:9}] to localhost:27017
Jun 27, 2016 7:59:21 PM com.mongodb.diagnostics.logging.JULLogger log
INFO: Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 2, 7]}, minWireVersion=0, maxWireVersion=4, electionId=null, maxDocumentSize=16777216, roundTripTimeNanos=415354}
Jun 27, 2016 7:59:21 PM com.mongodb.diagnostics.logging.JULLogger log
INFO: Opened connection [connectionId{localValue:2, serverValue:10}] to localhost:27017
java.lang.NullPointerException
at vu.co.kaiyin.calculus.stewart.Fetcher$$anonfun$vu$co$kaiyin$calculus$stewart$Fetcher$$fetchFollowers$1.apply(GitHubGraph.scala:52)
at vu.co.kaiyin.calculus.stewart.Fetcher$$anonfun$vu$co$kaiyin$calculus$stewart$Fetcher$$fetchFollowers$1.apply(GitHubGraph.scala:51)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
After debugging I found that responseInterpreter
was not initialized (hence null).
How can this be solved?
Upvotes: 1
Views: 757
Reputation: 2659
Well, the error is pretty clearly correct -- the interpreter
hasn't been created when you're setting up the router, so it's a null pointer. Forward references are always dangerous.
There are a variety of ways to handle it; the most obvious is probably, after creating the interpreter
, you broadcast a message to all of the Fetchers
under the Router, giving the pointer to it. But it may be easier to rearrange the creation order -- create the interpreter
first, and at the end of setup send it a message with a pointer to the FetcherManager
. (Thus avoiding the need for a broadcast message.)
Either of these basically boils down to a two-phase initialization; for a loop like this, that's not terribly unusual...
Modified code:
import org.json4s.JsonAST.JValue
import org.json4s._
import org.json4s.native.JsonMethods._
import akka.actor._
import akka.routing.RoundRobinPool
import scalaj.http._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}
import akka.actor._
import com.mongodb.casbah.Imports._
import scalaj.http._
import scala.concurrent.Future
object Fetcher {
// message definitions
case class Fetch(val login:String)
// Props factory definitions
def props(
token:Option[String], responseInterpreter:ActorRef
):Props = Props(classOf[Fetcher], token, responseInterpreter)
}
class Fetcher(val token:Option[String], val responseInterpreter:ActorRef)
extends Actor with ActorLogging {
import Fetcher._ // import message definition
def receive = {
case Fetch(login) => fetchFollowers(login)
}
private def fetchFollowers(login:String) {
val unauthorizedRequest = Http(
s"https://api.github.com/users/$login/followers")
val authorizedRequest = token.map { t =>
unauthorizedRequest.header("Authorization", s"token $t")
}
val request = authorizedRequest.getOrElse(unauthorizedRequest)
val response = Future { request.asString }
// Wrap the response in an InterpretResponse message and
// forward it to the interpreter.
response.onComplete {
case Success(r) => responseInterpreter ! ResponseInterpreter.InterpretResponse(login, r)
case Failure(e) => log.warning(s"Failed to fetch $login: $e")
}
}
}
class ResponseInterpreter(extractor: ActorRef) extends Actor with ActorLogging {
def receive = {
case ResponseInterpreter.InterpretResponse(login, httpResponse) => {
val body = httpResponse.body
val followers = parse(body).asInstanceOf[JArray]
extractor ! FollowerExtractor.Extract(login, followers)
}
}
}
object ResponseInterpreter {
case class InterpretResponse(login: String, followersResponse: HttpResponse[String])
}
object FollowerExtractor {
// Messages
case class Extract(val login:String, val jsonResponse:JArray)
// Props factory method
def props = Props[FollowerExtractor]
}
class FollowerExtractor(manager: ActorRef) extends Actor with ActorLogging {
import FollowerExtractor._
val mongoClient = InsertUsers.getClient
val db = mongoClient("github")
val coll = db("graph")
coll.drop()
def receive = {
case Extract(login, followerArray) => {
val followers = extractFollowers(followerArray)
followers.foreach { follower =>
val _ = Future {
coll += DBObject("followed" -> login, "follower" -> follower)
}
manager ! FetcherManager.AddToQueue(follower)
}
}
}
def extractFollowers(followerArray:JArray): Seq[String] = for {
JObject(follower) <- followerArray
JField("login", JString(login)) <- follower
} yield login
}
class FetcherManager extends Actor {
private var router: ActorRef = _
val added = collection.mutable.Set.empty[String]
def receive = {
case ref: ActorRef => router = ref
case FetcherManager.AddToQueue(user) => {
if(! added.contains(user)) {
router ! Fetcher.Fetch(user)
added += user
}
}
}
}
object FetcherManager {
case class AddToQueue(user: String)
}
object GraphDemo extends App {
val system = ActorSystem("fetch_graph")
val manager: ActorRef = system.actorOf(Props(classOf[FetcherManager]))
val extractor: ActorRef = system.actorOf(Props(classOf[FollowerExtractor], manager))
val interpreter: ActorRef = system.actorOf(Props(classOf[ResponseInterpreter], extractor))
val router: ActorRef = system.actorOf(RoundRobinPool(4).props(
Fetcher.props(Some("yourtoken"), interpreter)
))
manager ! router
manager ! FetcherManager.AddToQueue("odersky")
system.scheduler.scheduleOnce(15.seconds) {
system.terminate()
}
}
Upvotes: 1