drumkey

Reputation: 149

Spark: How to collect a large amount of data without running out of memory

I have the following issue:

I run a SQL query over a set of Parquet files on HDFS and then call collect() to get the result.

The problem is that when there are many rows I get an out of memory error.

The query requires a shuffle, so I cannot run it on each file separately.

One solution could be to iterate over the values of a column and save the result on disk:

df <- sql("original query goes here")
# data <- collect(df)  # <- out of memory
createOrReplaceTempView(df, "t")
for (c in cities) {
  x <- collect(sql(paste0("select * from t where city = '", c, "'")))
  # append x to a file on disk
}

As far as I know, this will take too long because the query is executed once per city.

What is the best way of doing this?

Upvotes: 1

Views: 5962

Answers (3)

s510

Reputation: 2812

Since your data is huge, it is no longer possible to collect() all of it. One strategy is to sample the data and work with the sampled data.

import numpy as np
# Sample 50% of one column (no replacement, fixed seed) and collect only the sample to the driver.
arr = np.array(sdf.select("col_name").sample(False, 0.5, seed=42).collect())

Here you are sampling 50% of the data and just a single column.
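
If you need more than one column, a variant of the same idea (assuming pandas is available, the sample fits in driver memory, and the extra column name is just a placeholder) is to convert the sampled frame to pandas:

# Keep two columns, sample 50% of the rows, and bring only the sample to the driver.
pdf = sdf.select("col_name", "city").sample(False, 0.5, seed=42).toPandas()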

Upvotes: 0

nate

Reputation: 1244

As @cricket_007 said, I would not collect() your data from Spark just to append it to a file in R. It also doesn't make sense to iterate over a list of SparkR::distinct() cities and then select everything from the table for each of them just to append the results to some output dataset. The only time you would want to do that is if you need to perform another operation within each group based on some conditional logic, or apply a function to each group that is NOT available in SparkR.

I think you are trying to get a data frame (either Spark or R) with the observations grouped so that they are easy to look at. To do that, add a GROUP BY city clause to your first SQL query, then write the data back out to HDFS or some other output directory. From what I understand of your question, something like this may help:

sdf <- SparkR::sql('SELECT SOME GREAT QUERY FROM TABLE GROUP BY city')

SparkR::write.parquet(sdf, path="path/to/desired/output/location", mode="append")

This will give you all of your data in one output location, grouped by city, which is what I think you were trying to get with the second query in your question.

You can confirm the output is what you want via:

newsdf <- SparkR::read.parquet("path/to/desired/output/location")
View(head(newsdf, num=200))

Good luck, hopefully this helps.

Upvotes: 1

Aditya Chopra

Reputation: 80

If it is running out of memory, the result set is really very large, so instead of collecting it you can write the results straight to a file, such as a Parquet file.

If you want to perform further operations on this data, you can read it back from that file.
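
A minimal PySpark sketch of that approach (the SparkSession variable, the output path, and the query placeholder are illustrative, not from the original post):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Run the original query and write the full result straight to Parquet;
# the executors write the rows, so nothing is collected on the driver.
df = spark.sql("original query goes here")
df.write.mode("overwrite").parquet("hdfs:///tmp/query_result")

# Later, read the saved result back for further processing.
saved = spark.read.parquet("hdfs:///tmp/query_result")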

For large datasets you should not use collect(); instead, use take(100) or take(some_integer) to check that a few values look correct.
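
For example, continuing the sketch above, a quick sanity check without a full collect() might look like this:

# take() brings only the requested number of rows back to the driver,
# which is enough to eyeball a few values without collecting everything.
sample_rows = saved.take(100)
print(sample_rows[:5])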

Upvotes: 1
