Prometheus

Reputation: 693

Parallelize for loop in R

I am trying to learn how to use parallel processing in R. A snapshot of the data and the code is provided below.

Creating a rough dataset

library(truncnorm)
# Creating a mock dataframe
Market = c('City1','City2','City3','City4','City5','City2','City4','City1','City3','City5')
Car_type = c('A','A','A','A','A','B','B','B','B','B')
Variable1 = c(.34,.19,.85,.27,.32,.43,.22,.56,.17,.11)
Car_purchased = c(1,0,0,1,0,1,0,0,1,1)
Market_data = data.frame(Market, Car_type, Variable1, Car_purchased)
# Create a bigger dataset (the 10 rows stacked 100 times)
Market_data2 = do.call("rbind", replicate(100, Market_data, simplify = FALSE))
Market_data2$Final_value = 0 # create a column for the later calculation

Writing a function and running the function

Car_Value = function(data){
  result = list()  # collects the processed market subsets
  market_list = unique(data$Market)
  for (m in market_list){
    market_subset = data[which(data$Market == m),]
    for (i in 1:nrow(market_subset)){
      if (market_subset[i,'Car_purchased'] == 1){
        # purchased: the mean is the maximum Variable1 within this market
        market_subset[i,'Final_value'] = rtruncnorm(1, a=-10, b=0, mean=max(market_subset$Variable1), sd=1)
      } else {
        # not purchased: the mean is this row's own Variable1
        market_subset[i,'Final_value'] = rtruncnorm(1, a=-10, b=0, mean=market_subset[i,'Variable1'], sd=1)
      }
    }
    result = rbind(result, market_subset)
  }
  return(result)
}

get_value = Car_Value(data=Market_data2)

In the above example, there are 5 markets ("Market") and 2 car types ("Car_type"). A consumer may have bought a car in any of the markets. I have to draw a value ("Final_value") from a truncated normal distribution. This value depends only on the values of Variable1 within the given market, which is why I use the outer for loop over markets. The mean of the truncated normal distribution depends on Variable1: it is max(Variable1) within the market if Car_purchased == 1, or the row's own Variable1 if Car_purchased == 0. This version of the code runs fine (although it is not optimized for speed).
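The rule described above can also be expressed without any loop. The following is a minimal base-R sketch (not the asker's code), assuming the same mock data: `ave()` attaches each market's maximum of Variable1 to every row of that market, and a single vectorized `rtruncnorm()` call then draws one value per row. For a given seed the draws will not match the loop's draws, because the order in which random values are generated changes.

```r
library(truncnorm)

# Mock data, as in the question
Market_data <- data.frame(
  Market = c("City1", "City2", "City3", "City4", "City5", "City2", "City4", "City1", "City3", "City5"),
  Car_type = rep(c("A", "B"), each = 5),
  Variable1 = c(.34, .19, .85, .27, .32, .43, .22, .56, .17, .11),
  Car_purchased = c(1, 0, 0, 1, 0, 1, 0, 0, 1, 1)
)
Market_data2 <- do.call("rbind", replicate(100, Market_data, simplify = FALSE))

# For every row, the maximum Variable1 within that row's market (same length as the data)
market_max <- ave(Market_data2$Variable1, Market_data2$Market, FUN = max)

# Mean of the truncated normal: market maximum if purchased, else the row's own Variable1
mu <- ifelse(Market_data2$Car_purchased == 1, market_max, Market_data2$Variable1)

# One independent draw per row; rtruncnorm() pairs the i-th draw with mu[i]
Market_data2$Final_value <- rtruncnorm(nrow(Market_data2), a = -10, b = 0, mean = mu, sd = 1)
```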

Problem

Next, I would like to use parallel processing for the outer for loop, i.e. the loop across markets, since the Final_value of a market depends only on the observations within that market.

Unfortunately, I only know how to parallelize over individual rows of the dataset. For example, my code (below) assigns the 1st row to the 1st core, the 2nd row to the 2nd core, and so on. This is inefficient and takes a long time, because every row has to build its market's subset and then find the max of that subset.

My inefficient version

library(parallel)
library(foreach)
library(doParallel)
library(iterators)
library(utils)
library(truncnorm)

cl = parallel::makeCluster(4, type = "PSOCK")
registerDoParallel(cl)
clusterEvalQ(cl, {library(truncnorm)})

Car_Value_Parallel <- function(market_data){
  # one task per ROW: every task re-subsets the whole dataset to find its market
  output <- foreach(x = iter(market_data, by = "row"), .combine = rbind) %dopar% {
    market_subset = market_data[which(market_data$Market == x$Market),]
    if (x$Car_purchased == 1){
      x$Final_value = rtruncnorm(1, a=-10, b=0, mean=max(market_subset$Variable1), sd=1)
    } else {
      x$Final_value = rtruncnorm(1, a=-10, b=0, mean=x$Variable1, sd=1)
    }
    return(x)
  }
  output
}

get_value_parallel = Car_Value_Parallel(market_data = Market_data2)
stopCluster(cl)

This is very slow on datasets with more than 100K rows (my actual dataset has about 1.2 million rows). What I could not work out is how to parallelize at the market level instead, i.e. run the computation for City1 on the 1st core, City2 on the 2nd core, and so on. Can someone please help? Any help is appreciated. Thanks.
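One way to parallelize at the market level, sketched here under the assumption that the question's foreach/doParallel setup is kept: `split()` the data frame by Market so that each foreach task receives one whole market, and compute that market's rows in a vectorized way. `value_one_market()` is a hypothetical helper introduced only for this sketch, and the choice of 2 workers is just an example.

```r
library(parallel)
library(foreach)
library(doParallel)
library(truncnorm)

# Mock data, as in the question (10 rows stacked 100 times)
Market_data <- data.frame(
  Market = c("City1", "City2", "City3", "City4", "City5", "City2", "City4", "City1", "City3", "City5"),
  Car_type = rep(c("A", "B"), each = 5),
  Variable1 = c(.34, .19, .85, .27, .32, .43, .22, .56, .17, .11),
  Car_purchased = c(1, 0, 0, 1, 0, 1, 0, 0, 1, 1)
)
Market_data2 <- do.call("rbind", replicate(100, Market_data, simplify = FALSE))

# Hypothetical helper: fills Final_value for all rows of ONE market, without an inner loop
value_one_market <- function(market_subset) {
  mu <- ifelse(market_subset$Car_purchased == 1,
               max(market_subset$Variable1),  # market maximum for purchased cars
               market_subset$Variable1)       # row's own value otherwise
  market_subset$Final_value <- rtruncnorm(nrow(market_subset), a = -10, b = 0, mean = mu, sd = 1)
  market_subset
}

cl <- makeCluster(2, type = "PSOCK")
registerDoParallel(cl)
clusterSetRNGStream(cl, iseed = 123)  # independent random-number streams on the workers

# split() returns a list with one data frame per market, so each task handles a whole market
get_value_parallel <- foreach(
  market_subset = split(Market_data2, Market_data2$Market),
  .combine = rbind,
  .packages = "truncnorm"
) %dopar% {
  value_one_market(market_subset)
}

stopCluster(cl)
```

With this layout the number of tasks equals the number of markets (5 here), so at most that many cores can be busy, and each market is subset once rather than once per row. The rows come back grouped by market, not in the original order.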

P.S. My apologies for the long question. I just wanted to show all versions of the code that I have used.

Upvotes: 0

Views: 235

Answers (1)

parkerchad81

Reputation: 558

I see no reason to pursue parallel processing with your data set. Instead, look into packages like dplyr or data.table for a more efficient solution.
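As an illustration of the data.table route mentioned above, the same per-market computation might look like the sketch below. It is not code from the original answer, and it assumes data.table 1.12.4 or later (for `fifelse()`). `by = Market` evaluates the expression once per market, so `max(Variable1)` is that market's maximum and `.N` is its number of rows, giving one draw per row.

```r
library(data.table)
library(truncnorm)

# Mock data, as in the question
Market_data <- data.frame(
  Market = c("City1", "City2", "City3", "City4", "City5", "City2", "City4", "City1", "City3", "City5"),
  Car_type = rep(c("A", "B"), each = 5),
  Variable1 = c(.34, .19, .85, .27, .32, .43, .22, .56, .17, .11),
  Car_purchased = c(1, 0, 0, 1, 0, 1, 0, 0, 1, 1)
)
Market_dt <- rbindlist(replicate(100, Market_data, simplify = FALSE))

# := adds Final_value by reference, computed separately within each market
Market_dt[, Final_value := rtruncnorm(.N, a = -10, b = 0,
                                      mean = fifelse(Car_purchased == 1, max(Variable1), Variable1),
                                      sd = 1),
          by = Market]
```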

From my understanding of your problem, for each Market you want to use rtruncnorm to create the variable Final_value, where the mean argument of rtruncnorm depends on the variable Car_purchased.

We can accomplish this with dplyr, without a for loop.

library(truncnorm)
library(dplyr)

# Creating a mock dataframe
Market <- c("City1", "City2", "City3", "City4", "City5", "City2", "City4", "City1", "City3", "City5")
Car_type <- c("A", "A", "A", "A", "A", "B", "B", "B", "B", "B")
Variable1 <- c(.34, .19, .85, .27, .32, .43, .22, .56, .17, .11)
Car_purchased <- c(1, 0, 0, 1, 0, 1, 0, 0, 1, 1)
Market_data <- data.frame(Market, Car_type, Variable1, Car_purchased)
# Create a bigger dataset
Market_data2 <- replicate(100, Market_data, simplify = FALSE) %>% bind_rows()
Market_data2$Final_value <- 0 # create a column for the later calculation
empty_list = list()

Car_Value2 <- function(data) {
  data %>%
    group_by(Market) %>%
    mutate(
      Final_value = if_else(
        Car_purchased == 1,
        rtruncnorm(1, a = -10, b = 0, mean = max(Variable1), sd = 1),
        rtruncnorm(1, a = -10, b = 0, mean = Variable1, sd = 1)
      )
    )
}


microbenchmark::microbenchmark(
  Car_Value(Market_data2),
  Car_Value2(Market_data2),
  times = 100
)
#> Unit: milliseconds
#>                      expr       min        lq      mean   median        uq
#>   Car_Value(Market_data2) 66.109304 68.043575 69.030763 68.56569 69.681255
#>  Car_Value2(Market_data2)  1.073318  1.101578  1.204737  1.17583  1.230687
#>        max neval cld
#>  89.497035   100   b
#>   3.465425   100  a


# Even bigger dataframe
Market_data3 <- replicate(120000, Market_data, simplify = FALSE) %>% bind_rows()


microbenchmark::microbenchmark(
  Car_Value2(data = Market_data3),
  times = 100 
)
#> Unit: milliseconds
#>                             expr      min       lq     mean   median
#>  Car_Value2(data = Market_data3) 338.4615 341.7134 375.8769 397.7133
#>        uq      max neval
#>  399.8733 412.5134   100

Created on 2019-03-10 by the reprex package (v0.2.1)
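One caveat about `Car_Value2()`: `rtruncnorm(1, ...)` requests a single draw, and `if_else()` recycles a length-1 result across the whole group, so rows within a market can end up sharing the same value instead of receiving the independent per-row draws produced by the question's loop. A sketch that keeps per-row draws, assuming those are what is wanted, asks for `n()` values and moves the `if_else()` into the `mean` argument. `Car_Value3()` is a hypothetical name, and the timings above were measured for `Car_Value2()`, not for this variant.

```r
library(truncnorm)
library(dplyr)

# Mock data, as in the question
Market_data <- data.frame(
  Market = c("City1", "City2", "City3", "City4", "City5", "City2", "City4", "City1", "City3", "City5"),
  Car_type = rep(c("A", "B"), each = 5),
  Variable1 = c(.34, .19, .85, .27, .32, .43, .22, .56, .17, .11),
  Car_purchased = c(1, 0, 0, 1, 0, 1, 0, 0, 1, 1)
)
Market_data2 <- replicate(100, Market_data, simplify = FALSE) %>% bind_rows()

Car_Value3 <- function(data) {
  data %>%
    group_by(Market) %>%
    mutate(
      # n() is the number of rows in the current market: one independent draw per row
      Final_value = rtruncnorm(
        n(), a = -10, b = 0,
        # market maximum for purchased cars, otherwise the row's own Variable1
        mean = if_else(Car_purchased == 1, max(Variable1), Variable1),
        sd = 1
      )
    ) %>%
    ungroup()
}

res <- Car_Value3(Market_data2)
```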

Upvotes: 3
