Reputation: 1
I have a Spark RDD where each element is a tuple of the form (key, input). I would like to use the pipe method to pass the inputs to an external executable and generate a new RDD of the form (key, output). I need the keys for correlation later.
Here is an example using the spark-shell:
val data = sc.parallelize(Seq(
  ("file1", "one"),
  ("file2", "two two"),
  ("file3", "three three three")))

// Incorrectly processes the data (calls toString() on each tuple,
// so wc sees lines like "(file1,one)")
data.pipe("wc")

// Loses the keys and generates extraneous results
// (wc emits one summary line per partition, not one per record)
data.map(elem => elem._2).pipe("wc")
Thanks in advance.
Upvotes: 0
Views: 1485
Reputation: 688
The solution with map is not correct, as map is not guaranteed to preserve partitioning, so zipping with the piped result can fail. You need to use mapValues to preserve the partitioning of the initial RDD.
// Zip the original pair RDD with the piped output,
// then keep only (key, output).
data.zip(
  data.mapValues { _.toString }.pipe("my_executable")
).map { case ((key, input), output) =>
  (key, output)
}
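Note that by default pipe writes each element's toString to the executable's stdin, so the snippet above feeds it lines like (file1,one) rather than just the value. If the executable should only see the values, a sketch using pipe's printRDDElement overload could look like this (my_executable is still a placeholder, and it is assumed to emit exactly one output line per input line):

val piped = data.pipe(
  command = Seq("my_executable"),
  // Print only the value to stdin instead of the tuple's toString
  printRDDElement = (elem: (String, String), printFn: String => Unit) =>
    printFn(elem._2))

val result = data.zip(piped).map { case ((key, _), output) => (key, output) }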
Upvotes: 2
Reputation: 1963
Assuming you cannot pass the key in and out of the executable, this might work:
// Zip the keys RDD with the piped values RDD, relying on matching
// element order and partitioning between the two sides.
rdd
  .map(x => x._1)
  .zip(rdd
    .map(x => x._2)
    .pipe("my_executable"))
Please be aware that this can be fragile, and it will definitely break if your executable does not produce exactly one output line for each input record.
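For instance, a quick check in the spark-shell with the question's sample data, using cat as a stand-in executable that echoes each input line unchanged (so the one-line-per-record assumption holds):

val rdd = sc.parallelize(Seq(
  ("file1", "one"),
  ("file2", "two two"),
  ("file3", "three three three")))

// cat echoes one line per record, so keys and outputs stay aligned
val result = rdd.map(x => x._1).zip(rdd.map(x => x._2).pipe("cat"))
result.collect()
// Array((file1,one), (file2,two two), (file3,three three three))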
Upvotes: 0