Vamsi

Reputation: 878

Key-value mapping in Spark

I have two files.

The 1st file contains (assume apple is the key and fruit is the value):

apple   fruit
banana  fruit
tomato  vegetable
mango   fruit
potato  vegetable

2nd file contains:

apple
banana
mango
potato
tomato

I need to look up each entry of the 2nd file in the 1st file and group the matching keys by their value. The final output should be (fruit is the key and apple, banana, ... are the values):

fruit    apple,banana,mango
vegetable    potato,tomato

Please suggest the most efficient way to do this in Spark and Scala.

Upvotes: 0

Views: 257

Answers (3)

The Archetypal Paul

Reputation: 41769

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("spark-scratch").setMaster("local")
val sc = new SparkContext(conf)

val one = List(("apple","fruit"), ("banana","fruit"), ("tomato","vegetable"),
               ("mango", "fruit"), ("potato","vegetable"))
val oneRdd = sc.makeRDD(one, 1)

//maybe a Broadcast for this
val two = List("apple", "banana", "tomato", "mango", "potato")

val res = oneRdd.filter(two contains _._1).map(t=>(t._2,List(t._1))).reduceByKey{_++_}
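The "maybe a Broadcast" comment above could be sketched roughly as follows. This is only an illustration, assuming file 2 is small enough to fit in memory on every executor; the object name `BroadcastFilter` and the helper `groupKeys` are hypothetical, not part of the original answer. Converting the list to a `Set` also makes each membership test cheaper than `List.contains`.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastFilter {
  // Hypothetical helper: keeps the file-1 pairs whose key appears in file 2,
  // then collects the keys under each value.
  def groupKeys(sc: SparkContext,
                one: Seq[(String, String)],
                two: Seq[String]): Map[String, List[String]] = {
    // Assumption: file 2 fits in memory. A Set gives fast lookups
    // and drops duplicate keys as a side effect.
    val wanted = sc.broadcast(two.toSet)
    sc.parallelize(one)
      .filter { case (k, _) => wanted.value.contains(k) }
      .map { case (k, v) => (v, List(k)) }
      .reduceByKey(_ ++ _)
      .mapValues(_.sorted) // sorted only to make the result deterministic
      .collect()
      .toMap
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("spark-scratch").setMaster("local")
    val sc = new SparkContext(conf)
    try {
      val one = List(("apple", "fruit"), ("banana", "fruit"), ("tomato", "vegetable"),
                     ("mango", "fruit"), ("potato", "vegetable"))
      val two = List("apple", "banana", "tomato", "mango", "potato")
      groupKeys(sc, one, two).foreach(println)
    } finally sc.stop()
  }
}
```

The broadcast value is shipped to each executor once instead of being serialized with every task closure, which is where the saving would come from if the list grows.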

EDIT: and here is a version that works entirely with RDDs, so file1 and file2 can be arbitrarily big (although if file2 is big, it probably contains duplicates, so you might want to call .distinct on the combined list each time in the reduceByKey):

val oneRdd = sc.makeRDD(one, 1)

val twoRdd = sc.makeRDD(two, 1).map(a=>(a, a)) // to make a PairRDD

val res = oneRdd.join(twoRdd).map{case(k,(v1,  v2))=>(v1, List(k))}.reduceByKey{_++_}

Output of either is the same:

(vegetable,List(potato, tomato))
(fruit,List(banana, apple, mango))
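If file 2 does contain duplicates, one possible way to handle them is sketched below. Note that this is a variation on the idea mentioned in the EDIT: it dedupes file 2 once before the join rather than calling .distinct inside reduceByKey. The object name `DistinctJoin` and the helper `groupKeys` are hypothetical, and the repeated "apple" in the sample data is made up for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object DistinctJoin {
  // Hypothetical helper: joins file-1 pairs against the distinct keys of file 2,
  // then groups the surviving keys under their category.
  def groupKeys(sc: SparkContext,
                one: Seq[(String, String)],
                two: Seq[String]): Map[String, List[String]] = {
    val oneRdd = sc.parallelize(one)
    // distinct() removes repeated keys, so each key joins exactly once
    val twoRdd = sc.parallelize(two).distinct().map(k => (k, ()))
    oneRdd.join(twoRdd)
      .map { case (k, (category, _)) => (category, k) }
      .groupByKey()
      .mapValues(_.toList.sorted) // sorted only to make the result deterministic
      .collect()
      .toMap
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("spark-scratch").setMaster("local")
    val sc = new SparkContext(conf)
    try {
      val one = List(("apple", "fruit"), ("banana", "fruit"), ("tomato", "vegetable"),
                     ("mango", "fruit"), ("potato", "vegetable"))
      // "apple" appears twice to simulate duplicates in file 2
      val two = List("apple", "apple", "banana", "tomato", "mango", "potato")
      groupKeys(sc, one, two).foreach(println)
    } finally sc.stop()
  }
}
```

Deduplicating before the join keeps the joined data smaller, at the cost of one extra shuffle for distinct(); whether that trade is worthwhile depends on how many duplicates file 2 really has.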

Upvotes: 2

banjara

Reputation: 3890

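// Parse file 1 into (key, value) pairs; split on any run of whitespace
val inputRDD1 = sc.textFile("file1.txt").map { r =>
    val arr = r.trim.split("\\s+")
    (arr(0), arr(1))
}

val inputRDD2 = sc.textFile("file2.txt")

// Broadcast file 1 as a lookup map (assumes it fits in memory)
val broadcastRDD = sc.broadcast(inputRDD1.collectAsMap())

// Look up each key of file 2; flatMap drops keys that are missing from file 1
val interRDD = inputRDD2.flatMap(r => broadcastRDD.value.get(r.trim).map(v => (v, r.trim)))

val outputRDD = interRDD.groupByKey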

Output

res16: Array[(String, Iterable[String])] = Array((fruit,CompactBuffer(apple, banana, mango)), (vegetable,CompactBuffer(potato, tomato)))

Upvotes: 1

ayan guha

Reputation: 1257

>>> d = [('apple','fruit'),('banana','fruit'),('tomato','veg'),('mango','fruit'),('potato','veg')]
>>> r = sc.parallelize(d)
>>> r1 = r.map(lambda x: (x[1], x[0])).groupByKey()
>>> for i in r1.collect():
...     print("%s  %s" % (i[0], list(i[1])))

veg  ['tomato', 'potato']
fruit  ['apple', 'banana', 'mango']

Upvotes: 0
