enneppi

Reputation: 1039

R, dplyr and snow: how to parallelize functions which use dplyr

Let's suppose that I want to apply myfunction, in parallel, to each row of myDataFrame. Suppose that otherDataFrame is a data frame with two columns, COLUMN1_odf and COLUMN2_odf, which are used inside myfunction. So I would like to write code using parApply like this:

clus <- makeCluster(4)
clusterExport(clus, list("myfunction","%>%"))

myfunction <- function(fst, snd) {
 #otherFunction and aGlobalDataFrame are defined in the global env
 otherFunction(aGlobalDataFrame)

 # some code to create otherDataFrame **INTERNALLY** to this function
 otherDataFrame %>% filter(COLUMN1_odf==fst & COLUMN2_odf==snd)
 return(otherDataFrame)
}
do.call(bind_rows, parApply(clus, myDataFrame, 1, function(r) { myfunction(r[1], r[2]) }))

The problem here is that R doesn't recognize COLUMN1_odf and COLUMN2_odf, even though I include them in clusterExport. How can I solve this problem? Is there a way to "export" all the objects that snow needs, so that I don't have to enumerate each of them?

EDIT 1: I've added a comment (in the code above) to make clear that otherDataFrame is created internally to myfunction.

EDIT 2: I've added some pseudo-code to generalize myfunction: it now uses a global data frame (aGlobalDataFrame) and another function (otherFunction).

Upvotes: 4

Views: 2301

Answers (2)

enneppi

Reputation: 1039

After some experimenting, I solved my problem (following Benjamin's suggestion and taking into account the edits I added to the question) with:

myfunction <- function(fst, snd) {
  # otherFunction and aGlobalDataFrame are defined in the global env
  otherFunction(aGlobalDataFrame)

  # some code to create otherDataFrame **INTERNALLY** to this function
  otherDataFrame <- otherDataFrame %>%
    dplyr::filter(COLUMN1_odf == fst & COLUMN2_odf == snd)
  return(otherDataFrame)
}

clus <- makeCluster(4)
clusterEvalQ(clus, {library(dplyr); library(magrittr)})
clusterExport(clus, c("myfunction", "otherFunction", "aGlobalDataFrame"))

do.call(bind_rows, parApply(clus, myDataFrame, 1,
                            function(r) { myfunction(r[1], r[2]) }))

In this way I've exported aGlobalDataFrame, myfunction and otherFunction: in short, all the functions and data used by the function that does the parallelized work (myfunction itself).
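As for the other part of my question (exporting everything without enumerating it): as far as I can tell snow doesn't detect the needed objects automatically, but a blunt workaround is to export every object in the master's global environment. A small sketch (each worker gets its own copy of everything, so it can be wasteful with large objects):

# export every object currently defined in the global environment to all workers
clusterExport(clus, ls())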

Upvotes: 5

Benjamin

Reputation: 17279

Now that I'm not looking at this on my phone, I can see a few issues.

First, you are not actually creating otherDataFrame in your function. You are trying to pipe an existing otherDataFrame into filter, and if otherDataFrame doesn't exist in the environment, the function will fail.

Second, unless you have already loaded the dplyr package into your cluster environments, you will be calling the wrong filter function.

Lastly, when you've called parApply, you haven't specified anywhere what fst and snd are supposed to be. Give the following a try:

myfunction <- function(otherDataFrame, fst, snd) {
  dplyr::filter(otherDataFrame, COLUMN1_odf == fst & COLUMN2_odf == snd)
}

clus <- makeCluster(4)
clusterEvalQ(clus, {library(dplyr); library(magrittr)})
clusterExport(clus, "myfunction")

# extra arguments after FUN are forwarded to it on each worker;
# "fst" and "snd" here name myDataFrame's two columns
do.call(bind_rows,
        parApply(clus, myDataFrame, 1,
                 function(r, otherDataFrame, fst, snd) myfunction(otherDataFrame, r[fst], r[snd]),
                 otherDataFrame, "fst", "snd"))
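And whichever version you go with, shut the cluster down when you're done; the workers are separate R processes that stay alive otherwise. For example (still assuming myDataFrame's columns are named fst and snd):

result <- do.call(bind_rows, parApply(clus, myDataFrame, 1,
                                      function(r, otherDataFrame, fst, snd) myfunction(otherDataFrame, r[fst], r[snd]),
                                      otherDataFrame, "fst", "snd"))
stopCluster(clus)  # release the worker processes started by makeCluster()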

Upvotes: 1
