Reputation: 47
I'm fairly new to Scala and Spark and functional programming in general, so forgive me if this is a pretty basic question.
I'm merging two CSV files, and I got a lot of inspiration from the question "Merge the intersection of two CSV files with Scala", although that is plain Scala code and I wanted to write it in Spark to handle much larger CSV files.
This part of the code I think I've got right:
val csv1 = sc.textFile(Csv1Location).cache()
val csv2 = sc.textFile(Csv2Location).cache()
def GetInput1Key(input: String): Key = Key(getAtIndex(input.split(SplitByCommas, -1), Csv1KeyLocation))
def GetInput2Key(input: String): Key = Key(getAtIndex(input.split(SplitByCommas, -1), Csv2KeyLocation))
val intersectionOfKeys = csv1 map GetInput1Key intersection(csv2 map GetInput2Key)
val map1 = csv1 map (input => GetInput1Key(input) -> input)
val map2 = csv2 map (input => GetInput2Key(input) -> input)
val broadcastedIntersection = sc.broadcast(intersectionOfKeys.collect.toSet)
And this is where I'm a little lost. I have a set of keys (intersectionOfKeys) that are present in both of my RDDs, and I have two RDDs that contain [Key, String] maps. If they were plain maps I could just do:
val output = broadcastedIntersection.value map (key => map1(key) + ", " + map2(key))
but that syntax isn't working.
Please let me know if you need any more information about the CSV files or what I'm trying to accomplish. Also, I'd love any syntactical and/or idiomatic comments on my code as well if you all notice anything incorrect.
Update:
val csv1 = sc.textFile(Csv1Location).cache()
val csv2 = sc.textFile(Csv2Location).cache()
def GetInput1Key(input: String): Key = Key(getAtIndex(input.split(SplitByCommas, -1), Csv1KeyLocation))
def GetInput2Key(input: String): Key = Key(getAtIndex(input.split(SplitByCommas, -1), Csv2KeyLocation))
val intersectionOfKeys = csv1 map GetInput1Key intersection(csv2 map GetInput2Key)
val map1 = csv1 map (input => GetInput1Key(input) -> input)
val map2 = csv2 map (input => GetInput2Key(input) -> input)
val intersections = map1.join(map2)
intersections take NumOutputs foreach println
This code worked and did what I needed to do, but I was wondering if there were any modifications or performance implications of using join. I remember reading somewhere that join is typically really expensive and time consuming because all the data needs to be sent to all the distributed workers.
Upvotes: 1
Views: 1904
Reputation: 67135
I think hveiga is correct; a join would be simpler:
val csv1KV = csv1.map(line => (GetInput1Key(line), line))
val csv2KV = csv2.map(line => (GetInput2Key(line), line))
val joined = csv1KV join csv2KV
joined.mapValues(lineTuple => lineTuple._1 + ", " + lineTuple._2)
This is both more performant and more readable, as far as I can see. You would need to join the two sets together at some point anyway, and your approach relies on a single-machine mentality: you would have to pull each collection in to make sure you are requesting the line from all partitions. Note that I used mapValues, which at least keeps your sets hash partitioned and cuts down on network noise.
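To make the mapValues point concrete, here is a minimal sketch of the same join run in local mode. It is not your exact code: the sample lines, the keyOf helper (which just takes the first comma-separated field), the object name, and the HashPartitioner(4) are all assumptions made up for illustration, and it assumes Spark 1.3 or later so that the pair-RDD methods are available without extra imports.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object JoinSketch {
  // Hypothetical key extractor: treats the first comma-separated field as the key.
  def keyOf(line: String): String = line.split(",", -1)(0)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("join-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // Made-up sample data standing in for the two CSV files.
      val csv1 = sc.parallelize(Seq("1,alice", "2,bob", "3,carol"))
      val csv2 = sc.parallelize(Seq("2,blue", "3,green", "4,red"))

      // Partition both sides the same way so matching keys are co-located.
      val partitioner = new HashPartitioner(4)
      val csv1KV = csv1.map(line => (keyOf(line), line)).partitionBy(partitioner)
      val csv2KV = csv2.map(line => (keyOf(line), line)).partitionBy(partitioner)

      // Inner join: only keys present on both sides survive (here, 2 and 3).
      val joined = csv1KV.join(csv2KV, partitioner)

      // mapValues cannot touch the keys, so the partitioner is carried over.
      val merged = joined.mapValues { case (left, right) => left + ", " + right }
      assert(merged.partitioner == Some(partitioner))

      merged.collect().sortBy(_._1).foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

If you swapped mapValues for a plain map over the (key, value) pairs, Spark could no longer assume the keys are unchanged, so the resulting RDD would have no partitioner and a later key-based operation would have to shuffle again.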
Upvotes: 3