Reputation: 130
I am using the function parSapply to run a simulation in a parallel environment. Here is my code:
runpar <- function(i) MonteCarloKfun(i=i)
# Detect number of cores available
ncores <- detectCores(logical=TRUE)
# Set up parallel environment
cl <- makeCluster(ncores, methods=FALSE)
# Export objects to parallel environment
clusterSetRNGStream(cl,1234567) # not necessary since we do not sample
clusterExport(cl, c("kfunctions","frq","dvec","case","control","polygon", "MonteCarloKfun", "khat",
"as.points", "secal"))
# For 1 parameter use parSapply
outpar <- parSapply(cl,i,runpar)
# close parallel environment
stopCluster(cl)
Does anyone know whether it is possible to add a progress bar to the parSapply function? Ideally, I would like something similar to pbapply from the pbapply package.
Upvotes: 3
Views: 3363
Reputation: 2927
Update - Feb 20, 2023
You can achieve this using the parabar package. Disclaimer: I am the author of the package.
You can use the package in an interactive R session as follows.
# Load the package.
library(parabar)
# Define a task to run in parallel.
task <- function(x) {
    # Sleep a bit.
    Sys.sleep(0.01)
    # Return the result of a computation.
    return(x + 1)
}
# Start a backend that supports progress tracking (i.e., `async`).
backend <- start_backend(cores = 4, cluster_type = "psock", backend_type = "async")
# Configure the bar if necessary, or change the bar engine.
configure_bar(
    format = " > completed :current out of :total tasks [:percent] [:elapsed]"
)
# Run the task.
results <- par_sapply(backend, 1:1000, task)
# Update the progress bar options.
configure_bar(
    format = "[:bar] :percent"
)
# Run the task again.
results <- par_sapply(backend, 1:1000, task)
# Stop the backend.
stop_backend(backend)
If you need more flexibility (e.g., when building an R package), there is also a lower-level API based on R6 classes.
# Create a specification object.
specification <- Specification$new()
# Set the number of cores.
specification$set_cores(cores = 4)
# Set the cluster type.
specification$set_type(type = "psock")
# Create a progress tracking context.
context <- ProgressDecorator$new()
# Get a backend that supports progress-tracking.
backend <- AsyncBackend$new()
# Register the backend with the context.
context$set_backend(backend)
# Start the backend.
context$start(specification)
# Get a modern bar instance.
bar <- ModernBar$new()
# Register the bar with the context.
context$set_bar(bar)
# Configure the bar.
context$configure_bar(
    show_after = 0,
    format = " > completed :current out of :total tasks [:percent] [:elapsed]"
)
# Run a task in parallel (i.e., approx. 2.5 seconds: 1000 tasks of 0.01 s on 4 cores).
context$sapply(x = 1:1000, fun = task)
# Get the task output.
output <- backend$get_output()
# Close the backend.
context$stop()
Here is a proposed workflow, which perhaps fits Steve Weston's characterization as a valiant effort. Yet, with some overhead, it accomplishes what I am primarily interested in, i.e., (1) a cross-platform solution, (2) not tampering with low-level parallel implementation details, and (3) being parsimonious concerning the dependencies used.
In a nutshell, the code below does the following:
- prepare_file_for_logging creates a temporary file (i.e., in an OS-specific location) that will be used later on to report and track the progress of the parallel task execution.
- par_sapply_with_progress starts an R session in the background (i.e., without blocking the main session). In that session, it sets up a cluster (i.e., PSOCK or FORK) and runs the task in parallel via the function parallel::parSapply. Each task appends a line to the temporary file when it finishes.
- track_progress monitors the temporary file and displays and updates a progress bar based on its contents.
The libraries used are parallel and callr, and some other functions from base and utils. For the sake of clarity, the code below is explicitly commented.
# Load libraries.
library(parallel)
library(callr)
# How many times to run?
runs <- 40
# Prepare file for logging the progress.
file_name <- prepare_file_for_logging()
# Run the task in parallel without blocking the main process.
process <- par_sapply_with_progress(
    # Cluster specifications.
    cores = 4,
    type = "PSOCK",
    # Where to write the progress.
    file_name = file_name,
    # Task specifications (i.e., just like in `parallel::parSapply`).
    x = 1:runs,
    fun = function(x, y) {
        # Wait a little.
        Sys.sleep(0.5)
        # Do something useful.
        return(x * y)
    },
    args = list(
        y = 10
    )
)
# Monitor the progress (i.e., blocking the main process until completion).
track_progress(
    process = process,
    iterations = runs,
    file_name = file_name,
    cleanup = TRUE
)
# Show the results.
print(process$get_result())
# |=====================================================================| 100%
# [1] 10 20 30 40 50 60 70 80 90 100 110 120 130 140 150 160 170 180
# [19] 190 200 210 220 230 240 250 260 270 280 290 300 310 320 330 340 350 360
# [37] 370 380 390 400
The workflow above relies on the following helper functions, which need to be defined before running it.
# Create and get temporary file name.
prepare_file_for_logging <- function(file_name) {
    # If the file name is missing.
    if (missing(file_name)) {
        # Get a temporary file name (i.e., OS specific).
        file_name <- tempfile()
    }
    # Create the actual file to avoid race conditions.
    file_created <- file.create(file_name)
    # Indicate if something went wrong creating the file.
    stopifnot("Failed to create file." = file_created)
    return(file_name)
}
# Run task in parallel and log the progress.
par_sapply_with_progress <- function(cores, type, file_name, x, fun, args) {
    # Decorate the task function to enable progress tracking.
    get_decorated_task <- function(task) {
        # Evaluate symbol.
        force(task)
        # Create wrapper.
        return(function(x, file_name, ...) {
            # Update the progress on exit.
            on.exit({
                # Write processed element to file.
                cat("\n", file = file_name, append = TRUE)
            })
            return(task(x, ...))
        })
    }
    # Get the decorated task.
    fun_decorated <- get_decorated_task(fun)
    # Start a background process.
    background_process <- callr::r_bg(function(cores, type, file_name, x, fun_decorated, args) {
        # Make cluster.
        cluster <- parallel::makeCluster(cores, type = type)
        # Close the cluster on exit.
        on.exit({
            # Stop the cluster.
            parallel::stopCluster(cluster)
        })
        # Output.
        output <- do.call(parallel::parSapply, c(
            list(cluster, x, fun_decorated, file_name), args
        ))
        # Return the results from the background process.
        return(output)
    }, args = list(cores, type, file_name, x, fun_decorated, args))
    # Return the background process `R6` object.
    return(background_process)
}
# Track progress and keep the main process busy.
track_progress <- function(process, iterations, file_name, cleanup = TRUE) {
    if (cleanup) {
        on.exit({
            # Remove the file (i.e., just in case).
            unlink(file_name)
        })
    }
    # Create a progress bar.
    bar <- txtProgressBar(min = 0, max = iterations, initial = NA, style = 3)
    # Record the number of processed iterations (i.e., runs).
    n_tasks_processed <- 0
    # While there are tasks left to process.
    while (n_tasks_processed < iterations) {
        # Get the number of tasks processed.
        n_tasks_processed <- length(scan(file_name, blank.lines.skip = FALSE, quiet = TRUE))
        # If the process that started the workers is finished.
        if (!process$is_alive()) {
            # Indicate that all tasks have been processed.
            n_tasks_processed <- iterations
        }
        # Update the progress bar.
        setTxtProgressBar(bar, n_tasks_processed)
    }
    # Close the progress bar.
    close(bar)
    # Wait for the process to close.
    process$wait()
}
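To see the logging mechanism in isolation, here is a minimal sequential sketch that uses only base R (no callr, no cluster). The helper name decorate and the variable log_file are made up for this illustration. It only shows the idea that each call appends one line to a file, so that counting lines gives the number of finished tasks.

```r
# Temporary file acting as the progress log (hypothetical name).
log_file <- tempfile()
file.create(log_file)

# Hypothetical helper: wrap a task so it appends one line to
# `file_name` every time it exits, even if it fails.
decorate <- function(task, file_name) {
    # Evaluate the promises now, so the closure captures the values.
    force(task)
    force(file_name)
    function(x, ...) {
        on.exit(cat("\n", file = file_name, append = TRUE))
        task(x, ...)
    }
}

# A toy task, similar to the one used in the workflow above.
task <- function(x, y) x * y
decorated <- decorate(task, log_file)

# Run sequentially; in the real workflow this is `parallel::parSapply`.
results <- sapply(1:5, decorated, y = 10)

# One line per processed element.
n_logged <- length(readLines(log_file))

# Clean up.
unlink(log_file)
```

After running this, n_logged equals the number of elements processed, which is what the progress bar is driven by.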
Concerning logging and reading the progress from the temporary file, there are two things I can think of for reducing the overhead:
track_progress continuously scans the temporary file and updates the progress bar. However, this is likely not necessary. Perhaps a better way is to set a timeout between subsequent file scans and progress bar updates.
Finally, I personally prefer opening a cluster once and reusing it across different parts of my code. In this scenario, I would switch from callr::r_bg (i.e., short-lived background R process) to callr::r_session (i.e., permanent R session) for more control (i.e., also see this question).
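The idea of pausing between file scans could look roughly like the sketch below. This is not part of the original workflow: the function name track_progress_throttled and the interval argument are assumptions for illustration, and it counts lines with readLines rather than scan. It expects process to be anything that provides an is_alive() method, such as the object returned by callr::r_bg.

```r
# Hypothetical variant of `track_progress` that sleeps between scans.
# `interval` is the assumed pause (in seconds) between two file reads.
track_progress_throttled <- function(process, iterations, file_name, interval = 0.2) {
    # Create a progress bar and make sure it is closed on exit.
    bar <- utils::txtProgressBar(min = 0, max = iterations, style = 3)
    on.exit(close(bar))

    # Number of tasks processed so far.
    n_tasks_processed <- 0

    while (n_tasks_processed < iterations) {
        # Each processed task appended one line to the file.
        n_tasks_processed <- length(readLines(file_name, warn = FALSE))

        # If the process that started the workers is finished, stop waiting.
        if (!process$is_alive()) {
            n_tasks_processed <- iterations
        }

        # Update the progress bar (never beyond its maximum).
        utils::setTxtProgressBar(bar, min(n_tasks_processed, iterations))

        # Pause before the next scan instead of spinning.
        if (n_tasks_processed < iterations) {
            Sys.sleep(interval)
        }
    }

    invisible(n_tasks_processed)
}
```

A larger interval means fewer file reads at the cost of a less responsive bar; the default of 0.2 seconds is an arbitrary choice.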
I hope this helps others that have struggled with this issue as well.
Upvotes: 2
Reputation: 19677
The parSapply function doesn't support a progress bar, and I don't think there is any really good way to implement one by adding extra code to the task function, although people have made valiant efforts to do that.
The doSNOW package supports progress bars, so you could either use that directly or write a wrapper function that works like the parSapply function. Here's one way to write such a wrapper function:
# This function is similar to "parSapply", but doesn't preschedule
# tasks and doesn't support "simplify" and "USE.NAMES" options
pbSapply <- function(cl, X, FUN, ...) {
    registerDoSNOW(cl)
    pb <- txtProgressBar(max=length(X))
    on.exit(close(pb))
    progress <- function(n) setTxtProgressBar(pb, n)
    opts <- list(progress=progress)
    foreach(i=X, .combine='c', .options.snow=opts) %dopar% {
        FUN(i, ...)
    }
}
You can easily modify this function to use either the tkProgressBar or winProgressBar function.
Here's an example use of pbSapply:
library(doSNOW)
cl <- makeSOCKcluster(3)
x <- pbSapply(cl, 1:100, function(i, j) {Sys.sleep(1); i + j}, 100)
# Stop the cluster when done
stopCluster(cl)
Note that this doesn't use prescheduling, so the performance won't be as good as parSapply if you have small tasks.
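If the lack of prescheduling matters for small tasks, one possible middle ground is sketched below. This is a different technique from the doSNOW wrapper above, not something from that package: it splits the input into chunks, calls parSapply on each chunk (so tasks within a chunk are still prescheduled), and advances a text progress bar after each chunk. The function name par_sapply_chunked and the n_chunks argument are made up for this illustration, and only the parallel and utils packages are assumed.

```r
library(parallel)

# Hypothetical helper: run `parSapply` chunk by chunk and update a
# progress bar after each chunk. Assumes FUN returns one value per element.
par_sapply_chunked <- function(cl, X, FUN, ..., n_chunks = 10) {
    # Nothing to do for empty input.
    if (length(X) == 0) return(list())

    # Split the indices of X into roughly equal consecutive chunks.
    n_chunks <- min(n_chunks, length(X))
    chunks <- splitIndices(length(X), n_chunks)

    # Progress bar counting processed elements.
    pb <- txtProgressBar(min = 0, max = length(X), style = 3)
    on.exit(close(pb))

    done <- 0
    results <- vector("list", length(chunks))

    for (k in seq_along(chunks)) {
        # Tasks inside a chunk are prescheduled across the workers.
        results[[k]] <- parSapply(cl, X[chunks[[k]]], FUN, ...)

        # Advance the bar by the size of the finished chunk.
        done <- done + length(chunks[[k]])
        setTxtProgressBar(pb, done)
    }

    # Combine the per-chunk results into one vector.
    unlist(results)
}

# Example use with a small local cluster.
cl <- makeCluster(2)
x <- par_sapply_chunked(cl, 1:20, function(i, j) i + j, 100, n_chunks = 5)
stopCluster(cl)
```

The trade-off is that all workers wait at the end of each chunk, so more chunks give a smoother bar but more idle time.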
Upvotes: 2