Reputation: 3805
I have an input csv file with 4500 rows. Each row has a unique ID, and for each row I have to read some data, do some calculation, and write the output to a csv file, so that I end up with 4500 csv files in my output directory. An individual output csv file contains a single row of data with 8 columns.
Since I have to perform the same calculation on each row of my input csv, I thought I could parallelise this task using foreach. Following is the overall structure of the logic:
library(doSNOW)
library(foreach)
library(data.table)
input_csv <- fread('inputFile.csv')

# to track the progress of the loop
iterations <- nrow(input_csv)
pb <- txtProgressBar(max = iterations, style = 3)
progress <- function(n) setTxtProgressBar(pb, n)
opts <- list(progress = progress)
myClusters <- makeCluster(6)
registerDoSNOW(myClusters)
results <-
  foreach(i = 1:nrow(input_csv),
          .packages = c("myCustomPkg","dplyr","arrow","zoo","data.table","rlist","stringr"),
          .errorhandling = 'remove',
          .options.snow = opts) %dopar%
  {
    rowRef <- input_csv[i, ]

    # read data for the unique location in `rowRef`
    weather.data <- read_parquet(paste0(rowRef$locationID, '_weather.parquet'))

    # do some calculations

    # save the results as csv
    fwrite(temp_result, paste0('output_iter_', i, '.csv'))

    return(temp_result)
  }
The above code works fine, but it always gets stuck/inactive/does nothing after finishing 25% or 30% of the rows in input_csv. I keep watching my output directory, and after N% of iterations no new files are written. I wonder whether the foreach loop goes into some sleep mode? What I find more confounding is that if I kill the job and re-run the above code, it gets to, say, 16% or 30% and then goes inactive again, i.e. with each fresh run it "sleeps" at a different progress level.
I can't figure out how to give a minimal reproducible example in this case, but if anyone knows of a checklist I should go through or potential issues that could be causing this, that would be really helpful. Thanks
EDIT I am still struggling with this issue. If there is any more information I can provide, please let me know.
EDIT2
My original inputFile contains 213164 rows, so I split the big file into 46 smaller files so that each file has 4634 rows:
library(foreach)
library(data.table)
library(doParallel)
myLs <- split(mydat, (as.numeric(rownames(mydat)) - 1) %/% 4634)
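As a quick sanity check (a sketch, assuming mydat holds the 213164 input rows), the split should yield 46 chunks of 4634 rows each:
# expect 46 chunks of 4634 rows each
length(myLs)                 # 46
unique(sapply(myLs, nrow))   # 4634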
Then I did this:
for(pr in 1:46){

  input_csv <- myLs[[pr]]

  myClusters <- parallel::makeCluster(6)
  doParallel::registerDoParallel(myClusters)

  results <-
    foreach(i = 1:nrow(input_csv),
            .packages = c("myCustomPkg","dplyr","arrow","zoo","data.table","rlist","stringr"),
            .errorhandling = 'remove',
            .verbose = TRUE) %dopar%
    {
      rowRef <- input_csv[i, ]

      # read data for the unique location in `rowRef`
      weather.data <- read_parquet(paste0(rowRef$locationID, '_weather.parquet'))

      # do some calculations

      # save the results as csv
      fwrite(temp_result, paste0('output_iter_', i, '_', pr, '.csv'))

      gc()
    }

  parallel::stopCluster(myClusters)
  gc()
}
This too works until, say, the pr = 7 or pr = 8 iteration, then does not proceed and also does not generate any error message. I am so confused.
EDIT This is what my CPU usage looks like. I only used 4 cores to generate this image. Can anyone explain whether there's anything in this image that might address my question?
Upvotes: 5
Views: 1532
Reputation: 290
You need to take your focus away from the per-file loop, as that is not the issue. The issue is with the processing of content within a file: when you create a file per row, you are not committing the write after each row, so the whole process, for one file and row by row, gets stacked up in memory. You need to flush memory as you write each file and close the connection.
Try to use apply as per the example below if possible:
For each row in an R dataframe
Try to close the connection to each file as it is written. Reference below:
https://stat.ethz.ch/R-manual/R-devel/library/base/html/connections.html
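For illustration, a minimal sketch of the write-and-close pattern described above (assuming temp_result is the one-row data.frame produced inside the loop; the file name is illustrative):
# open a connection, write the single-row result, then close it explicitly
con <- file(paste0('output_iter_', i, '.csv'), open = 'w')
write.csv(temp_result, con, row.names = FALSE)
close(con)   # release the connection so nothing accumulates across iterations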
Upvotes: 1
Reputation: 8572
From your code it is not entirely possible to see why it should stall. Maybe some parts of your foreach loop are not thread safe (data.table uses multiple threads for subsetting, for example)?
As it stands there's very little to change to help out, and @Waldi's answer is likely good at diagnosing the actual problem. The only thing that seems obvious to change here is to avoid iterating over single rows of your data.frame by utilizing the under-the-hood functionality of foreach.
The way foreach performs parallel programming is by creating an iterator over the object. For parallel programming there will be some overhead between each iteration, as the thread/core will need to request new information. As such, it is beneficial to minimize this overhead time by minimizing the number of iterations. We can do this by splitting our dataset up into chunks or by manually creating an iterator through the iterators package.
I don't have access to your data, so below is a reproducible example using the mtcars dataset. I've split it into a setup block and a foreach block for easier readability. Note that files in my example is a simple vector, so the actual code shown in the question requires some minimal alteration, as files within the foreach loop now becomes a data.frame rather than a vector.
library(iterators)
library(foreach)
library(data.table)
library(arrow)
library(doParallel)
# Set up reproducible example:
data(mtcars)
files <- replicate(100, tempfile())
lapply(files, function(x)write_parquet(mtcars, x))
# Split the files into chunks for the iterator
nc <- parallel::detectCores()
sfiles <- split(files, seq_len(length(files)) %% nc + 1)
# Set up backend
th <- parallel::makeCluster(nc)
registerDoParallel(th)
foreach(files = sfiles, # Note the iterator will name each chunk 'files' within the loop.
        .packages = c('data.table', 'arrow', 'dplyr'),
        .combine = c, # Because I return the resulting file names
        .multicombine = TRUE) %dopar% {
  # Iterate over each chunk within foreach
  # Reduces loop overhead
  outF <- character(length(files))
  for(i in seq_along(files)){
    tib <- arrow::read_parquet(files[i])
    # Do some stuff
    tib <- tib %>% select(mpg, hp)
    # Save output
    outF[i] <- tempfile(fileext = '.csv')
    fwrite(tib, outF[i])
  }
  # Return list of output files
  return(outF)
}
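For the question's data, the minimal alteration could look roughly like this (a sketch, assuming the input_csv data.frame and its locationID column from the question, with nc taken from the setup block above):
# split the input data.frame into row chunks, one chunk per foreach iteration
chunks <- split(input_csv, (seq_len(nrow(input_csv)) - 1) %% nc + 1)

foreach(rows = chunks,
        .packages = c('data.table', 'arrow', 'dplyr')) %dopar% {
  for (i in seq_len(nrow(rows))) {
    rowRef <- rows[i, ]
    dat <- read_parquet(paste0(rowRef$locationID, '_weather.parquet'))
    # ... do the calculations, then fwrite() the single-row result as in the question
  }
  NULL  # the per-row csv files are already on disk, so nothing needs to come back
}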
Now I don't believe this will fix the issue, but it is something that can reduce your overhead slightly.
Upvotes: 4
Reputation: 41220
You could use the progressr package to follow up on memory usage interactively. For example, with the furrr package:
library(furrr)
library(pryr)
plan(multisession, workers = 6)

library(progressr)
handlers("progress")

#input_csv <- fread('inputFile.csv')
#filesID <- as.list(1:nrow(input_csv))
filesID <- as.list(1:12)

with_progress({
  p <- progressor(along = filesID)
  result <- future_map(filesID, function(fileID) {
    #rowRef <- input_csv[fileID, ]
    # read data for the unique location in `rowRef`
    #weather.data <- read_parquet(paste0(rowRef$locationID, '_weather.parquet'))
    # do some calculations : simulate memory increase
    temp_result <- rnorm(2e7)
    # save the results as csv
    #fwrite(temp_result, paste0('output_iter_', fileID, '.csv'))
    Sys.sleep(2)
    p(sprintf("memory used=%g", pryr::mem_used()))
    return(object.size(temp_result))
  }, .options = future_options(packages = c("myCustomPkg","dplyr","arrow","zoo","data.table","rlist","stringr")))
})
[====================================================>-------] 90% memory used=6.75075e+08
The same method applies to foreach.
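For instance, a minimal sketch with foreach (assuming the doFuture backend, which relays the progressr updates; this backend choice is not part of the original code):
library(doFuture)
library(foreach)
library(progressr)
library(pryr)

registerDoFuture()
plan(multisession, workers = 6)
handlers("progress")

filesID <- as.list(1:12)

with_progress({
  p <- progressor(along = filesID)
  result <- foreach(fileID = filesID) %dopar% {
    temp_result <- rnorm(2e7)                        # simulate the per-row work
    p(sprintf("memory used=%g", pryr::mem_used()))   # report progress and memory
    object.size(temp_result)                         # return a summary, not the data
  }
})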
Another suggestion is not to return the results to the main process, as you already store them in a file. Instead of return(temp_result) you could output a summary, for example object.size, knowing that the complete results can be found in the associated file.
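In the question's loop that change is just the last two lines (sketch, using the same names as the question):
# write the full result to disk, but send back only a small summary
fwrite(temp_result, paste0('output_iter_', i, '.csv'))
return(object.size(temp_result))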
Upvotes: 5