Reputation: 1215
I'm using pySpark to read and calculate statistics for a dataframe.
The dataframe looks like:
TRANSACTION_URL    START_TIME      END_TIME        SIZE  FLAG  COL6  COL7  ...
www.google.com     20170113093210  20170113093210  150   1     ...   ...
www.cnet.com       20170113114510  20170113093210  150   2     ...   ...
I'm adding a new timePeriod column to the dataframe, and after adding it, I would like to save the first 50K records whose timePeriod matches some pre-defined value.
My intention is to save those lines to CSV with the dataframe header.
I know this should be some combination of col and write.csv, but I'm not sure how to use them properly for my intentions.
My current code is:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

encodeUDF = udf(encode_time, StringType())
log_df = log_df.withColumn('timePeriod', encodeUDF(col('START_TIME')))
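For reference, encode_time itself isn't shown here; a minimal hypothetical sketch that yields labels like the 'Weekday' value guessed at below could be:

from datetime import datetime

def encode_time(start_time):
    # Hypothetical sketch: the question does not show encode_time.
    # Maps a 'yyyyMMddHHmmss' string to a 'Weekday'/'Weekend' label.
    dt = datetime.strptime(start_time, '%Y%m%d%H%M%S')
    return 'Weekday' if dt.weekday() < 5 else 'Weekend'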
After the column has been added, I'm guessing I should use something like:
log_df.select(col('timePeriod') == 'Weekday').write.csv(....)
Can someone please help me fill the blanks here, to match my intentions?
Upvotes: 3
Views: 7431
Reputation: 35434
unix_timestamp and date_format are the useful methods here, since START_TIME is not of timestamp type.
import org.apache.spark.sql.functions.{col, date_format, unix_timestamp}
import org.apache.spark.sql.types.TimestampType

val dfWithDayNum = log_df.withColumn("timePeriod",
  date_format(unix_timestamp(col("START_TIME"), "yyyyMMddHHmmss").cast(TimestampType), "u"))
timePeriod will hold the day number of the week (1 = Monday, ..., 7 = Sunday).
dfWithDayNum
  .filter(col("timePeriod") < 6) // weekdays only (Mon-Fri)
  .limit(50000)                  // first 50K lines
  .write
  .option("header", "true")
  .csv("location/to/save/df")
Upvotes: 1
Reputation: 1215
Solved using the filter() and limit() methods in the following way:
new_log_df.filter(col('timePeriod') == '20161206, Morning').limit(50) \
    .write.format('csv').option("header", "true").save("..Path..")
Upvotes: 0