Randomize

Reputation: 9103

Spark: grouping during loading

Usually I load CSV files and then run different kinds of aggregations, like "group by", with Spark. I was wondering whether it is possible to start this sort of operation during the file loading (typically a few million rows) instead of sequentializing the steps, and whether it can be worthwhile as a time saving.

Example:

val csv = sc.textFile("file.csv")
// split each line on commas and trim the fields
val data = csv.map(line => line.split(",").map(elem => elem.trim))
val header = data.first()
// drop the header row: keep lines whose first field differs from the header's
val rows = data.filter(line => line(0) != header(0))
// key each row by its first column, then group
val trows = rows.map(row => (row(0), row))
trows.groupByKey()

From my understanding of how Spark works, the groupBy (or aggregate) will be "postponed" until the whole CSV file has been loaded into memory. If this is correct, can the loading and the grouping run at the "same" time instead of sequencing the two steps?

Upvotes: 3

Views: 146

Answers (2)

Franzi

Reputation: 1871

To start a group-by-on-load operation, you could proceed with two options:

  1. Write your own loader and do your own group by inside it with aggregateByKey (see the sketch after the explanation below). The cons of that are writing more code & more maintenance.
  2. Use Parquet files as input + DataFrames; since Parquet is columnar, Spark will read only the columns used in your groupBy, so it should be faster. - DataFrameReader

    df = spark.read.parquet('file_path')
    df = df.groupBy('column_a', 'column_b', '...').count()
    df.show()
    

Because Spark is lazy, it won't load your file until you call an action method like show/collect/write. So Spark will know which columns to read and which to ignore during the load.
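
For option 1, a minimal sketch in Scala (assuming an existing SparkContext `sc` and a hypothetical per-key count; the file name and key column mirror the question's example). aggregateByKey does map-side combining, so each partition builds partial results as its lines are read, rather than grouping after a full load:

    // Sketch only: count rows per key while the file is being consumed.
    val counts = sc.textFile("file.csv")
      .map(_.split(",").map(_.trim))
      .filter(row => row(0) != "id")        // drop the header row
      .map(row => (row(0), 1L))             // key by the first column
      .aggregateByKey(0L)(_ + _, _ + _)     // seqOp within a partition, combOp across
    counts.take(10).foreach(println)        // the action triggers the whole pipeline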

Upvotes: 1

zero323

Reputation: 330413

the groupBy (or aggregate) will be "postponed" to the loading in memory of the whole file csv.

That is not the case. At the local (single-partition) level Spark operates on lazy sequences, so operations belonging to a single task (this includes map-side aggregation) can be squashed together.

In other words, when you have a chain of methods, operations are performed element-by-element, not transformation-by-transformation: the first line will be mapped, filtered, mapped once again, and passed to the aggregator before the next one is accessed.
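
A minimal plain-Scala sketch of this behaviour (no Spark needed; within a partition Spark composes transformations over a lazy iterator in much the same way):

    // Each element flows through the whole chain before the next is read,
    // so the printed output interleaves per element rather than per stage.
    val it = Iterator("1,a", "2,b", "3,c")
      .map    { line => println(s"map:    $line"); line.split(",") }
      .filter { row  => println(s"filter: ${row(0)}"); row(0) != "id" }
    it.foreach(row => println(s"use:    ${row.mkString("|")}"))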

Upvotes: 3
