swamoch

Reputation: 300

How to filter an RDD by data type?

I have an RDD that I am trying to filter to keep only float values. Do Spark RDDs provide any way of doing this?

I have a CSV from which I need only the float values greater than 40 in a new RDD. To achieve this, I check whether the field is an instance of Float and filter on that. When I filter with a !, all the strings are still in the output, and when I don't use !, the output is empty.

val airports1 = airports.filter(line => !line.split(",")(6).isInstanceOf[Float]) // intended to drop non-float fields
val airports2 = airports1.filter(line => line.split(",")(6).toFloat > 40)        // intended to keep values > 40

At the .toFloat, I run into a NumberFormatException, which I've tried to handle in a try/catch block.
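The try/catch attempt is not shown; a hypothetical sketch of how such handling could be written inside filter (the structure is an assumption, only the column index and threshold come from the question) would be:

// Hypothetical sketch - not the exact code from the question.
// The catch must still produce a Boolean so that filter compiles.
val airports3 = airports.filter { line =>
  try {
    line.split(",")(6).toFloat > 40
  } catch {
    case _: NumberFormatException => false // unparseable fields are dropped
  }
}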

Upvotes: 1

Views: 1050

Answers (1)

Since each line is a plain String, every field produced by split is also a String: isInstanceOf[Float] is always false, which is why the ! version keeps every line and the version without ! keeps none. You are not really filtering by type; you want to filter by whether the field can be parsed as a float.
You can accomplish that with flatMap together with Option: flatMap treats an Option as a collection of zero or one elements, so failed parses (None) are dropped and successful ones (Some) are unwrapped.

import org.apache.spark.sql.SparkSession
import scala.util.Try

val spark = SparkSession.builder.master("local[*]").appName("Float caster").getOrCreate()
val sc = spark.sparkContext

val data = List("x,10", "y,3.3", "z,a") // "a" cannot be parsed as a float
val rdd = sc.parallelize(data) // rdd: RDD[String]

// Try(...).toOption yields Some(value) on success and None on failure;
// flatMap unwraps the Somes and silently drops the Nones.
val filtered = rdd.flatMap(line => Try(line.split(",")(1).toFloat).toOption) // filtered: RDD[Float]
filtered.collect() // res0: Array[Float] = Array(10.0, 3.3)
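If you would rather keep the parseability check and the conversion as separate steps, an equivalent two-pass formulation is possible (a sketch, not part of the original answer; note it splits and parses each line twice, which the flatMap version avoids):

// Alternative sketch: filter on parse success, then convert.
val parsed = rdd
  .filter(line => Try(line.split(",")(1).toFloat).isSuccess)
  .map(line => line.split(",")(1).toFloat) // parsed: RDD[Float]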

For the > 40 part you can either perform another filter afterwards or filter the inner Option.
(Both should perform more or less the same thanks to Spark's laziness, so choose whichever is clearer to you.)

// Option 1 - Apply another filter afterwards.
val filtered2 = filtered.filter(x => x > 40)

// Option 2 - Filter the inner Option in one step.
val filtered3 = rdd.flatMap(line => Try(line.split(",")(1).toFloat).toOption.filter(x => x > 40))
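Applied to your airports RDD, the same pattern with your column index and threshold would look like this (a sketch; airportsOver40 is a made-up name, and it assumes the float lives in the 7th comma-separated field, as in your snippet):

// Keep the parsed floats from field index 6 that are greater than 40.
val airportsOver40 = airports.flatMap(line =>
  Try(line.split(",")(6).toFloat).toOption.filter(_ > 40)) // RDD[Float]

If you need the original lines rather than the parsed floats, filter instead with Try(line.split(",")(6).toFloat).toOption.exists(_ > 40).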

Let me know if you have any questions.

Upvotes: 3
