Reputation: 878
I have two files.
The 1st file contains key/value pairs (here apple is the key and fruit is the value):
apple fruit
banana fruit
tomato vegetable
mango fruit
potato vegetable
2nd file contains:
apple
banana
mango
potato
tomato
I need to loop through the 2nd file and, for each entry, find the matching value in the 1st file. The final output should be grouped by that value (fruit is the key and apple, banana, ... are the values):
fruit apple,banana,mango
vegetable potato,tomato
Please suggest the best, most optimized way to do this in Spark with Scala.
Upvotes: 0
Views: 257
Reputation: 41769
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("spark-scratch").setMaster("local")
val sc = new SparkContext(conf)
val one = List(("apple", "fruit"), ("banana", "fruit"), ("tomato", "vegetable"),
  ("mango", "fruit"), ("potato", "vegetable"))
val oneRdd = sc.makeRDD(one, 1)
// maybe a Broadcast for this
val two = List("apple", "banana", "tomato", "mango", "potato")
// keep the pairs whose key appears in `two`, then gather the keys per value
val res = oneRdd.filter(two contains _._1).map(t => (t._2, List(t._1))).reduceByKey(_ ++ _)
EDIT: and a version that works entirely with RDDs, so file1 and file2 can be arbitrarily big (although if file2 is big, it probably contains duplicates, so you might want a .distinct every time in the reduceByKey):
val oneRdd = sc.makeRDD(one, 1)
val twoRdd = sc.makeRDD(two, 1).map(a=>(a, a)) // to make a PairRDD
val res = oneRdd.join(twoRdd).map{case(k,(v1, v2))=>(v1, List(k))}.reduceByKey{_++_}
Output of either is the same:
(vegetable,List(potato, tomato))
(fruit,List(banana, apple, mango))
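To illustrate the remark about duplicates, here is a minimal sketch of the join version with deduplication. It is an assumption-laden variant, not the code above: the object name JoinWithDistinct and the repeated "apple" entry are made up for illustration, .distinct() is applied to the key RDD before the join (rather than inside reduceByKey), and the lists are sorted only to make the printed order stable. It assumes Spark is on the classpath.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical object name, used only for this sketch.
object JoinWithDistinct {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("join-with-distinct").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val one = List(("apple", "fruit"), ("banana", "fruit"), ("tomato", "vegetable"),
        ("mango", "fruit"), ("potato", "vegetable"))
      // "apple" is repeated on purpose to mimic a file2 that contains duplicates.
      val two = List("apple", "banana", "apple", "mango", "potato", "tomato")

      val oneRdd = sc.makeRDD(one)
      // Deduplicate the keys before the join so each match is emitted only once.
      val twoRdd = sc.makeRDD(two).distinct().map(k => (k, ()))

      val res = oneRdd.join(twoRdd)               // (key, (value, ()))
        .map { case (k, (v, _)) => (v, List(k)) } // re-key by value
        .reduceByKey(_ ++ _)                      // concatenate the keys per value
        .mapValues(_.sorted)                      // sorted only for stable output

      res.collect().sortBy(_._1).foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

Without the .distinct(), the repeated "apple" would join twice and show up twice in the fruit list.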
Upvotes: 2
Reputation: 3890
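val inputRDD1 = sc.textFile("file1.txt").map(r => {
  val arr = r.split(" ")
  (arr(0), arr(1))
})
val inputRDD2 = sc.textFile("file2.txt")
// file1 is assumed small enough to collect on the driver and broadcast as a Map
val broadcastRDD = sc.broadcast(inputRDD1.collect.toMap)
// Map.get returns an Option; flatMap unwraps matches and skips keys missing from file1
val interRDD = inputRDD2.flatMap(r => broadcastRDD.value.get(r).map(v => (v, r)))
val outputRDD = interRDD.groupByKey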
Output (of outputRDD.collect):
res16: Array[(String, Iterable[String])] = Array((fruit,CompactBuffer(apple, banana, mango)), (vegetable,CompactBuffer(potato, tomato)))
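The grouped result prints as CompactBuffer collections, while the question asks for lines such as "fruit apple,banana,mango". One possible way to get there is sketched below; the object FormatGrouped and the helper formatLine are hypothetical names, and a plain Seq stands in for the collected RDD result so the sketch runs without Spark.

```scala
// Hypothetical helper object, not part of Spark or of the answer above.
object FormatGrouped {
  // Render one (value, keys) pair as "value k1,k2,..." with the keys sorted.
  def formatLine(value: String, keys: Iterable[String]): String =
    s"$value ${keys.toSeq.sorted.mkString(",")}"

  def main(args: Array[String]): Unit = {
    // Stand-in for the collected grouped result; assumed to have this shape.
    val collected: Seq[(String, Iterable[String])] = Seq(
      "vegetable" -> Seq("potato", "tomato"),
      "fruit" -> Seq("apple", "banana", "mango")
    )
    // Sort by the grouping key so the line order is stable.
    collected.sortBy(_._1).foreach { case (v, ks) => println(formatLine(v, ks)) }
  }
}
```

On an actual RDD the same helper could be applied with map before collecting or saving, for example to write the lines out with saveAsTextFile.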
Upvotes: 1
Reputation: 1257
>>> d = [('apple','fruit'),('banana','fruit'),('tomato','veg'),('mango','fruit'),('potato','veg')]
>>> r = sc.parallelize(d)
>>> r1=r.map(lambda x: (x[1],x[0])).groupByKey()
>>> for i in r1.collect():
...     print("%s %s" % (i[0], list(i[1])))
veg ['tomato', 'potato']
fruit ['apple', 'banana', 'mango']
Upvotes: 0