theoutlaw

Reputation: 79

How to pass data as a tuple into an rdd in Spark using Scala

I have a set of coordinates (x, y) as my data in a CSV file. I want to pass these x and y values into an RDD[(Double, Double)] as tuples and name it points. I have tried the following, but for some reason I get the error "Constructor cannot be instantiated to expected type; found: Array[T], required: String".

// Load the data
val data = sc.textFile("data.csv")

// Read the data as an RDD[(Double, Double)]
val points = data.map(line => line.split(",").map{ case Array(x, y) => (x.toDouble, y.toDouble)} )

EDIT: Is there any way I can filter these points so that I can handle null values (cases where x or y or both are null in the dataset)? In essence, I want to check that each tuple always contains 2 elements. I tried something like this

val points = data.map(line => line.split(",").filter(!_.isEmpty)).map{ case Array(x, y) => (x.toDouble, y.toDouble)}.filter(_.size > 1)

but I get the error "Type mismatch, expected: (Double, Double) => Boolean, actual: (Double, Double) => Any".
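For reference, a sketch that avoids this mismatch by filtering on the split result (an Array[String]) before mapping to tuples, rather than filtering the tuples afterwards:

// Drop empty tokens, keep only lines that yield exactly two values,
// then convert to (Double, Double) tuples
val points = data
  .map(_.split(",").filter(_.nonEmpty))
  .filter(_.length == 2)
  .map { case Array(x, y) => (x.toDouble, y.toDouble) }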

Upvotes: 2

Views: 2622

Answers (3)

Md Shihab Uddin

Reputation: 561

Apache Spark has an API to read CSV files. I prefer to use this API rather than textFile to read a CSV file, as it handles missing or null values internally. Here is the content of my data.csv file:

12,13
12.3,25.6
12.4
,34.5

The desired output can be generated by following way:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

val schema = StructType(Array(
  StructField("x", DoubleType, true),
  StructField("y", DoubleType, true)
))
val data_df = spark.read.schema(schema).csv("data.csv")
data_df.show()
+----+----+
|   x|   y|
+----+----+
|12.0|13.0|
|12.3|25.6|
|12.4|null|
|null|34.5|
+----+----+

// converting the data_df DataFrame to an RDD[(Double, Double)]
val points_rdd = data_df.rdd.map { case Row(x: Double, y: Double) => (x, y) }

Handling null:

val filtered_data_rdd = data_df.filter(data_df("x").isNotNull && data_df("y").isNotNull)
  .rdd.map { case Row(x: Double, y: Double) => (x, y) }

import spark.implicits._
filtered_data_rdd.toDF("x", "y").show()
+----+----+
|   x|   y|
+----+----+
|12.0|13.0|
|12.3|25.6|
+----+----+
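As a sketch of an alternative (the name points_rdd_clean is mine), the DataFrame's na.drop() achieves the same null filtering without spelling out the conditions per column:

// na.drop() removes rows containing any null, then convert to tuples
val points_rdd_clean = data_df.na.drop()
  .rdd
  .map { case Row(x: Double, y: Double) => (x, y) }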

Upvotes: 0

Raphael Roth

Reputation: 27373

Your approach is almost correct, but you should use:

val points = data.map(line => {
  val Array(x, y) = line.split(",")
  (x.toDouble, y.toDouble)
})

Or alternatively:

val points = data.map(line => {
  line.split(",") match {
    case Array(x, y) => (x.toDouble, y.toDouble)
  }
})

The problem with your approach is that you call map on the result of line.split(","), i.e. you call map on an Array[String], so you try to (pattern) match each String against Array(x, y).
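A tolerant variant along the same lines (a sketch, using scala.util.Try) also drops malformed lines, which covers the null case raised in the question's EDIT:

import scala.util.Try

val points = data.flatMap { line =>
  line.split(",") match {
    // Try catches the NumberFormatException thrown by empty or non-numeric tokens
    case Array(x, y) => Try((x.toDouble, y.toDouble)).toOption
    case _           => None
  }
}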

Upvotes: 1

Rishi Saraf

Reputation: 1812

Use the code below. You have to call the second map on the output of split, i.e. on the resulting Array[String].

// Load the data
val data = sc.textFile("data.csv")

// Read the data as an RDD[(Double, Double)]
val points = data.map(line => line.split(",")).map { case Array(x, y) => (x.toDouble, y.toDouble) }

Upvotes: 3
