jollybugbear

Reputation: 1

Spark RDD pipe value from tuple

I have a Spark RDD where each element is a tuple in the form (key, input). I would like to use the pipe method to pass the inputs to an external executable and generate a new RDD of the form (key, output). I need the keys for correlation later.

Here is an example using the spark-shell:

val data = sc.parallelize(
  Seq(
    ("file1", "one"),
    ("file2", "two two"),
    ("file3", "three three three")))

// Incorrectly processes the data (calls toString() on each tuple)
data.pipe("wc")

// Loses the keys, generates extraneous results
data.map( elem => elem._2 ).pipe("wc")

Thanks in advance.

Upvotes: 0

Views: 1485

Answers (2)

geoalgo

Reputation: 688

The solution with map is not correct, as map is not guaranteed to preserve partitioning, so zipping afterwards will fail. You need to use mapValues, which preserves the partitioning of the initial RDD.

data.zip( 
  data.mapValues{ _.toString }.pipe("my_executable")
).map { case ((key, input), output) => 
  (key, output)
}
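The correlation logic above can be sketched without a cluster using plain Scala collections, with a hypothetical `fakePipe` standing in for the external executable (here it just counts words, like `wc -w`); this is only an illustration of the key-preserving zip pattern, not Spark itself:

```scala
object ZipPipeSketch {
  // Stand-in for pipe(): emits exactly one output line per input line.
  // (Hypothetical; a real pipe() would invoke the external executable.)
  def fakePipe(lines: Seq[String]): Seq[String] =
    lines.map(line => line.split("\\s+").length.toString)

  def run(): Seq[(String, String)] = {
    val data = Seq(
      ("file1", "one"),
      ("file2", "two two"),
      ("file3", "three three three"))

    // Mirror of data.zip(data.mapValues(...).pipe(...)):
    // keep each key, pair it with the piped output for its value.
    data.zip(fakePipe(data.map(_._2))).map {
      case ((key, _), output) => (key, output)
    }
  }

  def main(args: Array[String]): Unit =
    run().foreach(println) // (file1,1), (file2,2), (file3,3)
}
```

The pattern only works because both sides of the zip have the same length and order, which is exactly what mapValues (and a one-line-per-record executable) guarantees.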

Upvotes: 2

Ivan Balashov
Ivan Balashov

Reputation: 1963

Considering you cannot pass the label in and out of the executable, this might work:

rdd
  .map(x => x._1)
  .zip(rdd
          .map(x => x._2)
          .pipe("my executable"))

Please be aware that this can be fragile, and it will definitely break if your executable does not produce exactly one output line for each input record.
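The failure mode is easy to see with plain Scala collections (a sketch, not Spark: RDD.zip would instead throw, since it requires the same number of elements in each partition). If the hypothetical executable emits two lines per record, the pairing shifts:

```scala
object ZipMisalignment {
  def run(): Seq[(String, String)] = {
    val keys = Seq("file1", "file2")
    // Suppose the executable emits TWO lines per input record.
    val pipedOutput = Seq("out1a", "out1b", "out2a", "out2b")
    keys.zip(pipedOutput) // file2 is silently paired with file1's second line
  }

  def main(args: Array[String]): Unit =
    run().foreach(println) // (file1,out1a), (file2,out1b)
}
```

Here file2 ends up correlated with out1b, an output that actually belongs to file1, which is why the one-line-per-record assumption matters.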

Upvotes: 0
