Reputation: 269
My initial RDD has the type RDD[(String, List[(String,String)])] and looks as follows:
(600,List((22,33),(55,88)))
(700,List((12,13),(15,18),(18,88)))
I want to append to each entry additional data obtained from a Redis cache. To do this, I use Sedis, which is a Scala wrapper for Jedis. This is a fragment of my code:
import org.sedis._
import redis.clients.jedis._
val redisPool = new Pool(new JedisPool(new JedisPoolConfig(), "localhost", 6379, 2000))
val appended = filtered.map({line => (line._1,
  redisPool.withJedisClient { client =>
    val additionalData: List[String] = Dress.up(client).hvals("member_id:"+line._1)
    line._2.union(additionalData)
  })
})
The problem is that appended has the type RDD[(String, List[Serializable])] instead of RDD[(String, List[(String,String)])]. What am I doing wrong?
Also, is the way I am using redisPool inside map efficient enough, or is there a better option?
Upvotes: 1
Views: 2029
Reputation: 37842
line._2.union(additionalData) creates a union of line._2, which has the type List[(String, String)], and additionalData, which has the type List[String]. The element type of the result must be the most specific common supertype of (String, String) and String, which is why you get List[Serializable].
If additionalData had the type List[(String, String)], that would have been the result type.
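To make the type issue concrete, here is a minimal sketch in plain Scala, without Spark or Redis. The sample values and the "extra" label used to build pairs are assumptions for illustration only; how the Redis values should actually be paired depends on your data.

```scala
object UnionTypes {
  // Sample entry shaped like one element of the RDD in the question
  val entry: (String, List[(String, String)]) =
    ("600", List(("22", "33"), ("55", "88")))

  // Stand-in for the values fetched from Redis (hypothetical data)
  val additionalData: List[String] = List("a", "b")

  // Mixing tuples and plain strings: the elements only share a general
  // supertype, so the list can no longer be typed as List[(String, String)]
  val mixed: List[java.io.Serializable] = entry._2 ++ additionalData

  // Possible fix: convert each string to a pair first
  // ("extra" is a hypothetical label, not something from the question)
  val asPairs: List[(String, String)] = additionalData.map(v => ("extra", v))

  // Both sides now have the same element type, so it is preserved
  val merged: List[(String, String)] = entry._2 ++ asPairs
}
```

The explicit type annotations make the compiler check each claim: merged only compiles because both lists contain (String, String) elements.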
As for the efficiency of the JedisPool usage: when opening a connection to an external resource from a Spark transformation, you should usually use mapPartitions, which executes the given function once on each of the RDD's partitions. Why? Under your current implementation, the pool is created in the driver application, then serialized and shipped to each executor, where it is deserialized and used in the mapping. This usually fails, because such a pool holds some kind of connection (an open socket, perhaps) that exists only on the driver where it was created, not on the executors. One (inefficient) alternative would be to create the pool inside the map function (once per record). The better option is to use mapPartitions:
val appended = filtered.mapPartitions(iter => {
  // Create the pool on the executor, once per partition
  val redisPool = new Pool(new JedisPool(new JedisPoolConfig(), "jedis-host", 6379, 2000))
  iter.map({line => (line._1,
    redisPool.withJedisClient { client =>
      val additionalData: List[String] = Dress.up(client).hvals("member_id:"+line._1)
      line._2.union(additionalData)
    })
  })
  // close the pool, if relevant; iter.map is lazy, so the pool must stay open
  // until the returned iterator has been consumed
})
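The per-partition pattern above can be sketched without Spark or Redis to show why closing the resource needs care. In this sketch, FakePool, lookup, and enrichPartition are hypothetical stand-ins (not Sedis or Spark APIs); enrichPartition simply has the same Iterator-in, Iterator-out shape as the function passed to mapPartitions.

```scala
object PerPartitionResource {
  // Hypothetical stand-in for a connection pool; counts how many were created
  final class FakePool {
    FakePool.created += 1
    private var open = true

    def lookup(key: String): List[String] = {
      require(open, "pool already closed")
      List(s"value-for-$key")
    }

    def close(): Unit = { open = false }
  }

  object FakePool { var created = 0 }

  // Same shape as the function given to mapPartitions
  def enrichPartition(iter: Iterator[String]): Iterator[(String, List[String])] = {
    val pool = new FakePool // one pool per partition, not one per record
    // iter.map alone is lazy and would only run after close(),
    // so force evaluation before closing the pool
    val result = iter.map(key => (key, pool.lookup(key))).toList
    pool.close()
    result.iterator
  }

  // Two simulated partitions
  val partitions: List[List[String]] = List(List("600", "700"), List("800"))

  val enriched: List[(String, List[String])] =
    partitions.flatMap(p => enrichPartition(p.iterator).toList)
}
```

Calling toList holds the whole partition in memory, which is the price of being able to close the pool safely inside the function. If partitions are large, an alternative is to leave the iterator lazy and keep the pool open for the lifetime of the executor.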
Upvotes: 2