Reputation: 67
I have about 10,000 CSV files, each containing 14 columns. They hold data about a financial organisation: trade values, dates and times.
Some of the CSV files are just headers and have no data in them. I have managed to load all the CSV files onto my local Hadoop file system. What I want to achieve is to filter the data so that it only includes records occurring between 9am and 6pm.
How would I achieve this? I'm quite confused by lambda, filter and all the other things that exist in PySpark.
Could you show me how I can filter this and use the filtered data to do other analyses?
P.S. Winter time and summer time also need to be taken into account, so I was thinking I should have a function that converts the times to UTC, perhaps?
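For the timezone part, this is roughly what I had in mind (just a rough sketch: I am assuming pytz is available, that the data is recorded in London local time, and I am guessing the date/time format):

from datetime import datetime
import pytz

london = pytz.timezone('Europe/London')

def to_utc(date_str, time_str):
    # e.g. date_str = '2017-03-15', time_str = '10:30:00' (format is a guess)
    local_dt = london.localize(datetime.strptime(date_str + ' ' + time_str,
                                                 '%Y-%m-%d %H:%M:%S'))
    return local_dt.astimezone(pytz.utc)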
Since my concern is filtering the data based on the Time column in my CSV files, I have simplified the CSVs. Let's say:
CSV 1:(Filter.csv)
CSV 2:(NoFilter.csv)
and my code is:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
ehsanLocationFiltered = 'hdfs://localhost:54310/user/oxclo/ehsanDbs/Filter.csv'
ehsanLocationNonFiltered = 'hdfs://localhost:54310/user/oxclo/ehsanDbs/NoFilter.csv'
df = sqlContext.read.format('com.databricks.spark.csv')\
    .options(header='true', inferschema='true')\
    .load(ehsanLocationNonFiltered)
dfFilter = sqlContext.read.format('com.databricks.spark.csv')\
    .options(header='true', inferschema='true')\
    .load(ehsanLocationFiltered)
data = df.rdd
dataFilter = dfFilter.rdd
data.filter(lambda row: row.Time > '07:00' and row.Time < '17:00')
dataFilter.filter(lambda row: row.Time > '07:00' and row.Time < '17:00')
print data.count()
print dataFilter.count()
I am expecting data.count() to return 4, since all the Times fall within the range, and dataFilter.count() to return 0, since no time matches.
Thanks!
Upvotes: 1
Views: 4078
Reputation: 668
In your code you can simply use 'csv' as the format:
from pyspark import SparkContext, SparkConf

# sqlContext is assumed to be available (e.g. in a Databricks or PySpark shell session)
ehsanLocationFiltered = '/FileStore/tables/stackoverflow.csv'
data = sqlContext.read.format('csv')\
    .options(header='true', inferschema='true')\
    .load(ehsanLocationFiltered).rdd

# filter() keeps only the matching rows; map() would just produce booleans
result = data.filter(lambda row: row.Time > '07:00' and row.Time < '17:00')
result.count()
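If you would rather stay with the DataFrame API instead of dropping down to an RDD, something along these lines should also work (just a sketch, assuming the Time column is read as a zero-padded HH:mm string so that string comparison gives the right ordering; the variable names are illustrative):

dfTimes = sqlContext.read.format('csv')\
    .options(header='true', inferschema='true')\
    .load(ehsanLocationFiltered)

# Column expressions need & instead of `and`, with each comparison in parentheses
inWindow = dfTimes.filter((dfTimes.Time > '07:00') & (dfTimes.Time < '17:00'))
inWindow.count()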
Upvotes: 1
Reputation: 67
OK, I found out what the problem with my code was! I should have used:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
ehsanLocationFiltered = 'hdfs://localhost:54310/user/oxclo/ehsanDbs/Filter.csv'
ehsanLocationNonFiltered = 'hdfs://localhost:54310/user/oxclo/ehsanDbs/NoFilter.csv'
df = sqlContext.read.format('com.databricks.spark.csv')\
    .options(header='true', inferschema='true')\
    .load(ehsanLocationNonFiltered)
dfFilter = sqlContext.read.format('com.databricks.spark.csv')\
    .options(header='true', inferschema='true')\
    .load(ehsanLocationFiltered)
data = df.rdd
dataFilter = dfFilter.rdd
filteredResult = data.filter(lambda row: row.Time > '07:00' and row.Time < '17:00')
filteredResultExpected = dataFilter.filter(lambda row: row.Time > '07:00' and row.Time < '17:00')
print filteredResult.count()
print filteredResultExpected.count()
The assignments filteredResultExpected = and filteredResult = were missing!
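In other words, filter() is a transformation that returns a new RDD and leaves the original untouched, so calling it without capturing the result changes nothing. A tiny illustration of the difference, using a throwaway RDD:

rdd = sc.parallelize([1, 2, 3, 4])

rdd.filter(lambda x: x > 2)         # new RDD is discarded, rdd itself is unchanged
print rdd.count()                   # still 4

kept = rdd.filter(lambda x: x > 2)  # assign the new, filtered RDD
print kept.count()                  # 2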
Upvotes: 0