Reputation: 43
I would like to distribute jobs (with a for loop) across the nodes of a cluster (several machines). I am trying to use the R package future to do that. I don't know whether it's the best way to do this; I tried foreach with the doParallel package, but I didn't succeed. How can I handle the case where the number of loop iterations is greater than the number of cluster nodes?
library(doParallel);
library(doFuture);
#library(future);
registerDoFuture();
workers <- c(rep("129.20.25.61",1), rep("129.20.25.217",1));
cl <- makeClusterPSOCK(workers, revtunnel = TRUE, outfile = "", verbose = FALSE);
plan(cluster, workers = cl)
mu <- 1.0
sigma <- 2.0
for(i in 1:3){
res %<-%{ rnorm(i, mean = mu, sd = sigma)}
print(i);
}
Upvotes: 3
Views: 2416
Reputation: 6805
If you use the plain Future API, i.e. future() + value() or %<-%, there is no need to involve foreach, doFuture, etc. Here is how to use the Future API by itself and what output you can expect:
(A) Set up the workers
library("future")
workers <- c("129.20.25.61", "129.20.25.217")
cl <- makeClusterPSOCK(workers, revtunnel = TRUE, outfile = "")
### starting worker pid=20026 on localhost:11900 at 11:47:28.334
### starting worker pid=12291 on localhost:11901 at 11:47:37.172
print(cl)
### socket cluster with 2 nodes on hosts '129.20.25.61', '129.20.25.217'
plan(cluster, workers = cl)
(B) Explicit Future API
Here we create a list of futures explicitly using future() and retrieve their values using values() (basically equivalent to calling lapply(f, FUN = value)).
mu <- 1.0
sigma <- 2.0
f <- list()
for (i in 1:3) {
f[[i]] <- future({ rnorm(i, mean = mu, sd = sigma) })
}
v <- values(f)
str(v)
### List of 3
### $ : num 3.25
### $ : num [1:2] 3.24 3.29
### $ : num [1:3] 1.251 2.299 0.923
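The equivalence mentioned above can be sketched as follows, which may be handy if values() is not available in your installed version of future. This is a minimal sketch, assuming plan(sequential) as a local stand-in for the cluster so that it runs on a single machine; seed = TRUE is an addition here, requesting parallel-safe random numbers for rnorm().

```r
library("future")

## Assumption: sequential processing stands in for the remote cluster.
plan(sequential)

mu <- 1.0
sigma <- 2.0

## Create one future per iteration; each is evaluated by the current plan.
f <- list()
for (i in 1:3) {
  f[[i]] <- future({ rnorm(i, mean = mu, sd = sigma) }, seed = TRUE)
}

## Collect the results one future at a time.
v <- lapply(f, FUN = value)
str(v)
```

The result is a list with one numeric vector per iteration, of lengths 1, 2 and 3; the actual numbers differ from run to run.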
(C) Implicit Future API
In this alternative, we create futures implicitly using the future assignment operator %<-% (which internally calls future() and then value() when you try to access the future's value). Since %<-% can only assign to environments (and not lists, data.frames, etc.), we need to use a container that is an environment. Here I use the listenv class, which is an environment but allows you to index it like a list.
library("listenv") ## listenv()
mu <- 1.0
sigma <- 2.0
v <- listenv()
for (i in 1:3) {
v[[i]] %<-% { rnorm(i, mean = mu, sd = sigma) }
}
v <- as.list(v)
str(v)
### List of 3
### $ : num 1.15
### $ : num [1:2] 2.2277 -0.0164
### $ : num [1:3] -2.09 3.34 -1.09
(D) Use future_lapply()
If you prefer an lapply()-like approach, you could do:
library("future.apply") ## future_lapply() is provided by future.apply in current releases
v <- future_lapply(1:3, FUN = function(i) {
rnorm(i, mean = mu, sd = sigma)
})
str(v)
### List of 3
### $ : num 2.12
### $ : num [1:2] 2.56 -1.21
### $ : num [1:3] 2.89 -0.159 -0.983
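Because each iteration draws random numbers, it may be worth asking future_lapply() for parallel-safe random number streams. The following is a minimal sketch, assuming the future.apply package is installed and using plan(sequential) as a local stand-in for the cluster; the future.seed = TRUE argument is an addition that is not part of the call shown above.

```r
library("future")
library("future.apply")

## Assumption: sequential processing stands in for the remote cluster.
plan(sequential)

mu <- 1.0
sigma <- 2.0

## future.seed = TRUE gives each iteration its own RNG stream, so the
## draws are statistically sound regardless of how work is split up.
v <- future_lapply(1:3, FUN = function(i) {
  rnorm(i, mean = mu, sd = sigma)
}, future.seed = TRUE)
str(v)
```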
(E) Using foreach()
If you'd like to use foreach(), then you can do as follows. Note that, by design of foreach, it's best to always export global variables explicitly when using foreach(); however, if you always use doFuture, it's actually not needed.
library("doFuture")
registerDoFuture()
workers <- c("129.20.25.61", "129.20.25.217")
cl <- makeClusterPSOCK(workers, revtunnel = TRUE, outfile = "")
plan(cluster, workers = cl)
v <- foreach(i = 1:3, .export = c("mu", "sigma")) %dopar% {
rnorm(i, mean = mu, sd = sigma)
}
str(v)
### List of 3
### $ : num 3.12
### $ : num [1:2] -0.0887 -2.8016
### $ : num [1:3] 2.15 3.5 -2.24
How can I figure out when the number of the loop iterations is greater than the number of cluster nodes?
I'm not sure what you're asking here. Are you concerned about having more futures running at one time than you have workers? If so, that is automatically taken care of. If all workers are occupied, then the creation of additional futures will block until one of the workers is available again.
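The blocking behaviour can be observed with a small experiment. This is a minimal sketch, assuming two local background R sessions (plan(multisession, workers = 2)) as a stand-in for the two remote machines; the names n_workers, n_iter and pids are made up for illustration.

```r
library("future")

## Assumption: two local background R sessions stand in for the two
## remote machines, so the sketch runs without a cluster.
plan(multisession, workers = 2)

n_workers <- nbrOfWorkers()  ## number of workers in the current plan
n_iter <- 5                  ## more iterations than workers

f <- list()
for (i in seq_len(n_iter)) {
  ## When every worker is busy, future() waits here until one is free.
  f[[i]] <- future({
    Sys.sleep(0.2)
    Sys.getpid()  ## process ID of the worker that ran this iteration
  })
}

## Collect the process IDs; all iterations complete even though there
## are fewer workers than iterations.
pids <- vapply(f, FUN = value, FUN.VALUE = integer(1))
print(n_workers)
print(length(unique(pids)))  ## no more distinct processes than workers

plan(sequential)  ## shut down the background sessions
```

Comparing nbrOfWorkers() with the number of iterations is one way to tell in advance whether some iterations will have to wait for a free worker.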
Upvotes: 5