abhijeet

Reputation: 879

How to read multiple directories in s3 in spark Scala?

I have directories in s3 in the following format:

 <base-directory>/users/users=20180303/hour=0/<parquet files>
 <base-directory>/users/users=20180303/hour=1/<parquet files>
 ....
 <base-directory>/users/users=20180302/hour=<0 to 23>/<parquet files>
 <base-directory>/users/users=20180301/hour=<0 to 23>/<parquet files>
 ....
 <base-directory>/users/users=20180228/hour=<0 to 23>/<parquet files>

Basically I have hourly subdirectories in daily directories.

Now I want to process the parquet files from the last 30 days.

I have tried the following:

 val df = sqlContext.read.option("header", "true")
    .parquet(<base-directory> + File.separator + "users" + File.separator)
    .where(col("users").between(startDate, endDate))

where endDate and startDate are 30 days apart and in yyyymmdd format.
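For example, startDate and endDate could be derived like this (just an illustrative sketch using java.time; the exact window is not from my real code):

 import java.time.LocalDate
 import java.time.format.DateTimeFormatter

 // illustrative only: a 30-day window ending today, in yyyymmdd format
 val formatter = DateTimeFormatter.ofPattern("yyyyMMdd")
 val endDate   = LocalDate.now().format(formatter)
 val startDate = LocalDate.now().minusDays(30).format(formatter)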

The above solution is not giving the correct subset of directories. What am I doing wrong?

Upvotes: 0

Views: 2685

Answers (1)

Ramesh Maharjan

Reputation: 41957

The where function is for filtering rows in a dataframe, not for choosing which parquet files to read from s3. So the approach itself is wrong.

Instead, you can build a list of paths between startDate and endDate and pass them to the sqlContext read API.

Programmatically, you can do something like the following (it is just pseudo code):

import scala.collection.mutable.ListBuffer

val listBuffer = new ListBuffer[String]
for (date <- startDate to endDate)
  listBuffer.append(<base-directory> + File.separator + "users" + File.separator + "users=" + date)

val df = sqlContext.read.option("header", "true").parquet(listBuffer: _*)
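If startDate and endDate are yyyymmdd strings, a minimal runnable sketch of this idea could look like the one below; baseDirectory, the bucket name, and the example dates are placeholders, not values from the question:

import scala.collection.mutable.ListBuffer
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// hypothetical s3 prefix standing in for <base-directory>
val baseDirectory = "s3a://my-bucket/base"
val formatter = DateTimeFormatter.ofPattern("yyyyMMdd")

// startDate and endDate as yyyymmdd strings, e.g. a 30-day window
val startDate = "20180201"
val endDate   = "20180303"

// build one path per daily partition directory
val paths = new ListBuffer[String]
var current = LocalDate.parse(startDate, formatter)
val last    = LocalDate.parse(endDate, formatter)
while (!current.isAfter(last)) {
  paths.append(s"$baseDirectory/users/users=${current.format(formatter)}")
  current = current.plusDays(1)
}

val df = sqlContext.read.option("header", "true").parquet(paths: _*)

Note that the read will fail if any of the generated daily directories does not exist in s3, so you may need to drop missing paths from the list first.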

Upvotes: 1
