stats_noob

Reputation: 5945

Parallelized version of code takes longer to run

I have the following code that is running fine:

# first code: works fine

# Step 1 : Create Data for Example:
library(dplyr)
library(ranger)

original_data = rbind( data_1 = data.frame( class = 1, height = rnorm(10000, 180,10), weight = rnorm(10000, 90,10), salary = rnorm(10000,50000,10000)),  data_2 = data.frame(class = 0, height = rnorm(100, 160,10), weight = rnorm(100, 100,10), salary = rnorm(100,40000,10000)) )

original_data$class = as.factor(original_data$class)
original_data$id = 1:nrow(original_data)

test_set=  rbind(original_data[ sample( which( original_data$class == "0" ) , replace = FALSE , 30 ) , ], original_data[ sample( which( original_data$class == "1" ) , replace = FALSE, 2000 ) , ])

train_set = anti_join(original_data, test_set)

The actual code starts here:

Step 2:

# Step 2: Create "Balanced" Random Subsets:

results <- list()

for (i in 1:100) {
  iteration_i = i
  
  sample_i = rbind(train_set[sample(which(train_set$class == "0"), replace = TRUE, 50), ],
                   train_set[sample(which(train_set$class == "1"), replace = TRUE, 60), ])
  
  results_tmp = data.frame(iteration_i, sample_i)
  results_tmp$iteration_i = as.factor(results_tmp$iteration_i)
  results[[i]] <- results_tmp
}

results_df <- do.call(rbind.data.frame, results)

X <- split(results_df, results_df$iteration)

invisible(lapply(seq_along(results),
                 function(i, x) {assign(paste0("train_set_", i), x[[i]], envir = .GlobalEnv)},
                 x = results))

Step 3:

# Step 3: Train Models on Each Subset:
wd = getwd()
results_1 <- list()

for (i in 1:100) {
  model_i <- ranger(class ~ height + weight + salary, data = X[[i]], probability = TRUE)
  saveRDS(model_i, file.path(wd, paste0("model_", i, ".RDS")))
  results_1[[i]] <- model_i
}

Step 4:

# Step 4: Combine All Models and Use Combined Model to Make Predictions on the Test Set:
results_2 <- list()

for (i in 1:100) {
  predict_i <- data.frame(predict(results_1[[i]], data = test_set)$predictions)
  predict_i$id = 1:nrow(predict_i)
  results_2[[i]] <- predict_i
}

final_predictions = aggregate(. ~ id, do.call(rbind, results_2), mean)

I am planning to run this code on a dataset of about 200 million rows, so I would like to speed it up (Steps 2, 3 and 4). Looking for ways to do this, I came across "parallelization", which can apparently be done with libraries such as "future" or "foreach". Here is my attempt to parallelize the above code:

# second code: takes a long time to run 
library(doParallel)
library(foreach)

registerDoParallel(cores = detectCores())
foreach(i = 1:100, .packages = 'ranger') %dopar% {
        # Step 2: Create "Balanced" Random Subsets:

results <- list()
for (i in 1:100)
   
{
   iteration_i = i
   
    sample_i =  rbind(train_set[ sample( which( train_set$class == "0" ) , replace = TRUE , 50 ) , ], train_set[ sample( which( train_set$class == "1" ) , replace = TRUE, 60 ) , ])
   
    results_tmp = data.frame(iteration_i, sample_i)
    results_tmp$iteration_i = as.factor(results_tmp$iteration_i)
   results[[i]] <- results_tmp
   
}

results_df <- do.call(rbind.data.frame, results)

X<-split(results_df, results_df$iteration)

 invisible(lapply(seq_along(results),
       function(i,x) {assign(paste0("train_set_",i),x[[i]], envir=.GlobalEnv)},
       x=results))

# Step 3: Train Models on Each Subset:
wd = getwd()
results_1 <- list()

for (i in 1:100){
     
    model_i <- ranger(class ~  height + weight + salary, data = X[[i]], probability = TRUE)
    saveRDS(model_i, file.path(wd, paste0("model_", i, ".RDS")))
    results_1[[i]] <- model_i   
}

# Step 4: Combine All Models and Use Combined Model to Make Predictions on the Test Set:
results_2 <- list()
for (i in 1:100){
predict_i <- data.frame(predict(results_1[[i]], data = test_set)$predictions)


predict_i$id = 1:nrow(predict_i)
 results_2[[i]] <- predict_i
   
}

final_predictions = aggregate(.~ id, do.call(rbind, results_2), mean)
    
}

stopImplicitCluster()

For some reason, and contrary to what I would have expected, parallelization makes this code take a lot longer to run.

My question: Does anyone know of other ways to speed up this code? I have a feeling I have not correctly understood the concepts behind parallelization. Can someone please show me how to do this properly?

Upvotes: 2

Views: 236

Answers (2)

Waldi

Reputation: 41260

Parallel processing comes with the overhead of launching parallel tasks and assembling their results: it isn't always faster.

Before thinking about parallelizing, you could first identify the most time-consuming parts of your code.

The profvis package is one way to profile code:

library(profvis)

profvis({
# Step 2: Create "Balanced" Random Subsets:

results <- list()
for (i in 1:100)
  
{
  iteration_i = i
  
  sample_i =  rbind(train_set[ sample( which( train_set$class == "0" ) , replace = TRUE , 50 ) , ], train_set[ sample( which( train_set$class == "1" ) , replace = TRUE, 60 ) , ])
  
  results_tmp = data.frame(iteration_i, sample_i)
  results_tmp$iteration_i = as.factor(results_tmp$iteration_i)
  results[[i]] <- results_tmp
  
}

results_df <- do.call(rbind.data.frame, results)

X<-split(results_df, results_df$iteration)

invisible(lapply(seq_along(results),
                 function(i,x) {assign(paste0("train_set_",i),x[[i]], envir=.GlobalEnv)},
                 x=results))

# Step 3: Train Models on Each Subset:
wd = getwd()
results_1 <- list()

for (i in 1:100){
  
  model_i <- ranger(class ~  height + weight + salary, data = X[[i]], probability = TRUE)
  saveRDS(model_i, file.path(wd, paste0("model_", i, ".RDS")))
  results_1[[i]] <- model_i   
}

# Step 4: Combine All Models and Use Combined Model to Make Predictions on the Test Set:
results_2 <- list()
for (i in 1:100){
  predict_i <- data.frame(predict(results_1[[i]], data = test_set)$predictions)
  
  
  predict_i$id = 1:nrow(predict_i)
  results_2[[i]] <- predict_i
  
}

final_predictions = aggregate(.~ id, do.call(rbind, results_2), mean)
})

According to profvis, the most time-consuming step is saveRDS (see the profvis screenshot).

However, this only accounts for 1.3 seconds, whereas using system.time() instead of profvis shows that the code needs about 6 seconds to complete.

The profvis FAQ explains that:

Calls to external programs and libraries also may not show up in the profiling data. If you call functions from a package to fetch data from external sources, keep in mind that time spent in those functions may not show in the profiler.

Timing each step alone shows that step 4 takes around 3 seconds and isn't accounted for by profvis.
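
A quick way to check this, assuming results_1 and test_set from the code above are still in the workspace, is to wrap Step 4 alone in system.time():

# timing Step 4 on its own: most of the elapsed time is spent in predict(),
# which profvis largely misses because the work happens in ranger's compiled code
system.time({
  results_2 <- list()
  for (i in 1:100) {
    predict_i <- data.frame(predict(results_1[[i]], data = test_set)$predictions)
    predict_i$id <- 1:nrow(predict_i)
    results_2[[i]] <- predict_i
  }
})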

This points to the function called there: predict.ranger.

?ranger::predict.ranger shows that this function is multithreaded:

num.threads : Number of threads. Default is number of CPUs available.

This means the CPU is already using all its cores most of the time, so extra parallel processing won't help much and might even be slower!

This can be seen on the task manager (x-axis = time, y-axis = CPU use from 0 to 100%):

  • with the for loop: (task manager CPU usage screenshot)

  • with the foreach loop: (task manager CPU usage screenshot)
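
If you still want to wrap the model fitting in foreach, a minimal sketch (separate from the comparison code below, and not benchmarked here) is to force ranger() and predict() to a single thread with num.threads = 1, so the foreach workers and ranger's own threads don't compete for the same cores:

library(doParallel)
library(foreach)
library(ranger)

cl <- makeCluster(detectCores() - 1)
registerDoParallel(cl)

# one balanced subset and one model per foreach task; ranger and predict are
# limited to a single thread each so the workers don't oversubscribe the CPU
results_2 <- foreach(i = 1:100, .packages = "ranger") %dopar% {
  sample_i <- rbind(train_set[sample(which(train_set$class == "0"), 50, replace = TRUE), ],
                    train_set[sample(which(train_set$class == "1"), 60, replace = TRUE), ])
  model_i <- ranger(class ~ height + weight + salary, data = sample_i,
                    probability = TRUE, num.threads = 1)
  predict_i <- data.frame(predict(model_i, data = test_set, num.threads = 1)$predictions)
  predict_i$id <- 1:nrow(predict_i)
  predict_i
}

stopCluster(cl)

final_predictions <- aggregate(. ~ id, do.call(rbind, results_2), mean)

Whether this beats the plain loop still depends on how much of the total time is spent in the already multithreaded predict() step.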

You'll find below the code used to compare performance (parallelized or not); I merged all the loops into a single one.

Note that under Windows you should set up the workers with makeCluster() and register that cluster with registerDoParallel(), rather than calling registerDoParallel(cores = ...) directly.

library(doParallel)
library(foreach)

cl <- makeCluster( detectCores()-1) 
registerDoParallel(cl)


# Step 2: Create "Balanced" Random Subsets:
  results <- list()
  results_1 <- list()
  results_2 <- list()
  wd = getwd()
  
# Measure performance  
  system.time({

  # foreach returns the result of each iteration as a list element; assignments made
  # inside the workers (e.g. results_2[[i]] <- ...) are lost, so capture the output:
  out <- foreach (i = 1:1000, .packages = 'ranger') %dopar% # Parallel version
# for (i in 1:1000)                               # non parallel version
  {
    iteration_i = i
    
    sample_i =  rbind(train_set[ sample( which( train_set$class == "0" ) , replace = TRUE , 50 ) , ], train_set[ sample( which( train_set$class == "1" ) , replace = TRUE, 60 ) , ])
    
    results_tmp = data.frame(iteration_i, sample_i)
    results_tmp$iteration_i = as.factor(results_tmp$iteration_i)
    results[[i]] <- results_tmp
    
  
  # not necessary in loop
  # results_df <- do.call(rbind.data.frame, results)
  # X <- split(results_df, results_df$iteration)
  
  invisible(lapply(seq_along(results),
                  function(i,x) {assign(paste0("train_set_",i),x[[i]], envir=.GlobalEnv)},
                   x=results))
  
  # Step 3: Train Models on Each Subset:
 
    model_i <- ranger(class ~  height + weight + salary, data = results_tmp, probability = TRUE)
    saveRDS(model_i, file.path(wd, paste0("model_", i, ".RDS")))
    results_1[[i]] <- model_i   
  
  
  # Step 4: Combine All Models and Use Combined Model to Make Predictions on the Test Set:
  
    predict_i <- data.frame(predict(model_i, data = test_set)$predictions)
    predict_i$id = 1:nrow(predict_i)
    results_2[[i]] <- predict_i
    list(i,model_i,predict_i)
  }
  })

  # with the foreach version, collect the predictions from the value returned by foreach
  # (with the plain for loop, results_2 is already filled and this line can be skipped)
  results_2 <- lapply(out, function(x) x[[3]])

  final_predictions = aggregate(. ~ id, do.call(rbind, results_2), mean)
  stopCluster(cl)

Upvotes: 2

user12728748

Reputation: 8516

A few notes:

  1. When running on 2E8 rows, you will want to avoid keeping everything in memory and to use fast operations. The data.table package may be useful here thanks to its performance and its modification of data in place. You may also not need to export all the training sets into the global environment in step 2; I do not see where you use them, and they will take up a lot of memory (memory usage may become a primary concern here).
  2. Looking purely at performance, saving all the models as RDS objects is quite time consuming. Unless they are required later, skipping this step might speed things up quite a bit. If you have memory issues and need to spill to disk, consider saving the predicted output instead, perhaps with data.table::fwrite, and subsequently reading it back in with data.table::fread (see the sketch after this list).
  3. For some reason, despite ranger and predict using multiple threads, running these steps in parallel may still give some speed improvement, depending on how you can parallelize. In a Linux environment, mclapply forks the process and does not copy the data to all the workers, so your mileage may vary with other parallelization options. A few good suggestions for alternative ways to schedule in parallel are already in the other comments and replies.
  4. Unless I overlooked it, you could sample your training set once and then split it into multiple subsets, as I did not see where the iterations feed sequentially into each other.
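
For point 2, here is a minimal sketch of what spilling the per-iteration predictions to disk could look like (the file name pattern and the use of the X subsets from the question are just illustrative):

library(data.table)
library(ranger)

# instead of saveRDS() on every model, keep only the predictions of each
# iteration and write them with fwrite (file names are just an example)
for (i in 1:100) {
  model_i <- ranger(class ~ height + weight + salary, data = X[[i]], probability = TRUE)
  predict_i <- data.table(predict(model_i, data = test_set)$predictions)[, id := .I]
  fwrite(predict_i, sprintf("predictions_%03d.csv", i))
}

# read everything back with fread and average the class probabilities per test row
all_preds <- rbindlist(lapply(list.files(pattern = "^predictions_.*\\.csv$"), fread))
final_predictions <- all_preds[, lapply(.SD, mean), by = "id"]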

Below is one example that could probably be optimized further, depending on the memory profile:

library(data.table)
library(parallel)
library(ranger)
ncores <- floor(detectCores()/2)-1 # set no. of cores for parallel processing
trs <- setDT(train_set, keep.rownames = TRUE) # turn into data.table 
n <- 1e2 # number of sampling iterations
# sample once, then label as iterations
results <- trs[c(sample(which(trs$class==0), 50*n, replace = TRUE), 
    sample(which(trs$class==1), 60*n, replace = TRUE))]
results[, iteration:=NA_character_]
results[class==0, iteration := as.character(cut(1:(50*n), n, labels = 1:n))]
results[class==1, iteration := as.character(cut(1:(60*n), n, labels = 1:n))]
results[, iteration := factor(iteration, order(unique(as.numeric(iteration))))]

# Step 3: Train Models on Each Subset:
calc_model <- \(x) ranger(class ~  height + weight + salary, data = x, probability = TRUE)
predict_model <- \(x) data.table(predict(calc_model(x), data = test_set)$predictions)[, id:=.I]
# save time and memory not saving model as RDS file; potentially, the list of models could 
# be saved in one write operation, which could also be faster

# Step 4: Combine All Models and Use Combined Model to Make Predictions on the Test Set:
# for some reason, despite predict using multiple threads, I still profit 
# from parallelization here; skipping generation of X to save memory
results_2 <- mclapply(results[, unique(iteration)], 
    \(x){predict_model(results[iteration == x])}, mc.cores=ncores)
final_predictions <- rbindlist(results_2)[, lapply(.SD, mean), .SDcols=c("0", "1"), by="id"]

Created on 2022-06-27 by the reprex package (v2.0.1)

Upvotes: 1
