Philip K. Adetiloye

Reputation: 3270

Scala - map a value from a Map to another Map

I have a requirement to map a field in my RDD to a field from another collection, UserDAO.users. I've tried to work out the mapping below, but I can't get it to return the username yet. When I print the updated map with foreach, I get this instead: scala.concurrent.impl.Promise$DefaultPromise@7c4c5ddd

Here is my code snippet:

 rdd.map { l => {
      l.map { case (k, v) => {
        k match {
          case "a_userid" => {
            l.updated("a_username", userDAO.users.map(c => c.filter(f => f.userid == v.toInt)).map(y => y.map(e => e.username)))
          }
          case _ =>
            }
          }
        }
      }
    }

So basically,

rdd - RDD[Map[String, String]]

UserDAO.users - Future[Seq[User]] - where User is a case class

and returning the updated rdd - RDD[Map[String, String]]

--

Any idea how to solve this?

Thanks

Upvotes: 0

Views: 517

Answers (1)

Aivean

Reputation: 10882

I have rewritten your code to make it work. Note that it involves blocking; there is no other way to get a concrete RDD[Map[String, String]].

I omitted the rdd.map section for clarity.

First variant. I used your approach of reading the users inside map. Note that this is highly inefficient, because all users are read again for every record, i.e. 11 million times:

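import scala.concurrent.Await
import scala.concurrent.duration._

// rdd.map omitted
l.get("a_userid").flatMap {
  userId: String =>
    // Blocks for up to 30 seconds on every record while the users are fetched
    val newUserName: Option[String] =
      Await.result(userDAO.users
        .map(c => c.find(f => f.userid == userId.toInt))
        .map(y => y.map(e => e.username)),
        30.seconds
      )
    newUserName.map(l.updated("a_username", _))
}.getOrElse(l)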

An alternative approach is to read the users into a map beforehand. That map is then sent to all Spark workers. As your map is not that big, this is fine. This approach is more efficient, because you perform only a single, fast map lookup per record in the RDD.

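import scala.concurrent.Await
import scala.concurrent.duration._

// Block once, up front, and build a userid -> username lookup table
val users: Map[Int, String] = Await.result(userDAO.users
  .map(uss => uss.map(u => u.userid -> u.username).toMap),
  30.seconds
)

// rdd.map omitted
l.get("a_userid").flatMap {
  userId: String =>
    users.get(userId.toInt).map(l.updated("a_username", _))
}.getOrElse(l)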
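
To try the lookup logic of this second variant without a Spark cluster, here is a minimal self-contained sketch. Everything named here is an assumption for illustration: the User case class, the loadUsers function standing in for userDAO.users, and a plain Seq standing in for the RDD. It also differs from the snippet above in one way: a non-numeric a_userid leaves the record unchanged instead of throwing from toInt.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical stand-in: the real User case class is not shown in the question.
case class User(userid: Int, username: String)

object UsernameLookup {
  // Hypothetical stand-in for userDAO.users.
  def loadUsers(): Future[Seq[User]] =
    Future.successful(Seq(User(1, "alice"), User(2, "bob")))

  // Adds "a_username" when "a_userid" is present, numeric, and known;
  // otherwise returns the record unchanged.
  def enrich(l: Map[String, String], users: Map[Int, String]): Map[String, String] =
    l.get("a_userid")
      .flatMap(id => Try(id.toInt).toOption) // assumption: skip ids that are not numeric
      .flatMap(id => users.get(id))
      .map(name => l.updated("a_username", name))
      .getOrElse(l)

  def main(args: Array[String]): Unit = {
    // Block once, before touching the records.
    val users: Map[Int, String] =
      Await.result(loadUsers().map(_.map(u => u.userid -> u.username).toMap), 30.seconds)

    // A Seq stands in for the RDD here; with Spark this would be rdd.map(enrich(_, users)).
    val records = Seq(Map("a_userid" -> "1"), Map("a_userid" -> "x"), Map("other" -> "v"))
    records.map(enrich(_, users)).foreach(println)
  }
}
```

Keeping enrich as a pure function of the record and the lookup map means the same function can be passed to rdd.map unchanged, and it can be tested without a Future or a Spark context.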

UPD: Just for the sake of completeness, here is another asynchronous variant:

userDAO.users
  .map(uss => uss.map(u => u.userid -> u.username).toMap)
  .map { users:Map[Int, String] =>
      rdd.map { l:Map[String, String] =>
        l.get("a_userid").flatMap {
          userId:String =>
            users.get(userId.toInt).map(l.updated("a_username", _))
        }.getOrElse(l)
      }
  }

It follows the same approach as the second variant, but returns a Future[RDD[Map[String, String]]] instead of a concrete result.
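
A caller of the asynchronous variant eventually has to consume that Future. The sketch below shows one way, under stated assumptions: loadUsers is a hypothetical stand-in for userDAO.users (already reduced to id and name pairs), and a Seq stands in for the RDD so the example runs without Spark. The idea is to keep transforming inside the Future and block, if at all, only at the outermost edge of the program.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AsyncEnrich {
  // Hypothetical stand-in for userDAO.users, reduced to (userid, username) pairs.
  def loadUsers(): Future[Seq[(Int, String)]] =
    Future.successful(Seq(1 -> "alice"))

  // Same shape as the asynchronous variant, with Seq in place of RDD.
  def enrichAll(records: Seq[Map[String, String]]): Future[Seq[Map[String, String]]] =
    loadUsers().map(_.toMap).map { users =>
      records.map { l =>
        l.get("a_userid")
          .flatMap(id => users.get(id.toInt))
          .map(name => l.updated("a_username", name))
          .getOrElse(l)
      }
    }

  def main(args: Array[String]): Unit = {
    val result = enrichAll(Seq(Map("a_userid" -> "1"), Map("other" -> "v")))

    // Non-blocking: further steps are chained onto the Future.
    val names: Future[Seq[String]] = result.map(_.flatMap(_.get("a_username")))

    // Blocking happens once, at the edge, only because main must not exit early.
    println(Await.result(names, 30.seconds))
  }
}
```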

Upvotes: 1
