Reputation: 3270
I have a requirement to map a field in my RDD to another field taken from UserDAO.users.
I've tried to do the mapping below, but I can't get the username back yet. When I print the updated map with foreach, I get this instead: scala.concurrent.impl.Promise$DefaultPromise@7c4c5ddd
Here is my code snippet:
rdd.map { l => {
  l.map { case (k, v) => {
    k match {
      case "a_userid" => {
        l.updated("a_username", userDAO.users.map(c => c.filter(f => f.userid == v.toInt)).map(y => y.map(e => e.username)))
      }
      case _ =>
    }
  }
  }
}
}
So basically:
- rdd is an RDD[Map[String, String]]
- UserDAO.users is a Future[Seq[User]], where User is a case class
- the updated rdd I want to return is also an RDD[Map[String, String]]
--
Any idea how to solve this?
Thanks
Upvotes: 0
Views: 517
Reputation: 10882
I have rewritten your code to make it work. Please note that it involves blocking; there is no other way to get a concrete RDD[Map[String, String]].
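The Promise$DefaultPromise output in the question comes from mapping over a Future: the result of users.map(...) is still a Future, so printing it shows the Future object instead of a username. Here is a minimal sketch using only the Scala standard library; User and users are hypothetical stand-ins for the question's case class and UserDAO.users, and the global ExecutionContext is an assumption needed by Future.map:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureVsAwait {
  // Hypothetical stand-in for the question's User case class
  case class User(userid: Int, username: String)

  // Hypothetical stand-in for UserDAO.users: Future[Seq[User]]
  def users: Future[Seq[User]] = Future(Seq(User(1, "alice"), User(2, "bob")))

  // Mapping over a Future only produces another Future; nothing is unwrapped
  def lookupAsync(id: Int): Future[Option[String]] =
    users.map(uss => uss.find(u => u.userid == id).map(u => u.username))

  // Await.result blocks the calling thread until the Future completes
  def lookupBlocking(id: Int): Option[String] =
    Await.result(lookupAsync(id), 30.seconds)

  def main(args: Array[String]): Unit = {
    println(lookupAsync(2))    // prints the Future itself; exact text depends on the Scala version
    println(lookupBlocking(2)) // prints Some(bob)
  }
}
```

Only the blocking call yields a plain Option[String] that can be put into a Map[String, String].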
I omitted the surrounding rdd.map for clarity.
First variant: I kept your approach of reading the users inside map. Please note that this is highly inefficient, as all users are read again on every iteration, i.e. 11 million times:
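import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global // assumed ExecutionContext for Future.map
import scala.concurrent.duration._ // enables 30.seconds

// rdd.map omitted; l is one Map[String, String] from the RDD
l.get("a_userid").flatMap { (userId: String) =>
  // Blocks on every call, i.e. once per record
  val newUserName: Option[String] =
    Await.result(
      userDAO.users
        .map(c => c.find(f => f.userid == userId.toInt))
        .map(y => y.map(e => e.username)),
      30.seconds
    )
  newUserName.map(l.updated("a_username", _))
}.getOrElse(l)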
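To make the per-record cost visible, here is a runnable sketch of the first variant in which a local Seq stands in for the RDD so it runs without Spark. CountingUserDAO is hypothetical and assumes that users starts a fresh query each time it is called; the question does not show how UserDAO is implemented, and if users is a val the Future is created only once, but every record still blocks on Await.result and scans the whole user list:

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object PerRecordLookup {
  case class User(userid: Int, username: String)

  // Hypothetical DAO that counts how many times `users` is queried
  class CountingUserDAO(data: Seq[User]) {
    val calls = new AtomicInteger(0)
    def users: Future[Seq[User]] = {
      calls.incrementAndGet()
      Future(data)
    }
  }

  // First variant, applied to a single record `l`
  def enrich(userDAO: CountingUserDAO)(l: Map[String, String]): Map[String, String] =
    l.get("a_userid").flatMap { (userId: String) =>
      val newUserName: Option[String] =
        Await.result(
          userDAO.users
            .map(c => c.find(f => f.userid == userId.toInt))
            .map(y => y.map(e => e.username)),
          30.seconds
        )
      newUserName.map(l.updated("a_username", _))
    }.getOrElse(l)
}
```

With three records, two of which carry a_userid, the DAO ends up queried twice: once for every record that has a user ID.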
An alternative approach is to read the users into a map beforehand. That map is then shipped to all Spark workers as part of the task closure. Since your user map is not that big, this is fine. This approach is more efficient, as you perform only a single map lookup per RDD element, which is fast.
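import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global // assumed ExecutionContext for Future.map
import scala.concurrent.duration._ // enables 30.seconds

// Block once, on the driver, to build a userid -> username lookup table
val users: Map[Int, String] = Await.result(
  userDAO.users.map(uss => uss.map(u => u.userid -> u.username).toMap),
  30.seconds
)

// rdd.map omitted; per record this is a single Map lookup
l.get("a_userid").flatMap { (userId: String) =>
  users.get(userId.toInt).map(l.updated("a_username", _))
}.getOrElse(l)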
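For comparison, here is a runnable sketch of the preloaded-map variant: users are loaded with a single blocking call, and each record then costs one Map lookup. CountingUserDAO (which counts queries) and the Seq used in place of the RDD are illustrative assumptions so the sketch runs without Spark:

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object PreloadedLookup {
  case class User(userid: Int, username: String)

  // Hypothetical DAO that counts how many times `users` is queried
  class CountingUserDAO(data: Seq[User]) {
    val calls = new AtomicInteger(0)
    def users: Future[Seq[User]] = {
      calls.incrementAndGet()
      Future(data)
    }
  }

  // Block once (on the driver, in real Spark code) to build the lookup table
  def loadUsers(userDAO: CountingUserDAO): Map[Int, String] =
    Await.result(
      userDAO.users.map(uss => uss.map(u => u.userid -> u.username).toMap),
      30.seconds
    )

  // Per-record work is a single Map lookup; unknown ids leave the record unchanged
  def enrichAll(records: Seq[Map[String, String]], users: Map[Int, String]): Seq[Map[String, String]] =
    records.map { l =>
      l.get("a_userid").flatMap { (userId: String) =>
        users.get(userId.toInt).map(l.updated("a_username", _))
      }.getOrElse(l)
    }
}
```

However many records are processed, the DAO is queried exactly once. In real Spark code the body of enrichAll would sit inside rdd.map; if you prefer an explicit broadcast variable over closure capture, SparkContext.broadcast(users) returns a Broadcast[Map[Int, String]] whose .value can be read inside the closure.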
UPD: Just for the sake of completeness, here is an asynchronous variant:
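import scala.concurrent.ExecutionContext.Implicits.global // assumed ExecutionContext for Future.map

userDAO.users
  .map(uss => uss.map(u => u.userid -> u.username).toMap)
  .map { (users: Map[Int, String]) =>
    // rdd.map is lazy, so this only defines the transformation
    rdd.map { (l: Map[String, String]) =>
      l.get("a_userid").flatMap { (userId: String) =>
        users.get(userId.toInt).map(l.updated("a_username", _))
      }.getOrElse(l)
    }
  }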
It follows the same approach as the second variant, but returns a Future[RDD[Map[String, String]]] instead of a concrete result.
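A runnable sketch of the asynchronous variant, with a Seq standing in for the RDD (an assumption so it runs without Spark) and the user list passed in as a Future[Seq[User]]. Nothing blocks inside enrichAsync; the caller decides whether to keep chaining with map or foreach, or to block once at the very edge of the program with Await.result:

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object AsyncLookup {
  case class User(userid: Int, username: String)

  // Never blocks: the enriched records are delivered inside a Future.
  // `records` stands in for the RDD, `users` for UserDAO.users.
  def enrichAsync(
      records: Seq[Map[String, String]],
      users: Future[Seq[User]]
  ): Future[Seq[Map[String, String]]] =
    users
      .map(uss => uss.map(u => u.userid -> u.username).toMap)
      .map { (byId: Map[Int, String]) =>
        records.map { l =>
          l.get("a_userid").flatMap { (userId: String) =>
            byId.get(userId.toInt).map(l.updated("a_username", _))
          }.getOrElse(l)
        }
      }
}
```

If the users Future fails, the returned Future fails with the same exception, so error handling also stays with the caller.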
Upvotes: 1