I have a REST controller that calls a service, which in turn calls an actor to run a query against a simulated database. The message reaches the actor, but the app crashes before the actor can respond, with a NullPointerException thrown from the actor. I'm using Akka HTTP for the controller and routing directives to compose the response. These are my dependencies:
"com.typesafe.akka" %% "akka-http" % "10.1.8",
"com.typesafe.akka" %% "akka-actor" % "2.5.22",
"com.typesafe.akka" %% "akka-stream" % "2.5.22",
"com.typesafe.akka" %% "akka-http-spray-json" % "10.1.8"
class CacheActor extends Actor {
val tweetRepositoryInMemory: TweetRepositoryInMemory = new TweetRepositoryInMemory()
val log = Logging(context.system, this)
var tweetMap: scala.collection.mutable.Map[String, List[String]] =
scala.collection.mutable.Map[String, List[String]]()
// consult the in-memory map, if the username is not found, call the repository, update the map, and return the tweets
def queryRepo(username: String): Future[Option[List[String]]] = {
if (tweetMap isDefinedAt username) {
return Future(tweetMap.get(username))
} else {
var listOfTweetTexts: List[String] = List[String]()
val queryLimit = 10
val resultTweets: Future[Seq[Tweet]] = tweetRepositoryInMemory.searchByUserName(username, queryLimit)
onComplete(resultTweets) {
case Success(tweets) =>
for (tweet <- tweets) { listOfTweetTexts ::= tweet.text; }
tweetMap(username) = listOfTweetTexts
return Future(Option(listOfTweetTexts))
case Failure(t) =>
log.error("An error has occurred: " + t.getMessage)
return null
}
}
return null
}
def receive = {
case message: TweetQuery => // .take(message.limit)
val queryResult: Future[Option[List[String]]] = queryRepo(message.userName)
queryResult onComplete {
case Success(result) => sender() ! result
case Failure(t) => log.error("An error has occurred: " + t.getMessage)
}
}
}
Upvotes: 0
Views: 836
Reputation: 7162
A stack trace would have been helpful, but I suspect that this line in your receive causes the NPE:
queryResult onComplete {
Your queryRepo function returns null if tweetMap is not defined at username.
UPDATE
And here is why:
The queryRepo function returns a Future[Option[List[String]]] in exactly one case:
if (tweetMap isDefinedAt username) {
return Future(tweetMap.get(username))
}
In the else block you create a Future[Seq[Tweet]]:
val resultTweets: Future[Seq[Tweet]] = tweetRepositoryInMemory.searchByUserName(username, queryLimit)
but you never return it. Instead you pass a callback to the Future that is executed asynchronously when the future completes, hence onComplete. (I noticed that you do not call onComplete on the Future directly but a function onComplete that takes the future and a partial function as arguments, and I am assuming that this function registers the regular onComplete callback.)
Thus the result of the else block of your if statement is Unit and not Future[Option[List[String]]].
The code
for (tweet <- tweets) { listOfTweetTexts ::= tweet.text; }
tweetMap(username) = listOfTweetTexts
return Future(Option(listOfTweetTexts))
is most likely executed after queryRepo has already returned null.
def queryRepo(username: String): Future[Option[List[String]]] = {
if (tweetMap isDefinedAt username) {
...
} else {
...
}
return null // <--- here
}
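To make the timing concrete, here is a minimal sketch that uses only the standard library. The names OnCompleteDemo, pending, broken and fixed are made up for illustration, and the Promise is a stand-in for the repository call so that the moment of completion can be controlled. It shows that registering a callback with onComplete yields nothing to return, whereas map produces a new Future that the caller can use.

```scala
import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

object OnCompleteDemo {
  // Hypothetical stand-in for the repository lookup: the future stays
  // incomplete until someone calls pending.success(...).
  val pending: Promise[Seq[String]] = Promise[Seq[String]]()

  // Mirrors the shape of the original code: the callback is only registered,
  // its result is discarded, and the method falls through to `null`.
  def broken(): Future[List[String]] = {
    pending.future.onComplete {
      case Success(texts) => println(s"callback ran later with $texts")
      case Failure(t)     => println(s"lookup failed: ${t.getMessage}")
    }
    null // returned immediately, long before the callback runs
  }

  // map transforms the eventual value and returns a Future right away.
  def fixed(): Future[List[String]] = pending.future.map(_.toList)
}
```

Calling onComplete on the value returned by broken() is what throws the NullPointerException, while the value returned by fixed() is a real Future that completes once the lookup does.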
End UPDATE
If you change the following lines:
val resultTweets: Future[Seq[Tweet]] = tweetRepositoryInMemory.searchByUserName(username, queryLimit)
onComplete(resultTweets) {
case Success(tweets) =>
for (tweet <- tweets) { listOfTweetTexts ::= tweet.text; }
tweetMap(username) = listOfTweetTexts
return Future(Option(listOfTweetTexts))
case Failure(t) =>
log.error("An error has occurred: " + t.getMessage)
return null
}
to:
tweetRepositoryInMemory.searchByUserName(username, queryLimit).map { tweets =>
  // NOTE: This runs asynchronously inside the `Future`, so it is better not to close over local variables.
  // .toList is needed because tweetMap stores List[String] and the method returns Option[List[String]].
  val listOfTweetTexts = tweets.map(_.text).toList
  // Accessing an actor member from within a `Future` is unwise, or rather a bug in the making.
  // I will not refactor that much here. The proper way is to send a message to self and process the mutable member within `receive`.
  tweetMap(username) = listOfTweetTexts
  Option(listOfTweetTexts)
}
The NullPointerException should no longer occur.
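The refactoring hinted at in the comments above (send a message to self and touch the mutable member only within receive) could look roughly like the sketch below. It is not a drop-in replacement. TweetsLoaded and the lookup constructor parameter are hypothetical, the shape of TweetQuery is guessed from the question, and the lookup is assumed to return the tweet texts directly. It relies on pipeTo from akka.pattern, which is part of akka-actor.

```scala
import akka.actor.{Actor, ActorLogging, ActorRef, Status}
import akka.pattern.pipe
import scala.concurrent.Future

// Assumed message shapes; only TweetQuery appears in the question, and its fields are guessed.
final case class TweetQuery(userName: String, limit: Int)
final case class TweetsLoaded(userName: String, texts: List[String], replyTo: ActorRef)

// `lookup` is a hypothetical stand-in for the repository call.
class CacheActor(lookup: (String, Int) => Future[Seq[String]]) extends Actor with ActorLogging {
  import context.dispatcher // ExecutionContext used by map and pipeTo

  private var tweetMap = Map.empty[String, List[String]]

  def receive: Receive = {
    case TweetQuery(userName, limit) =>
      tweetMap.get(userName) match {
        case Some(texts) =>
          sender() ! Option(texts) // cache hit: reply immediately
        case None =>
          val replyTo = sender() // capture now; sender() must not be called inside the future
          lookup(userName, limit)
            .map(texts => TweetsLoaded(userName, texts.toList, replyTo))
            .pipeTo(self) // the result comes back to this actor as an ordinary message
      }

    case TweetsLoaded(userName, texts, replyTo) =>
      tweetMap += (userName -> texts) // state is mutated only while processing a message
      replyTo ! Option(texts)

    case Status.Failure(t) =>
      // pipeTo delivers a failed future as Status.Failure; the requester gets no reply in this sketch
      log.error(t, "Tweet lookup failed")
  }
}
```

Because the cache is only updated while the actor processes a message, no callback thread ever touches tweetMap, and the original sender is carried along inside the message instead of being read later.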
However, my impression is that you could use some more training with futures, actors, and functional programming in Scala in general.
For example, an actor's mutable state should only be accessed within receive and not in an asynchronous Future or Thread, and null is best avoided in Scala code.
Upvotes: 1