YQ.Wang

Reputation: 1177

How can I skip some rows when reading from a local file using a pyspark session?

I'm using pyspark to read and process some data from local .plt files. Here is what the file looks like:

Geolife trajectory
WGS 84
Altitude is in Feet
Reserved 3
0,2,255,My Track,0,0,2,8421376
0
39.984094,116.319236,0,492,39744.2451967593,2008-10-23,05:53:05
39.984198,116.319322,0,492,39744.2452083333,2008-10-23,05:53:06
39.984224,116.319402,0,492,39744.2452662037,2008-10-23,05:53:11
39.984211,116.319389,0,492,39744.2453240741,2008-10-23,05:53:16
......

As shown above, I'm not interested in the first 6 rows; what I want are the rows starting from the 7th row. So I want to use the Spark session to read this file from the 7th row onwards. Here is the code I've tried, but it failed:

from pyspark.sql import SparkSession
session = SparkSession.builder.appName('file reader').master('local[*]').getOrCreate()
df = session.read.\
     option('delimiter', ',').\
     option('header', 'false').\
     csv('test.plt')
df.show()

Could somebody give me some advice? Thank you for your attention.

Upvotes: 2

Views: 3253

Answers (3)

Dave Canton

Reputation: 178

Assuming that the data from the 7th line onwards follows the pattern you have shown:

from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col

session = SparkSession.builder.appName('file reader').master('local[*]').getOrCreate()
data = session.read.text('test.plt')  # one row per line, in a single string column named "value"

header = [row.value for row in data.head(6)]  # the first six lines

filtered = data.filter(~col('value').isin(header)) \
               .withColumn("a", split(col("value"), ",").getItem(0)) \
               .withColumn("b", split(col("value"), ",").getItem(1)) \
               .withColumn("c", split(col("value"), ",").getItem(2)) \
               .withColumn("d", split(col("value"), ",").getItem(3)) \
               .withColumn("e", split(col("value"), ",").getItem(4)) \
               .withColumn("f", split(col("value"), ",").getItem(5)) \
               .withColumn("g", split(col("value"), ",").getItem(6)) \
               .drop("value")

Upvotes: 0

cph_sto

Reputation: 7597

In addition to the great method suggested by @Arnon Rotem-Gal-Oz, we can also exploit a special property of one of the columns, if there is one present.

In YQ. Wang's data, we can see that the 6th column is a date, and the chances are pretty negligible that the 6th column in the header will also be a date. So, the idea is to check for this special property of the 6th column. to_date() converts a string to a date. If this column is not a date, then to_date() will return Null, and we will filter out all such rows using the .where() clause -

from pyspark.sql.functions import col, to_date
from pyspark.sql.types import FloatType, IntegerType, StringType, StructType, StructField

# same schema as in Arnon Rotem-Gal-Oz's answer below
schema = StructType([StructField("a", FloatType()), StructField("b", FloatType()),
                     StructField("c", IntegerType()), StructField("d", IntegerType()),
                     StructField("e", FloatType()), StructField("f", StringType()),
                     StructField("g", StringType())])

df = spark.read.schema(schema)\
               .format("csv")\
               .option("header", "false")\
               .option("sep", ",")\
               .load('test.plt')\
               .where(to_date(col('f'), 'yyyy-MM-dd').isNotNull())
df.show()
+---------+----------+----+---+---------+----------+--------+
|        a|         b|   c|  d|        e|         f|       g|
+---------+----------+----+---+---------+----------+--------+
|39.984093| 116.31924|   0|492|39744.246|2008-10-23|05:53:05|
|  39.9842| 116.31932|   0|492|39744.246|2008-10-23|05:53:06|
|39.984222|116.319405|   0|492|39744.246|2008-10-23|05:53:11|
| 39.98421| 116.31939|   0|492|39744.246|2008-10-23|05:53:16|
+---------+----------+----+---+---------+----------+--------+

There are downsides to this method too: if the date is missing in a data row, then the whole row gets filtered out.
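If you need to drop exactly the first six lines regardless of their content, a minimal sketch (assuming the same test.plt file, a running spark session, and the column names a-g used in the other answers) is to number the lines with the RDD zipWithIndex() and keep only those with index 6 or higher before splitting the fields:

# Sketch: skip the first 6 lines by position, then split the rest on ','.
rdd = (spark.sparkContext.textFile('test.plt')
       .zipWithIndex()                      # (line, index) pairs
       .filter(lambda x: x[1] >= 6)         # keep lines from the 7th onwards
       .map(lambda x: x[0].split(',')))     # split each CSV line into fields

df = rdd.toDF(['a', 'b', 'c', 'd', 'e', 'f', 'g'])  # all columns are strings here
df.show()

This keeps rows with a missing date, but all columns come back as strings, so you would still need to cast them to the proper types afterwards.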

Upvotes: 1

Arnon Rotem-Gal-Oz

Reputation: 25919

from pyspark.sql.types import *
from pyspark.sql import SparkSession
session = SparkSession.builder.appName('file reader').master('local[*]').getOrCreate()
schema = StructType([StructField("a", FloatType()),
                     StructField("b", FloatType()),
                     StructField("c", IntegerType()),
                     StructField("d", IntegerType()),
                     StructField("e", FloatType()),
                     StructField("f", StringType()),
                     StructField("g", StringType())])
# 'DROPMALFORMED' silently drops rows that do not match the schema,
# which removes the six header lines (their fields cannot be parsed as floats/ints)
df = session.read.option('mode', 'DROPMALFORMED').csv('test.plt', schema)

Upvotes: 4
