Reputation: 77
I use the following code, which pauses execution for a given time so that I do not hit the (Crunchbase) API limit.
managed_call <- function(f, events = 44L, every = 60L) {
  force(f)
  minute_ <- rep(NA, events)
  function(...) {
    m_dif <- as.numeric(Sys.time() - minute_, units = "secs")
    minute_[!is.na(m_dif) & m_dif > every] <<- NA
    calls_remaining <- sum(is.na(minute_))
    if (!calls_remaining) {
      message("Close to API limit, pausing for ",
              round(every - max(m_dif), 3), " seconds")
      Sys.sleep(every - max(m_dif))
      minute_[which.max(m_dif)] <- NA
      minute_[Position(is.na, minute_)] <<- Sys.time()
      f(...)
    } else {
      minute_[Position(is.na, minute_)] <<- Sys.time()
      f(...)
    }
  }
}
When I use a normal apply or lapply call, this piece of code gives me the following message:
Updated <- function(x){is.null(crunchbase_GET(x))}
> abc <- unlist(lapply(websites,Updated))
Close to API limit, pausing for 1.25 seconds
Close to API limit, pausing for 1.119 seconds
...
However, I tried another option with makeCluster and parSapply:
library("parallel")
abc<- logical(100)
Updated <- function(x){is.null(crunchbase_GET(x))}
cl <- makeCluster(detectCores(), type = "PSOCK")
clusterExport(cl, varlist = "websites")
clusterEvalQ(cl = cl, library(rcrunchbase))
abc <- parSapply(cl = cl, X = websites, FUN = Updated, USE.NAMES = FALSE)
The message no longer appears. I am therefore wondering whether the Sys.sleep() call is executed at all and, if not, whether there is any way to make my code work with parSapply.
I am sorry that I cannot provide a reproducible example for this specific case: a user_key is required to use rcrunchbase and thus to retrieve information on the API limit etc.
Reputation: 161007
message does not "escape" parSapply; it is lost, and the same goes for cat and warning. Passing even basic information from a worker in cl back to the parent process is difficult.
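A lost message does not mean the pause was skipped: the function still runs in full on the worker, and only its console output is dropped. One way to check this without an API key is to time a call that sleeps. The sketch below uses only the parallel package; slow_call is a hypothetical stand-in for the throttled crunchbase_GET call, not part of rcrunchbase.

```r
library(parallel)

# Hypothetical stand-in for the throttled API call: it announces a pause,
# sleeps, and returns the PID of the process it ran in.
slow_call <- function(i) {
  message("pausing for 1 second")  # emitted on the worker, not shown in the parent
  Sys.sleep(1)
  Sys.getpid()
}

cl <- makeCluster(2)

# Time the parallel call; if Sys.sleep() were skipped it would return almost at once.
elapsed <- system.time(
  pids <- parLapply(cl, 1:2, slow_call)
)[["elapsed"]]

stopCluster(cl)

elapsed                        # elapsed wall-clock time in seconds
unlist(pids) != Sys.getpid()   # TRUE where the call ran in a worker process
```

If elapsed comes out at about one second, Sys.sleep() ran on the workers. Keep in mind that each PSOCK worker is a separate R process, so, assuming the throttle state lives in a closure like managed_call, every worker keeps its own copy of minute_ and the limit is tracked per worker rather than across the whole cluster.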
An alternative that does relay console output is future together with future.apply (an extension, really, since they rely on parallel).
cl <- parallel::makeCluster(3)
parallel::parLapply(cl, 1:3, function(i) { message("Hello: ", i+100); Sys.getpid(); })
# [[1]]
# [1] 22680
# [[2]]
# [1] 14504
# [[3]]
# [1] 27084
But with future:
library(future) # plan, cluster
library(future.apply) # future_lapply
# using the same 'cl'
plan(cluster, workers = cl)
future_lapply(1:3, function(i) { message("Hello: ", i+100); Sys.getpid(); })
# Hello: 101
# Hello: 102
# Hello: 103
# [[1]]
# [1] 22680
# [[2]]
# [1] 14504
# [[3]]
# [1] 27084
(Variations can demonstrate that cat and warning also escape the child processes.)
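As a rough sketch of those variations, assuming future and future.apply are installed and using a fresh two-worker cluster rather than the cl from above, the same pattern can be repeated with cat and warning. The output text is made up for illustration.

```r
library(future)        # plan, cluster, sequential
library(future.apply)  # future_lapply

cl <- parallel::makeCluster(2)
plan(cluster, workers = cl)

# cat() writes to stdout on the worker; future captures it and replays it
# in the parent when the result is collected.
res_cat <- future_lapply(1:2, function(i) {
  cat("Hello via cat:", i + 100, "\n")
  Sys.getpid()
})

# warning() raised on the worker is signalled again in the parent session.
res_warn <- future_lapply(1:2, function(i) {
  warning("Hello via warning: ", i + 100)
  Sys.getpid()
})

plan(sequential)            # restore the default plan
parallel::stopCluster(cl)   # shut the workers down
```

Both calls should return the worker PIDs as before, with the cat output and the warnings showing up in the parent session.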