Reputation: 149
I have the following issue:
I run a SQL query over a set of Parquet files on HDFS and then collect the result.
The problem is that when there are many rows I get an out-of-memory error.
The query requires shuffling, so I cannot run it on each file separately.
One possible solution is to iterate over the distinct values of a column and save the results to disk:
df <- sql("original query goes here")
# data <- collect(df)   # <- out of memory
createOrReplaceTempView(df, "t")
for (c in cities) {
  x <- collect(sql(paste0("SELECT * FROM t WHERE city = '", c, "'")))
  write.table(x, "result.csv", append = TRUE, sep = ",", col.names = FALSE)  # append x to file
}
As far as I know, this will make the program take too much time because the query will be executed once for each city.
What is the best way of doing this?
Upvotes: 1
Views: 5962
Reputation: 2812
Since your data is huge, it is no longer possible to collect() it all. Instead, you can sample the data and learn from the sampled subset.
import numpy as np
# Sample 50% of the rows (without replacement) from a single column and collect only that sample
arr = np.array(sdf.select("col_name").sample(False, 0.5, seed=42).collect())
Here you are sampling 50% of the data and just a single column.
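If you need more than one column, the same idea extends. Here is a minimal sketch, assuming sdf is your Spark DataFrame and that "city" and "col_name" are placeholder column names, which pulls a small sample to the driver as a pandas DataFrame for local analysis:
# Keep only the columns you need, sample a small fraction of the rows,
# and bring the (much smaller) sample to the driver as a pandas DataFrame
sample_pdf = (sdf
              .select("city", "col_name")
              .sample(withReplacement=False, fraction=0.1, seed=42)
              .toPandas())
print(sample_pdf.shape)
Note that toPandas() still brings data to the driver, so keep the fraction small enough that the sample fits in memory.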
Upvotes: 0
Reputation: 1244
As @cricket_007 said, I would not collect() your data from Spark to append it to a file in R. Additionally, it doesn't make sense to iterate over a list of SparkR::distinct() cities and then select everything from those tables just to append them to some output dataset. The only time you would want to do that is if you are trying to do another operation within each group based upon some sort of conditional logic, or apply an operation to each group using a function that is NOT available in SparkR.
I think you are trying to get a data frame (either Spark or R) with observations grouped in a way so that when you look at them, everything is pretty. To do that, add a GROUP BY city clause to your first SQL query. From there, just write the data back out to HDFS or some other output directory. From what I understand about your question, maybe doing something like this will help:
sdf <- SparkR::sql('SELECT SOME GREAT QUERY FROM TABLE GROUP BY city')
SparkR::write.parquet(sdf, path="path/to/desired/output/location", mode="append")
This will give you all your data in one file, and it should be grouped by city, which is what I think you are trying to get with your second query in your question.
You can confirm the output is what you want via:
newsdf <- SparkR::read.parquet("path/to/desired/output/location")
View(head(newsdf, num=200))
Good luck, hopefully this helps.
Upvotes: 1
Reputation: 80
If it is running out of memory, that means the output data is really very large, so you can write the results to a file instead, for example a Parquet file.
If you want to perform further operations on this data, you can then read it back from that file.
For large datasets you should not use collect(); instead, use take(100) or take(some_integer) to check that some of the values are correct.
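For example, here is a minimal PySpark sketch of that approach, assuming df holds the result of the original query, spark is the active session, and the HDFS path is just a placeholder:
# Write the full result straight to HDFS instead of collecting it on the driver
df.write.mode("overwrite").parquet("hdfs:///tmp/query_result")

# Read the saved result back later to continue processing
result = spark.read.parquet("hdfs:///tmp/query_result")

# Inspect a bounded number of rows instead of collect()-ing everything
print(result.take(100))
take(100) only returns 100 rows to the driver, so it stays cheap no matter how large the full result is.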
Upvotes: 1