HackerDuck

Reputation: 269

Getting List[Serializable] instead of List[(String,String)]

My initial RDD has the type RDD[(String, List[(String, String)])] and looks as follows:

(600,List((22,33),(55,88)))
(700,List((12,13),(15,18),(18,88)))
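
For reference, this is roughly how an RDD with that shape could be built for a local test (a minimal sketch; it assumes an existing SparkContext named sc, and filtered matches the variable used in the code below):

import org.apache.spark.rdd.RDD

// hypothetical test data mirroring the sample above
val filtered: RDD[(String, List[(String, String)])] = sc.parallelize(Seq(
  ("600", List(("22", "33"), ("55", "88"))),
  ("700", List(("12", "13"), ("15", "18"), ("18", "88")))
))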

I want to append additional data obtained from a Redis cache to each entry. To do this, I use Sedis, a Scala wrapper around Jedis. This is a fragment of my code:

import org.sedis._
import redis.clients.jedis._

val redisPool = new Pool(new JedisPool(new JedisPoolConfig(), "localhost", 6379, 2000))

val appended = filtered.map { line =>
  (line._1, redisPool.withJedisClient { client =>
    val additionalData: List[String] = Dress.up(client).hvals("member_id:" + line._1)
    line._2.union(additionalData)
  })
}

The problem is that appended has the type RDD[(String, List[Serializable])] instead of RDD[(String, List[(String, String)])]. What am I doing wrong? Also, is the way I am using redisPool inside map efficient, or is there a better option?

Upvotes: 1

Views: 2029

Answers (1)

Tzach Zohar

Reputation: 37842

  1. line._2.union(additionalData) creates a union of line._2, which has the type List[(String, String)], and additionalData, which has the type List[String]. The element type of the result must be the most specific common supertype (least upper bound) of these two element types, which is Serializable, so you get List[Serializable]. If additionalData had the type List[(String, String)], the result would have kept that type; see the sketch after the code below.

  2. As for the efficiency of the JedisPool usage: usually, when opening a connection to some external resource from a Spark transformation, you should use mapPartitions, which executes the given function once per RDD partition. Why? Under your current implementation, the pool is created in the driver application, then serialized and shipped to each executor, to be deserialized and used in the mapping. This usually fails, because such a pool holds some kind of connection (an open socket, perhaps) that exists only on the driver where it was created, not on the executors. One (inefficient) alternative would be to create the pool inside the map function, i.e. per record. The better option is to use mapPartitions:

    val appended = filtered.mapPartitions(iter => {
      // one pool per partition, created on the executor that processes it
      val redisPool = new Pool(new JedisPool(new JedisPoolConfig(), "jedis-host", 6379, 2000))

      iter.map { line =>
        (line._1, redisPool.withJedisClient { client =>
          val additionalData: List[String] = Dress.up(client).hvals("member_id:" + line._1)
          line._2.union(additionalData)
        })
      }
      // close the pool, if relevant. Note that iter.map is lazy, so do this only
      // after the returned iterator has been consumed (e.g. materialize it first)
    })
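
Back to point 1, here is a minimal sketch of the type widening and of one possible fix (the "extra" placeholder key is purely illustrative; how you turn the Redis values into pairs depends on your data):

val pairs: List[(String, String)] = List(("22", "33"), ("55", "88"))
val values: List[String] = List("someRedisValue")

// The least upper bound of (String, String) and String is Serializable,
// so the union is widened:
val widened = pairs.union(values)                       // List[Serializable]

// Keeping both sides at the same element type preserves the pairs.
// Pairing each plain value with a placeholder key, just for illustration:
val fixed = pairs.union(values.map(v => ("extra", v)))  // List[(String, String)]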
    

Upvotes: 2
