Nurzhan Nogerbek

Reputation: 5236

How to take data from several parquet files at once?

I need your help because I am new to the Spark framework.

I have a folder with a lot of parquet files. The names of these files all follow the same format: DD-MM-YYYY. For example: '01-10-2018', '02-10-2018', '03-10-2018', etc.

My application has two input parameters: dateFrom and dateTo.

When I try to use the following code, the application hangs. It seems like the application scans all the files in the folder.

val mf = spark.read.parquet("/PATH_TO_THE_FOLDER/*")
         .filter($"DATE".between(dateFrom + " 00:00:00", dateTo + " 23:59:59"))
mf.show()

I need to get the data for a given period as fast as possible.

I think it would be great to divide the period into days and then read the files separately and union them, like this:

val mf1 = spark.read.parquet("/PATH_TO_THE_FOLDER/01-10-2018")
val mf2 = spark.read.parquet("/PATH_TO_THE_FOLDER/02-10-2018")

// note: `final` is a reserved word in Scala, so the result needs a different name
val finalDf = mf1.union(mf2).distinct()

dateFrom and dateTo are dynamic, so I don't know how to organize the code correctly right now. Please help!


@y2k-shubham I tried to test the following code, but it raises an error:

import org.joda.time.{DateTime, Days}
import org.apache.spark.sql.{DataFrame, SparkSession}

val dateFrom = DateTime.parse("2018-10-01")
val dateTo = DateTime.parse("2018-10-05")

def getDaysInBetween(from: DateTime, to: DateTime): Int = Days.daysBetween(from, to).getDays

def getDatesInBetween(from: DateTime, to: DateTime): Seq[DateTime] = {
    val days = getDaysInBetween(from, to)
    (0 to days).map(day => from.plusDays(day).withTimeAtStartOfDay())
}

val datesInBetween: Seq[DateTime] = getDatesInBetween(dateFrom, dateTo)

val unionDf: DataFrame = datesInBetween.foldLeft(spark.emptyDataFrame) { (intermediateDf: DataFrame, date: DateTime) =>
    intermediateDf.union(spark.read.parquet("PATH" + date.toString("yyyy-MM-dd") + "/*.parquet"))
}
unionDf.show()

ERROR:

org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the same number of columns, but the first table has 0 columns and the second table has 20 columns;

It seems like the intermediateDf DataFrame is empty at the start. How can I fix this problem?

Upvotes: 1

Views: 1237

Answers (2)

y2k-shubham

Reputation: 11607

While I haven't tested this piece of code, it should work (perhaps with a slight modification):

import org.joda.time.{DateTime, Days}
import org.apache.spark.sql.{DataFrame, SparkSession}

// return no of days between two dates
def getDaysInBetween(from: DateTime, to: DateTime): Int = Days.daysBetween(from, to).getDays

// return sequence of dates between two dates
def getDatesInBetween(from: DateTime, to: DateTime): Seq[DateTime] = {
  val days = getDaysInBetween(from, to)
  (0 to days).map(day => from.plusDays(day).withTimeAtStartOfDay())
}

// read parquet data of given date-range from given path
// (you might want to pass SparkSession in a different manner)
def readDataForDateRange(path: String, from: DateTime, to: DateTime)(implicit spark: SparkSession): DataFrame = {
  // get date-range sequence
  val datesInBetween: Seq[DateTime] = getDatesInBetween(from, to)

  // read data of from-date (needed because schema of all DataFrames should be same for union)
  val fromDateDf: DataFrame = spark.read.parquet(path + "/" + datesInBetween.head.toString("yyyy-MM-dd"))

  // read and union remaining dataframes (functionally)
  val unionDf: DataFrame = datesInBetween.tail.foldLeft(fromDateDf) { (intermediateDf: DataFrame, date: DateTime) =>
    intermediateDf.union(spark.read.parquet(path + "/" + date.toString("yyyy-MM-dd")))
  }

  // return union-df
  unionDf
}
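I haven't tested the invocation either, but calling it would look roughly like this (the path and dates here are just placeholders):

// build (or reuse) the implicit SparkSession expected by readDataForDateRange
implicit val spark: SparkSession = SparkSession.builder().getOrCreate()

// read and union the data of all days between the two dates (placeholder path and dates)
val df: DataFrame = readDataForDateRange(
  "/PATH_TO_THE_FOLDER",
  DateTime.parse("2018-10-01"),
  DateTime.parse("2018-10-05")
)
df.show()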

Reference: How to calculate 'n' days interval date in functional style?

Upvotes: 1

Vladimir Matveev

Reputation: 127711

import java.time.LocalDate
import java.time.format.DateTimeFormatter

import org.apache.spark.sql.{DataFrame, SparkSession}

val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")

def dateRangeInclusive(start: String, end: String): Iterator[LocalDate] = {
  val startDate = LocalDate.parse(start, formatter)
  val endDate = LocalDate.parse(end, formatter)
  Iterator.iterate(startDate)(_.plusDays(1))
    .takeWhile(d => d.isBefore(endDate) || d.isEqual(endDate))
}

val spark = SparkSession.builder().getOrCreate()
val data: DataFrame = dateRangeInclusive("2018-10-01", "2018-10-05")
  .map(d => spark.read.parquet(s"/path/to/directory/${formatter.format(d)}"))
  .reduce(_ union _)

I also suggest using the native JSR 310 API (part of Java SE since Java 8) rather than joda-time, since it is more modern and does not require external dependencies. Note that first creating a sequence of paths and doing map+reduce is probably simpler for this use case than a more general foldLeft-based solution.
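For comparison, an untested sketch of the foldLeft variant (reusing the formatter, dateRangeInclusive and spark values from above, and assuming the date range is non-empty):

// materialize the paths, then seed the fold with the first day's DataFrame
// so the accumulator never starts out with an empty schema
val paths: Seq[String] = dateRangeInclusive("2018-10-01", "2018-10-05")
  .map(d => s"/path/to/directory/${formatter.format(d)}")
  .toSeq

val foldedData: DataFrame = paths.tail.foldLeft(spark.read.parquet(paths.head)) {
  (acc, path) => acc.union(spark.read.parquet(path))
}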

Additionally, you can use reduceOption, then you'll get an Option[DataFrame] if the input date range is empty. Also, if it is possible for some input directories/files to be missing, you'd want to do a check before invoking spark.read.parquet. If your data is on HDFS, you should probably use the Hadoop FS API:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val spark = SparkSession.builder().getOrCreate()
val fs = FileSystem.get(new Configuration(spark.sparkContext.hadoopConfiguration))
val data: Option[DataFrame] = dateRangeInclusive("2018-10-01", "2018-10-05")
  .map(d => s"/path/to/directory/${formatter.format(d)}")
  .filter(p => fs.exists(new Path(p)))
  .map(spark.read.parquet(_))
  .reduceOption(_ union _)
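Since data is now an Option[DataFrame], you also have to decide what to do when no directory exists for the range; one possibility (just a sketch, falling back to an empty DataFrame is not the only choice) is:

// fall back to an empty DataFrame when nothing was read for the range
val result: DataFrame = data.getOrElse(spark.emptyDataFrame)
result.show()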

Upvotes: 1
