Reputation: 51
I have a function that I need to run on 2000 data frames. Each iteration is taking a very long time i.e almost 40 minutes and hence I'm using the 'foreach' package in R. I have generated the data in the following way:
library(foreach)
library(doParallel)
library(data.table)
library(matrixStats)
# DATA
datalist <- list()
for (i in 1:2000){
set.seed(i)
x1 <- rnorm(600,0.05,0.3)
x2 <- rnorm(600,-2,0.25)
data_2 <- data.frame(x1,x2)
lin_pred <- 1+(0.2 * data_2[1]) + (1.2*data_2[2])
prob <- 1/(1+exp(-lin_pred))
index <- rep(1:100, each = 6)
data <- data.frame(index,prob)
data_split <- split(data,f=data$index)
create_y <- function(p){
y <- c()
y_1 <- rbinom(1,1,p[1,2])
y <- append(y,y_1)
for(i in 2:6){
pr <- p[i,2] + 0.05*(y[i-1]-p[i-1,2])
u <- rbinom(1,1,pr)
y <- append(y,u)
}
return(y)
}
res <- lapply(data_split,create_y)
y <- data.frame(x=unlist(res))
final_data <- data.frame(index,y,x1,x2)
datalist[[i]] <- final_data
}
Now I have defined my objective function which will be optimized for each of the data set in the list defined above:
## Objective function:
dpd_tdependent <- function(x, fixed = c(rep(FALSE,5))){
params <- fixed
dpd <- function(p){
params[!fixed] <- p
alpha <- params[1]
beta_0 <- params[2]
beta_1 <- params[3]
beta_2 <- params[4]
rho <- params[5]
add_pi <- function(d){
k <- beta_0+(d[3]*beta_1)+(d[4]*beta_2)
k1 <- exp(k)/(1+exp(k))
p <- c()
p <- append(p,k1[1,1])
for(i in 2:6){
u <- k1[i,1] + rho*(d[i-1,2]-k1[i-1,1])
p <- append(p,u)
}
d <- cbind(d,p)
}
dat_split <- split(x , f = x$index)
result <- lapply(dat_split, add_pi)
result <- rbindlist(result)
result <- as.data.frame(result)
colnames(result) <- c('index','y','x1','x2','exp_prob')
result_split <- split(result, f = result$index)
## First expression
full_prob <- function(d){
k <- as.data.frame(expand.grid(c(0,1),c(0,1),c(0,1),c(0,1),c(0,1),c(0,1)))
k <- as.data.frame(t(k))
val <- c()
for(j in 1:ncol(k)){
d[2]<- k[j]
m <- d[1,5]^d[1,2] * ((1-d[1,5])^(1-d[1,2]))
for(i in 2:nrow(d)){
c <- 1+ (rho*((d[i,2]-d[i,5])*(d[i-1,2]-d[i-1,5]))/(sqrt(abs(d[i,5]*d[i-1,5]*(1-d[i,5])*(1-d[i-1,5])))))
m <- m* d[i,5]^d[i,2] * (1-d[i,5])^(1-d[i,2]) *c
}
val <- append(val,m)
}
val <- val^(1+alpha)
return(sum(val))
}
first_exp <- lapply(result_split,full_prob)
first_exp <- as.vector(unlist(first_exp))
## Second expression:
compute_prob <- function(d){
m <- d[1,5]^d[1,2] * ((1-d[1,5])^(1-d[1,2]))
for(i in 2:nrow(d)){
c <- 1+ (rho*((d[i,2]-d[i,5])*(d[i-1,2]-d[i-1,5]))/(sqrt(abs(d[i,5]*d[i-1,5]*(1-d[i,5])*(1-d[i-1,5])))))
m <- m* d[i,5]^d[i,2] * (1-d[i,5])^(1-d[i,2]) *c
}
return(m^alpha)
}
second_exp <- lapply(result_split,compute_prob)
second_exp <- as.vector(unlist(second_exp))
final_res <- first_exp - ((1+1/alpha)*(second_exp))
final_result <- sum(final_res)
}
}
Lastly, I'm using the foreach package to hasten the process:
cl = makeCluster(6)
registerDoParallel(cl)
mse <- matrix(,nrow=2000,ncol=5)
foreach(i = 1:2000, .packages = c('data.table','matrixStats'), .export = c('mse','datalist'))%dopar%{
beta <- rbind(1,0.2,1.2,0.05)
val <- dpd_tdependent(datalist[[i]], c(0.7,FALSE,FALSE,FALSE,FALSE))
b_s <- as.vector(optim(c(beta_0 =0.7, beta_1 =0.05 ,beta_2 = 0.9,rho=0.001),val)$par)
conv <- optim(c(beta_0 =0.7, beta_1 =0.05 ,beta_2 = 0.9,rho = 0.001),val)$convergence
k <- (b_s -beta)
k <- append(k,conv)
mse[i,] <- rbind(k)
if(colCounts(mse , value = 0, na.rm = TRUE)[5] == 500) break
}
stopCluster(cl)
Now, my problem is that the matrix mse is not being populated with the desired values after running the optim function inside the foreach package. I am aware that I can use the .combine feature inside the foreach function to return a matrix, but how will I assign a name to such a matrix? I mean if I cannot assign a name then, how will I check the last condition(inside if) and break?
I truly appreciate any help in this regard. Thank you.
Upvotes: 1
Views: 770
Reputation: 24722
I don't believe you should be trying to modify a global variable from within each worker. See my comment above and link. You shouldn't be checking within iteration process if 500 iterations have convergence=0, because that information is not available to each iteration. The below is one option to return what you want
cl = makeCluster(6)
registerDoParallel(cl)
mse = foreach(i = 1:2000, .packages = c('data.table','matrixStats')) %dopar%{
beta <- rbind(1,0.2,1.2,0.05)
val <- dpd_tdependent(datalist[[i]], c(0.7,FALSE,FALSE,FALSE,FALSE))
optim_sol <- optim(c(beta_0 =0.7, beta_1 =0.05 ,beta_2 = 0.9,rho=0.001),val)
b_s <- optim_sol$par
conv <- optim_sol$convergence
c(b_s-beta,conv,i)
}
mse <- matrix(unlist(m),nrow=2000, byrow=T)
stopCluster(cl)
Upvotes: 1