VictorGram
VictorGram

Reputation: 2661

Spark , Scala: How to remove empty lines either from Rdd or from dataframe?

I am using spark on scala.And I have some empty rows in Rdd. I need to remove them from the Rdd.

And I tried it as :

val valfilteredRow = rddRow.filter(row => row!=null && row.length>0)

However it did not work.

The rows in Rdd looks like [ with : valfilteredRow.collect().foreach(println) ]:

[,AAGGOO]
[,AAAOOO]
[,GGGGGII]
[]
[,UGGG]

Upvotes: 1

Views: 7516

Answers (3)

vikrant rana
vikrant rana

Reputation: 4689

I don't know Scala but here is what I did in Pyspark:

Suppose you have an input file like:

Banana,23,Male,5,11,2017

Dragon,28,Male,1,11,2017
Dragon,28,Male,1,11,2017

2nd line is empty.

rdd = sc.textFile(PATH_TO_FILE).mapPartitions(lambda line: csv.reader(line,delimiter=','))

>>> rdd.take(10)
[['Banana', '23', 'Male', '5', '11', '2017'], [], ['Dragon', '28', 'Male', '1', '11', '2017'], ['Dragon', '28', 'Male', '1', '11', '2017']]

you can see that second element is empty, so we will filter it by calculating the length of element, which should be greater than one.

>>> rdd = sc.textFile(PATH_TO_FILE).mapPartitions(lambda line: csv.reader(line,delimiter=',')).filter(lambda line: len(line) > 1)
>>> rdd.take(10)
[['Banana', '23', 'Male', '5', '11', '2017'], ['Dragon', '28', 'Male', '1', '11', '2017'], ['Dragon', '28', 'Male', '1', '11', '2017']]

Upvotes: 2

Manoj Kumar Dhakad
Manoj Kumar Dhakad

Reputation: 1892

If your RDD is of type RDD[String] then you can do like

rdd.filter(_.length>0).collect

Upvotes: 2

Driss NEJJAR
Driss NEJJAR

Reputation: 978

Suppose you have the following sequence :

val seq = Seq(
  ",AAGGOO",
  ",AAAOOO",
  ",GGGGGII",
  "",
  ",UGGG"
)

With DF

  val df = seq.toDF("Column_name")

  df.show(false)

+--------------+
|Column_name   |
+--------------+
|,AAGGOO       |
|,AAAOOO       |
|,GGGGGII      |
|              |
|,UGGG         |
+--------------+

  df.filter(row => !(row.mkString("").isEmpty && row.length>0)).show(false)

+--------------+
|Column_name   |
+--------------+
|,AAGGOO       |
|,AAAOOO       |
|,GGGGGII      |
|,UGGG         |
+--------------+

With rdd

  val rdd = sc.parallelize(seq)

  val filteredRdd = rdd.filter(row => !row.isEmpty)

  filteredRdd.foreach(println)

,AAGGOO
,AAAOOO
,GGGGGII
,UGGG

Upvotes: 1

Related Questions