Reputation: 2661
I am using Spark with Scala, and I have some empty rows in an RDD that I need to remove.
I tried:
val valfilteredRow = rddRow.filter(row => row != null && row.length > 0)
However, it did not work.
The rows in the RDD look like this (printed with valfilteredRow.collect().foreach(println)):
[,AAGGOO]
[,AAAOOO]
[,GGGGGII]
[]
[,UGGG]
Upvotes: 1
Views: 7516
Reputation: 4689
I don't know Scala, but here is what I did in PySpark:
Suppose you have an input file like this, where the second line is blank:
Banana,23,Male,5,11,2017

Dragon,28,Male,1,11,2017
Dragon,28,Male,1,11,2017
import csv
rdd = sc.textFile(PATH_TO_FILE).mapPartitions(lambda lines: csv.reader(lines, delimiter=','))
>>> rdd.take(10)
[['Banana', '23', 'Male', '5', '11', '2017'], [], ['Dragon', '28', 'Male', '1', '11', '2017'], ['Dragon', '28', 'Male', '1', '11', '2017']]
You can see that the second element is empty, so we filter on the length of each element, keeping only elements whose length is greater than one.
>>> rdd = sc.textFile(PATH_TO_FILE).mapPartitions(lambda lines: csv.reader(lines, delimiter=',')).filter(lambda line: len(line) > 1)
>>> rdd.take(10)
[['Banana', '23', 'Male', '5', '11', '2017'], ['Dragon', '28', 'Male', '1', '11', '2017'], ['Dragon', '28', 'Male', '1', '11', '2017']]
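Since the question is in Scala, here is a rough equivalent of the same idea, a minimal sketch (reusing the PATH_TO_FILE placeholder above): split each line on commas and keep only rows that parsed into more than one field.
val rdd = sc.textFile(PATH_TO_FILE)
  .map(_.split(",", -1).toList)    // -1 keeps trailing empty fields
  .filter(line => line.length > 1) // a blank line splits into a single empty field
rdd.take(10).foreach(println)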
Upvotes: 2
Reputation: 1892
If your RDD is of type RDD[String], then you can do:
rdd.filter(_.length > 0).collect
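For example, a minimal self-contained sketch (the sample data is made up to mirror the question):
val lines = sc.parallelize(Seq(",AAGGOO", "", ",UGGG"))
lines.filter(_.length > 0).collect.foreach(println)
This prints ,AAGGOO and ,UGGG; the empty string is dropped.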
Upvotes: 2
Reputation: 978
Suppose you have the following sequence:
val seq = Seq(
  ",AAGGOO",
  ",AAAOOO",
  ",GGGGGII",
  "",
  ",UGGG"
)
With a DataFrame:
import spark.implicits._
val df = seq.toDF("Column_name")
df.show(false)
+--------------+
|Column_name |
+--------------+
|,AAGGOO |
|,AAAOOO |
|,GGGGGII |
| |
|,UGGG |
+--------------+
df.filter(row => row.mkString("").nonEmpty).show(false)
+--------------+
|Column_name |
+--------------+
|,AAGGOO |
|,AAAOOO |
|,GGGGGII |
|,UGGG |
+--------------+
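As a side note, the same filter can be written against the column rather than the whole Row; a sketch, assuming the column name used above:
import org.apache.spark.sql.functions.col
df.filter(col("Column_name") =!= "").show(false)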
With an RDD:
val rdd = sc.parallelize(seq)
val filteredRdd = rdd.filter(row => !row.isEmpty)
filteredRdd.foreach(println)
,AAGGOO
,AAAOOO
,GGGGGII
,UGGG
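If the RDD might also contain null entries, as the attempt in the question guards against, you can combine both checks; a sketch:
val filteredRdd = rdd.filter(row => row != null && row.nonEmpty)
filteredRdd.foreach(println)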
Upvotes: 1