Ehs4n

Reputation: 67

PySpark: read CSV, combine date and time columns, and filter based on them

I have about 10,000 CSV files containing 14 columns each. They contain data about a financial organization: trade values, dates, and times.

Some of the CSV files contain only headers and no data. I managed to load all the CSV files onto my local Hadoop file system. What I want to achieve is to filter the data to include only records occurring between 9am and 6pm.

How would I achieve this? I'm confused by lambda, filter, and all the other stuff that exists in PySpark.

Could you show me how I can filter this and use the filtered data to do other analyses?

P.S. Winter time and summer time (DST) also need to be considered, so I was thinking I should have some function to convert the times to UTC, perhaps?
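Something like this is roughly what I had in mind for the UTC conversion (just a sketch; df is the DataFrame loaded below, 'Date' and 'Time' are my column names, and the date format and the 'Europe/London' zone are assumptions):

from pyspark.sql.functions import concat_ws, unix_timestamp, to_utc_timestamp

# combine the Date and Time columns into a single timestamp
# (the 'yyyy-MM-dd HH:mm' format is an assumption about the data)
withTs = df.withColumn('ts',
    unix_timestamp(concat_ws(' ', df['Date'], df['Time']),
                   'yyyy-MM-dd HH:mm').cast('timestamp'))

# convert the DST-aware local time to UTC, e.g. for London trading data
withUtc = withTs.withColumn('utc', to_utc_timestamp(withTs['ts'], 'Europe/London'))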

As my concern is about filtering data based on the Time column in my CSV files, I have simplified the CSVs. Let's say:

CSV 1 (Filter.csv):

CSV 2 (NoFilter.csv):
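For illustration (made-up rows, chosen so that NoFilter.csv has four in-range times and Filter.csv none, matching the counts I expect below):

Filter.csv:

Time,Price
06:15,10.5
18:30,11.2

NoFilter.csv:

Time,Price
09:05,10.1
11:20,10.4
13:45,10.8
16:10,11.0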

and my code is:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

ehsanLocationFiltered = 'hdfs://localhost:54310/user/oxclo/ehsanDbs/Filter.csv'
ehsanLocationNonFiltered = 'hdfs://localhost:54310/user/oxclo/ehsanDbs/NoFilter.csv'

df = sqlContext.read.format('com.databricks.spark.csv')\
.options(header='true', inferschema='true')\
.load(ehsanLocationNonFiltered)

dfFilter = sqlContext.read.format('com.databricks.spark.csv')\
.options(header='true', inferschema='true')\
.load(ehsanLocationFiltered)

data = df.rdd
dataFilter = dfFilter.rdd

data.filter(lambda row: row.Time > '07:00' and row.Time < '17:00')
dataFilter.filter(lambda row: row.Time > '07:00' and row.Time < '17:00')

print data.count()
print dataFilter.count()

I am expecting data.count() to return 4, as all the times fit in the range, and dataFilter.count() to return 0, as there are no matching times.

Thanks!

Upvotes: 1

Views: 4078

Answers (2)

devesh

Reputation: 668

In your code you can simply use 'csv' as the format:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

ehsanLocationFiltered = '/FileStore/tables/stackoverflow.csv'
data = sqlContext.read.format('csv')\
    .options(header='true', inferschema='true')\
    .load(ehsanLocationFiltered).rdd
# filter (not map) keeps only the matching rows, so count() gives the answer
result = data.filter(lambda row: row.Time > '07:00' and row.Time < '17:00')
result.count()

Upvotes: 1

Ehs4n

Reputation: 67

OK, I found out what the problem with my code was! I should have used:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

ehsanLocationFiltered = 'hdfs://localhost:54310/user/oxclo/ehsanDbs/Filter.csv'
ehsanLocationNonFiltered = 'hdfs://localhost:54310/user/oxclo/ehsanDbs/NoFilter.csv'

df = sqlContext.read.format('com.databricks.spark.csv')\
   .options(header='true', inferschema='true')\
   .load(ehsanLocationNonFiltered)

dfFilter = sqlContext.read.format('com.databricks.spark.csv')\
   .options(header='true', inferschema='true')\
   .load(ehsanLocationFiltered)

data = df.rdd
dataFilter = dfFilter.rdd

filteredResult = data.filter(lambda row: row.Time > '07:00' and row.Time < '17:00')
filteredResultExpected = dataFilter.filter(lambda row: row.Time > '07:00' and row.Time < '17:00')

print filteredResult.count()
print filteredResultExpected.count()

The assignments filteredResult = and filteredResultExpected = were missing! filter() returns a new RDD; it does not modify the RDD it is called on.
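A minimal sketch of that behaviour (assuming the same SparkContext sc as above):

times = sc.parallelize(['06:30', '08:15', '16:45', '18:00'])

times.filter(lambda t: t > '07:00' and t < '17:00')  # result discarded
print times.count()                                  # still 4

kept = times.filter(lambda t: t > '07:00' and t < '17:00')
print kept.count()                                   # 2

For what it's worth, the same filter also works directly on the DataFrame, without dropping to the RDD API (assuming the df loaded above):

from pyspark.sql.functions import col

filteredDf = df.filter((col('Time') > '07:00') & (col('Time') < '17:00'))
print filteredDf.count()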

Upvotes: 0
