Reputation: 1177
I'm using pyspark to read and process some data from local .plt
files. Here is what the file looks like:
Geolife trajectory
WGS 84
Altitude is in Feet
Reserved 3
0,2,255,My Track,0,0,2,8421376
0
39.984094,116.319236,0,492,39744.2451967593,2008-10-23,05:53:05
39.984198,116.319322,0,492,39744.2452083333,2008-10-23,05:53:06
39.984224,116.319402,0,492,39744.2452662037,2008-10-23,05:53:11
39.984211,116.319389,0,492,39744.2453240741,2008-10-23,05:53:16
......
As shown above, I'm not interested in the first 6 rows; what I want are the rows starting from the 7th. So I want to use a Spark session to read this file from the 7th row onwards. Here is the code I've tried, but it doesn't do what I want:
from pyspark.sql import SparkSession
session = SparkSession.builder.appName('file reader').master('local[*]').getOrCreate()
df = session.read.\
option('delimiter', ',').\
option('header', 'false').\
csv('test.plt')
df.show()
Could somebody give me some advice? Thank you for your attention.
Upvotes: 2
Views: 3253
Reputation: 178
Assuming that the data from the 7th line onwards follows the pattern you have shown:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split

session = SparkSession.builder.appName('file reader').master('local[*]').getOrCreate()
data = session.read.text('test.plt')
header = data.head(6)  # the first six rows
filtered = (data.filter(~col("value").isin([row.value for row in header]))
    .withColumn("a", split(col("value"), ",").getItem(0))
    .withColumn("b", split(col("value"), ",").getItem(1))
    .withColumn("c", split(col("value"), ",").getItem(2))
    .withColumn("d", split(col("value"), ",").getItem(3))
    .withColumn("e", split(col("value"), ",").getItem(4))
    .withColumn("f", split(col("value"), ",").getItem(5))
    .withColumn("g", split(col("value"), ",").getItem(6))
    .drop("value"))
Upvotes: 0
Reputation: 7597
In addition to the great method suggested by @Arnon Rotem-Gal-Oz, we can also exploit a special property of one of the columns, if one is present. In YQ. Wang's data, we can see that the 6th column is a date, and the chances are pretty negligible that the 6th column of the header will also be a date. So the idea is to check this special property of the 6th column. to_date() converts a string to a date. If the column is not a date, to_date() returns Null, and we filter out all such rows with a .where() clause -
from pyspark.sql.functions import col, to_date
from pyspark.sql.types import FloatType, IntegerType, StringType, StructType, StructField

# uses the schema as defined in the answer by @Arnon Rotem-Gal-Oz
df = spark.read.schema(schema)\
    .format("csv")\
    .option("header", "false")\
    .option("sep", ',')\
    .load('test.plt')\
    .where(to_date(col('f'), 'yyyy-MM-dd').isNotNull())
df.show()
+---------+----------+----+---+---------+----------+--------+
| a| b| c| d| e| f| g|
+---------+----------+----+---+---------+----------+--------+
|39.984093| 116.31924| 0|492|39744.246|2008-10-23|05:53:05|
| 39.9842| 116.31932| 0|492|39744.246|2008-10-23|05:53:06|
|39.984222|116.319405| 0|492|39744.246|2008-10-23|05:53:11|
| 39.98421| 116.31939| 0|492|39744.246|2008-10-23|05:53:16|
+---------+----------+----+---+---------+----------+--------+
There are downsides to this method too: for example, if the date is missing, the whole row gets filtered out.
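If that is a concern, one alternative, sketched here under the assumption that we read a single file whose six header lines always come first, is to drop rows by position rather than by content, using zipWithIndex on the underlying RDD:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('file reader').master('local[*]').getOrCreate()

# pair every line with its position in the file and drop the first six lines
lines = spark.sparkContext.textFile('test.plt')
rows = (lines.zipWithIndex()
             .filter(lambda pair: pair[1] >= 6)      # indices 0-5 are the header lines
             .map(lambda pair: pair[0].split(',')))  # split the remaining lines on commas

# all columns arrive as strings; cast afterwards if numeric types are needed
df = rows.toDF(['a', 'b', 'c', 'd', 'e', 'f', 'g'])
df.show()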
Upvotes: 1
Reputation: 25919
from pyspark.sql.types import *
from pyspark.sql import SparkSession
session = SparkSession.builder.appName('file reader').master('local[*]').getOrCreate()
schema = StructType([StructField("a", FloatType()),
                     StructField("b", FloatType()),
                     StructField("c", IntegerType()),
                     StructField("d", IntegerType()),
                     StructField("e", FloatType()),
                     StructField("f", StringType()),
                     StructField("g", StringType())])
df=session.read.option('mode','DROPMALFORMED').csv('test.plt',schema)
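Calling df.show() should then display only the data rows, since none of the header lines parse against the numeric schema:
df.show()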
Upvotes: 4