Reputation: 101
I'm using the foreach package to try to parallelize my function on Windows (this was the only approach to parallelizing that I could follow easily). I basically need to call a function for g = 1, then g = 2, etc., and want to do this faster.
Thank you very much for your help!!
My code:
#agonize parallel
#main function
par_agonize <- function(datfile, num_groups, regen_pref_matrices = FALSE, graph_groups = num_groups) {
if (regen_pref_matrices) mm <- gen_pref_matrices(datfile)
out <- list()
tic.clearlog()
improve <- tibble(groups=numeric(), agony=numeric(), abs_dec=numeric(), percent_dec=numeric(), total_dec=numeric(), tot_per_dec=numeric())
foreach(g = 1:num_groups, .packages = loaded.package.names, .export = c(loaded.functions, loaded.objects), .verbose = TRUE) %dopar% { #key line where I use dopar/foreach
tic()
out[[g]] <- find_groups(mm, g) #this is the critical line, the improve and tic/toc log are just accessories
toc(log = TRUE, quiet = FALSE) #calculates time
log.lst <- tic.log(format = FALSE)
if (g == 1) { #this calculates summary statistics, not important
improve <- add_row(improve, groups = g, agony = out[[g]]$ag, abs_dec = 0, percent_dec = 0, total_dec = 0, tot_per_dec=0)
}
else {
improve <- add_row(improve, groups = g, agony = out[[g]]$ag, abs_dec = out[[g]]$ag - out[[g-1]]$ag,
percent_dec = (out[[g]]$ag - out[[g-1]]$ag)/(out[[g-1]]$ag), total_dec = out[[g]]$ag - out[[1]]$ag,
tot_per_dec = (out[[g]]$ag - out[[1]]$ag)/(out[[1]]$ag))
}
}
#just saves output to my list
out[["summary_stats"]] <- improve
out[["timings"]] <- tibble(num_groups = 1:g, run_time = unlist(lapply(log.lst, function(x) x$toc - x$tic))) %>%
add_row("num_groups" = "Total", "run_time" = sum(out[["timings"]]$run_time[1:g]))
out[["agony_graph"]] <- graph_agony(out, graph_groups)
social_rank <<- out
return(social_rank$agony_graph)
}
#test code
registerDoParallel(cores = detectCores() - 1)
loaded.package.names <- c(sessionInfo()$basePkgs, names(sessionInfo()$otherPkgs))
loaded.package.names #works
loaded.functions <- c("assign_groups", "find_agony", "find_groups", "generate_hierarchy", "gen_pref_matrices", "graph_agony", "init")
loaded.objects <- c("mm") #I can regenerate mm within my code... or use the mm that's already there, so I figured I would export it
system.time(par_agonize("./data/hof17.csv", 2, regen=F)) #this is the MAIN line that runs my function
stopCluster(cl) #not clear if needed
My current error is:
automatically exporting the following variables from the local
environment:
improve, out
explicitly exporting variables(s): assign_groups, find_agony, find_groups,
generate_hierarchy, gen_pref_matrices, graph_agony, init, mm
numValues: 2, numResults: 0, stopped: TRUE
got results for task 1
numValues: 2, numResults: 1, stopped: TRUE
returning status FALSE
got results for task 2
accumulate got an error result
numValues: 2, numResults: 2, stopped: TRUE
calling combine function
evaluating call object to combine results:
fun(accum, result.1)
returning status TRUE
Error in { : task 2 failed - "replacement has length zero"
Upvotes: 2
Views: 815
Reputation: 101
I added the timing functions tic/toc to the secondary functions and took them out of foreach. I took Ralf's advice about returning only the primary output from foreach. Then, within the broader function that foreach is nested in, I computed improve (the summary stats) and the timings, and returned all of them as a list.
Works great on Windows! Not sure if it's faster than sequential yet though.
par_agonize2 <- function(datfile, num_groups, regen_pref_matrices = FALSE, graph_groups = num_groups) {
if (regen_pref_matrices) mm <- gen_pref_matrices(datfile)
out <- foreach(g = 1:num_groups, .packages = loaded.package.names, .export = c(loaded.functions, loaded.objects), .verbose = TRUE) %dopar% {
x <- find_groups3(mm, g) #this is the critical line, the improve and tic/toc log are just accessories
}
improve <- tibble(groups=numeric(), agony=numeric(), abs_dec=numeric(), percent_dec=numeric(), total_dec=numeric(), tot_per_dec=numeric())
run_time <- vector()
for (g in 1:num_groups) {
if (g == 1) {improve <- add_row(improve, groups = 1, agony = out[[1]]$ag, abs_dec = 0, percent_dec = 0, total_dec = 0, tot_per_dec=0)}
else {improve <- add_row(improve, groups = g, agony = out[[g]]$ag, abs_dec = out[[g]]$ag - out[[g-1]]$ag,
percent_dec = (out[[g]]$ag - out[[g-1]]$ag)/(out[[g-1]]$ag), total_dec = out[[g]]$ag - out[[1]]$ag,
tot_per_dec = (out[[g]]$ag - out[[1]]$ag)/(out[[1]]$ag))
}
run_time[g] = unlist(out[[g]]$run_time)
}
timings <- tibble(num_groups = 1:g, run_time = unlist(run_time)) %>%
add_row("num_groups" = "Total", "run_time" = sum(unlist(run_time)))
list(groupings=out, summary_stats=improve, agony_graph=graph_agony(out, graph_groups), timings=timings)
}
Upvotes: 0
Reputation: 26843
Within the foreach loop you are writing to objects defined outside of the loop: out and improve. While this is normal for for loops, even the simplest example from the vignette uses a different syntax:
> x <- foreach(i=1:3) %do% sqrt(i)
> x
In effect, one returns something from the loop body. All the results are then collected into a list. This is necessary because parallel processing uses separate R processes, each with its own memory. Effectively, foreach is more like lapply than for.
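To illustrate the lapply-like behaviour, here is a minimal sketch that assumes only that the foreach package is installed. It uses %do% (sequential) so that no backend is required; the names squares and same_as_lapply are made up for this example.

```r
library(foreach)

# The value of the last expression in the body is what foreach collects
# for each iteration; nothing needs to be assigned to an outer object.
squares <- foreach(i = 1:3) %do% {
  i^2
}

# The equivalent lapply() call: one list element per input value
same_as_lapply <- lapply(1:3, function(i) i^2)

print(unlist(squares))
print(unlist(same_as_lapply))
```

Both calls produce a list with one element per iteration, in iteration order.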
As a first step I suggest you remove improve, assign the result of foreach to out, and return the result of your main function call from the loop body. If that works, then one can find ways to include improve. You could, for example, use
list(out = out, improve = improve)
as the last statement. This way each foreach worker will return a list with both out and improve in it.
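One possible way to bring improve back in is to build it after the loop. In the question, each row of improve depends on the previous iteration's result, which a worker that only handles iteration g does not have. The sketch below is an assumption-laden illustration, not the asker's code: fake_find_groups is a hypothetical stand-in for find_groups that returns a list with an ag element, and a base data.frame replaces the tibble.

```r
library(foreach)

# Hypothetical stand-in for find_groups(): returns a list with an `ag` element
fake_find_groups <- function(g) list(ag = 100 / g)

# Step 1: the loop body only returns the primary result
out <- foreach(g = 1:4) %do% fake_find_groups(g)

# Step 2: derive the summary statistics once all results are collected
agony <- vapply(out, function(res) res$ag, numeric(1))
improve <- data.frame(
  groups      = seq_along(agony),
  agony       = agony,
  abs_dec     = c(0, diff(agony)),                    # change vs. previous g
  percent_dec = c(0, diff(agony) / head(agony, -1)),  # relative to previous g
  total_dec   = agony - agony[1],                     # change vs. g = 1
  tot_per_dec = (agony - agony[1]) / agony[1]         # relative to g = 1
)

print(improve)
```

Swapping %do% for %dopar% should not change the second step, since it only runs in the calling session.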
However, keep in mind that parallel processing is no silver bullet. You always introduce some communication overhead, which might negate any gains you get from the parallel execution. This is difficult to assess without reproducible code (and data).
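A rough way to check whether the overhead pays off is to time the same loop with %do% and %dopar%. This is only a sketch: it assumes the foreach and doParallel packages are installed, uses a deliberately cheap task (sqrt) rather than the asker's function, and picks 2 workers arbitrarily. The timings will vary by machine.

```r
library(foreach)
library(doParallel)

# Start an explicit 2-worker cluster so it can be stopped cleanly afterwards
cl <- makeCluster(2)
registerDoParallel(cl)

# Same loop body, once sequentially and once on the workers
seq_time <- system.time(seq_res <- foreach(i = 1:100) %do% sqrt(i))
par_time <- system.time(par_res <- foreach(i = 1:100) %dopar% sqrt(i))

stopCluster(cl)

# Elapsed wall-clock seconds for each variant
print(c(sequential = seq_time[["elapsed"]], parallel = par_time[["elapsed"]]))
```

For a task this cheap, the parallel version may well be slower, because sending tasks to the workers and collecting the results costs more than the computation itself.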
Upvotes: 2