Reputation: 8044
Could anybody help me to somehow parallelize this code? I am about to run some simulations, but I am STUCK... it takes too long - I even left my computer running for 3 days and it did not finish calculating:
sapply(1:1000, Take_expected_value, points = 10^7, number_of_trajectories = 10^7)
Take_expected_value <- function(interval_end = 1, points = 100, number_of_trajectories = 1000){
  return(
    mean(
      exp(
        replicate(
          n = number_of_trajectories,
          expr = Max_from_Wiener_on_interval(interval_end, points)
        )
      )
    )
  ) # This function replicates Max_from_Wiener_on_interval(), feeds the
    # values into exp(), and returns the mean over all replications.
}
Max_from_Wiener_on_interval <- function(interval_end = 1, points = 100){
  # time increment
  Delta <- interval_end/points
  # time moments
  time <- seq(0, interval_end, length = points + 1)
  # Wiener process, started at W(0) = 0
  W <- c(0, cumsum(sqrt(Delta) * rnorm(points)))
  # return max of "Wiener * sqrt(2) - time moment"
  return(
    max(sqrt(2) * W - time)
  )
}
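For scale, a rough back-of-envelope estimate (my numbers, not part of the original question): every trajectory draws about "points" normals, so the sapply call above needs on the order of 10^17 random numbers in total.

draws <- 1000 * 10^7 * 10^7            # ~1e17 normal draws for the whole sapply
draws / 1e8 / (60 * 60 * 24 * 365)     # ~30 years, assuming ~1e8 draws/second/core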
EDIT
After the EDIT I am using this code; it might just be a problem of my weak machine (computer), but it is still very slow for me:
Take_expected_value <- function(interval_end = 1, points = 100, number_of_trajectories = 1000){
  return(
    mean(
      exp(
        replicate(
          n = number_of_trajectories,
          expr = Max_from_Wiener_on_interval(interval_end, points)
        )
      )
    )
  ) # This function replicates Max_from_Wiener_on_interval(), feeds the
    # values into exp(), and returns the mean over all replications.
}
# this function shall not be exported
Max_from_Wiener_on_interval <- function(interval_end = 1, points = 100){
  # time increment
  Delta <- interval_end/points
  # time moments
  time <- seq(0, interval_end, length = points + 1)
  # Wiener process, started at W(0) = 0
  W <- c(0, cumsum(sqrt(Delta) * rnorm(points)))
  # return max of "Wiener * sqrt(2) - time moment"
  return(
    max(sqrt(2) * W - time)
  )
}
install.packages("snowfall")
require(snowfall)
cpucores <- as.integer(Sys.getenv('NUMBER_OF_PROCESSORS'))
sfInit(parallel = TRUE, cpus = cpucores)
sfExportAll()
system.time(
  sfLapply(as.list(1:1000), fun = Take_expected_value,
           points = 10^6, number_of_trajectories = 10^6)
)
sfRemoveAll()
sfStop()
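One way to tell whether the machine or the problem size is the bottleneck (a suggestion of mine, not from the original post): time a single call at small sizes and extrapolate, since the cost grows roughly linearly in points * number_of_trajectories.

t_small <- system.time(
  Take_expected_value(interval_end = 1, points = 10^3, number_of_trajectories = 10^3)
)["elapsed"]
# points and number_of_trajectories are each 10^3 times larger above, so one
# full call should cost roughly t_small * 10^6 seconds on a single core:
t_small * 10^6 / 3600   # estimated hours per call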
Upvotes: 0
Views: 155
Reputation: 2460
I tend to use snowfall, but there are many other ways to parallelize a function. Here is a generic script with a junk function meant to take a while to compute:
Iter_vals <- as.list(1:16)  # the values to iterate the function with
fx_parallel_run <- function(Iter_val, multiplier){  # junk function with 2 arguments
  jnk <- round(runif(1) * multiplier)
  jnk1 <- runif(jnk)
  for (i in 1:length(jnk1)){
    jnk1[i] <- (jnk1[i] * runif(1)) + Iter_val[[1]]
  }
  return(jnk1)
}
require(snowfall)
# By default snowfall will use the total number of processors, so the next
# line is not strictly necessary. However, if you are using the machine for
# other purposes, you can adapt it to leave a core or two free so the
# computer is still functional for multitasking.
cpucores <- as.integer(Sys.getenv('NUMBER_OF_PROCESSORS'))
sfInit(parallel = TRUE, cpus = cpucores)
sfExportAll()
# extra function arguments go after the first two sfLapply arguments
results <- sfLapply(Iter_vals, fun = fx_parallel_run, multiplier = 800)
sfRemoveAll()
sfStop()
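For comparison, the same pattern in the base parallel package would look something like this (a sketch of an alternative, not part of the snowfall workflow above):

library(parallel)
cl <- makeCluster(max(1, detectCores() - 1))  # leave one core free
clusterExport(cl, "fx_parallel_run")          # ship the function to the workers
results <- parLapply(cl, Iter_vals, fx_parallel_run, multiplier = 800)
stopCluster(cl)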
In your case, after specifying the functions, I would simply use:
require(snowfall)
cpucores <- as.integer(Sys.getenv('NUMBER_OF_PROCESSORS'))
sfInit(parallel = TRUE, cpus = cpucores)
sfExportAll()
results <- sfLapply(as.list(1:1000), fun = Take_expected_value,
                    points = 10^7, number_of_trajectories = 10^7)
sfRemoveAll()
sfStop()
This may need a little tweaking, but I am not going to do all of the work for you.
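One tweak that is likely to matter more than parallelization alone (my sketch, not part of the original answer): vectorize the inner replicate() loop by simulating all trajectories as one matrix. Memory grows with points * number_of_trajectories, so for large sizes the trajectories would need to be processed in chunks.

# Hypothetical vectorized replacement for Take_expected_value(); assumes the
# points x number_of_trajectories matrix of increments fits in memory.
Take_expected_value_vec <- function(interval_end = 1, points = 100,
                                    number_of_trajectories = 1000){
  Delta <- interval_end / points
  time  <- seq(0, interval_end, length = points + 1)
  # one column of increments per trajectory
  incr <- matrix(sqrt(Delta) * rnorm(points * number_of_trajectories),
                 nrow = points)
  # cumulative sums down each column give the Wiener paths, started at 0
  W <- rbind(0, apply(incr, 2, cumsum))
  # column-wise maxima of sqrt(2) * W(t) - t, then the Monte Carlo mean
  mean(exp(apply(sqrt(2) * W - time, 2, max)))
}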
Upvotes: 1