Reputation: 546
I am running sparklyr with R on a local instance with 8 cores and 64 GB of RAM. My job is to left_join a [50 000 000, 12] dataframe with a [20 000 000, 3] dataframe, which I run with Spark.
# Load packages
library(tidyverse)
library(sparklyr)
# Initialize configuration with defaults
config <- spark_config()
# Memory
# Set memory allocation for whole local Spark instance
# Sys.setenv("SPARK_MEM" = "50g")
# Set driver and executor memory allocations
# config$spark.driver.memory <- "8g"
# config$spark.driver.maxResultSize <- "8g"
# Connect to local cluster with custom configuration
sc <- spark_connect(master = "local", config = config, spark_home = spark_home_dir())
# Read df1 and df2
df1 <- spark_read_parquet(sc, path = "/mnt/df1/",
                          memory = FALSE, overwrite = TRUE)
df2 <- spark_read_parquet(sc, path = "/mnt/df2/",
                          memory = FALSE, overwrite = TRUE)
# Left join
df3 <- df1 %>%
  dplyr::left_join(df2)
# Write or collect
sparklyr::spark_write_parquet(df3, path="/mnt/") # or
df3 <- df3 %>% collect()
No matter how I adjust the Spark configuration, the code fails with a java.lang.OutOfMemoryError: Java heap space:
Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 8.0 failed 1 times, most recent failure: Lost task 2.0 in stage 8.0 (TID 96, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
So far I have tried different combinations of
Sys.setenv("SPARK_MEM" = "50g")
config["sparklyr.shell.driver-memory"] <- "20G"
config["sparklyr.shell.num-executors"] <- 8
config$spark.driver.maxResultSize <- "8g"
config$spark.executor.memory <- "8g"
config$spark.memory.fraction <- 0.9
set either in the R script or in the Spark configuration file.
Similar questions have been asked (1, 2, 3), but none of them solved my problem.
Upvotes: 2
Views: 526
Reputation: 1254
You must specify a join key to left_join(). Otherwise you are trying to compute the Cartesian product, which here has size [1 000 000 000 000 000, 15] (50 000 000 × 20 000 000 rows, 12 + 3 columns) and will certainly overflow memory.
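As a minimal sketch, assuming the shared key column is called id (a placeholder; substitute your actual join column or columns):
# Join on an explicit key so Spark performs a keyed left join, not a cross join
df3 <- df1 %>%
  dplyr::left_join(df2, by = "id")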
Also: avoid calling collect() on big datasets, since this moves all the data back to the driver and is very likely to cause an OOM on the driver.
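Instead, keep the result in Spark and write it to disk, collecting only small summaries; a sketch with an example output path:
# Persist the joined table from Spark directly to parquet; nothing is pulled into R
df3 %>% sparklyr::spark_write_parquet(path = "/mnt/df3/", mode = "overwrite")
# If you need something in R, reduce it in Spark first and collect only the summary
n_rows <- df3 %>% dplyr::count() %>% collect()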
Upvotes: 1