dimitriy
dimitriy

Reputation: 9460

Parallelizing a loop with updating during each iteration

I have some R code that puts together demographic data from the Census for all of states in the US into a list object. The block-level code can take a week to run as a sequential loop since there are ~11M blocks, so I am trying to parallelize the loop over states to make it faster. I have accomplished this goal with this:

states <- c("AL","AK","AZ","AR","CA","CO","CT","DE","FL","GA","HI",
           "ID","IL","IN","IA","KS","KY","LA","ME","MD","MA","MI",
           "MN","MS","MO","MT","NE","NV","NH","NJ","NM","NY","NC",
           "ND","OH","OK","OR","PA","RI","SC","SD","TN","TX","UT",
           "VT","VA","WA","WV","WI","WY","DC","PR")
library(future.apply)
plan(multiprocess)
ptm <- proc.time()
CensusObj_block_age_sex = list()

CensusObj_block_age_sex[states] <- future_lapply(states, function(s){
  county <- census_geo_api(key = "XXX", state = s, geo = "county", age = TRUE, sex = TRUE)
  tract  <- census_geo_api(key = "XXX", state = s, geo = "tract",  age = TRUE, sex = TRUE)
  block  <- census_geo_api(key = "XXX", state = s, geo = "block",  age = TRUE, sex = TRUE)
  censusObj[[s]] <- list(state = s, age = TRUE, sex = TRUE, block = block, tract = tract, county = county)
}
)

However, I need to make it more robust. Sometimes there are problem with the Census API, so I would like the CensusObj to be updated at each state iteration so that I don't loose my completed data if something wrong. That way I can restart the loop over the remaining state if something does goes wrong (like if I spell "WY" as "WU")

Would it be possible to accomplish this somehow? I am open to other methods of parallelization.


The code above runs, but it seems to run into memory issues:

Error: Failed to retrieve the value of MultisessionFuture (future_lapply-3) from cluster RichSOCKnode #3 (PID 80363 on localhost ‘localhost’). The reason reported was ‘vector memory exhausted (limit reached?)’. Post-mortem diagnostic: A process with this PID exists, which suggests that the localhost worker is still alive.

I have R_MAX_VSIZE = 8Gb in my .Renviron, but I am not sure how that would get divided between the 8 cores on my machine. This all suggests that I need to store the results of each iteration rather than try to keep it all in memory, and then append the objects together at the end.

Upvotes: 3

Views: 655

Answers (3)

starja
starja

Reputation: 10365

Here is a solution that uses doParallel (with the options for UNIX systems, but you can also use it on Windows, see here) and foreach that stores the results for every state separately and afterwards reads in the single files and combines them to a list.

library(doParallel)
library(foreach)

path_results <- "my_path"
ncpus = 8L
registerDoParallel(cores = ncpus)
states <- c("AL","AK","AZ","AR","CA","CO","CT","DE","FL","GA","HI",
            "ID","IL","IN","IA","KS","KY","LA","ME","MD","MA","MI",
            "MN","MS","MO","MT","NE","NV","NH","NJ","NM","NY","NC",
            "ND","OH","OK","OR","PA","RI","SC","SD","TN","TX","UT",
            "VT","VA","WA","WV","WI","WY","DC","PR")
results <- foreach(state = states) %dopar% {
                     county <- census_geo_api(key = "XXX", state = state, geo = "county", age = TRUE, sex = TRUE)
                     tract  <- census_geo_api(key = "XXX", state = state, geo = "tract",  age = TRUE, sex = TRUE)
                     block  <- census_geo_api(key = "XXX", state = state, geo = "block",  age = TRUE, sex = TRUE)
                     results <- list(state = state, age = TRUE, sex = TRUE, block = block, tract = tract, county = county)
                     
                     # store the results as rds
                     saveRDS(results,
                             file = paste0(path_results, "/", state, ".Rds"))
                     
                     # remove the results
                     rm(county)
                     rm(tract)
                     rm(block)
                     rm(results)
                     gc()
                     
                     # just return a string
                     paste0("done with ", state)
}

library(purrr)
# combine the results to a list
result_files <- list.files(path = path_results)
CensusObj_block_age_sex <- set_names(result_files, states) %>% 
  map(~ readRDS(file = paste0(path_results, "/", .x)))

Upvotes: 1

Akshit
Akshit

Reputation: 434

One possible solution that I have is to log the value of CensusObj to a text file i.e print the CensusObj in each iteration. The doSNOW package can be used for logging for example

library(doSNOW)
cl <- makeCluster(1, outfile="abc.out")
registerDoSNOW(cl)
states <- c("AL","AK","AZ","AR","CA","CO","CT","DE","FL","GA","HI",
        "ID","IL","IN","IA","KS","KY","LA","ME","MD","MA","MI",
        "MN","MS","MO","MT","NE","NV","NH","NJ","NM","NY","NC",
        "ND","OH","OK","OR","PA","RI","SC","SD","TN","TX","UT",
        "VT","VA","WA","WV","WI","WY","DC","PR")
foreach(i=1:length(states), .combine=rbind, .inorder = TRUE) %dopar% {
    county <- "A"
    tract  <- "B"
    block  <- "C"
    censusObj <- data.frame(state = states[i], age = TRUE, sex = TRUE, block = block, tract = tract, county = county)
    # edit: print json objects to easily extract from the file
    cat(sprintf("%s\n",rjson::toJSON(censusObj)))
}
stopCluster(cl)

This would log the value of censusObj in abc.out and also logs the error if program crashes but you will get the latest value of censusObj logged in abc.out.

Here is the output of the last iteration from the log file:

Type: EXEC {"state":"PR","age":true,"sex":true,"block":"C","tract":"B","county":"A"} Type: DONE

Type:EXEC means that the iteration has started and Type:DONE means execution is completed. The result of cat will be present between these two statements of each iteration. Now, the value of CensusObj can be extracted from the log file as shown below:

Lines = readLines("abc.out")
results = list()
for(i in Lines){
    # skip processing logs created by doSNOW
    if(!startsWith(i, "starting") && !startsWith(i, "Type:")){
        results = rlist::list.append(results, jsonlite::fromJSON(i))      
    }
}

results will contain the elements all the values printed in abc.out.

> head(results, 1)
[[1]]
[[1]]$state
[1] "AL"

[[1]]$age
[1] TRUE

[[1]]$sex
[1] TRUE

[[1]]$block
[1] "C"

[[1]]$tract
[1] "B"

[[1]]$county
[1] "A"

It is not a very clean solution but works.

Upvotes: 0

Waldi
Waldi

Reputation: 41210

You could use a tryCatch inside future_lapply to try to relaunch the calculation in case of API error, for a maximum of maxtrials.
In the resulting list, you get for each calculation the number of trials and the final status, OK or Error:

    states <- c("AL","AK","AZ","AR","CA","CO","CT","DE","FL","GA","HI",
                "ID","IL","IN","IA","KS","KY","LA","ME","MD","MA","MI",
                "MN","MS","MO","MT","NE","NV","NH","NJ","NM","NY","NC",
                "ND","OH","OK","OR","PA","RI","SC","SD","TN","TX","UT",
                "VT","VA","WA","WV","WI","WY","DC","PR")
    library(future.apply)
    #> Le chargement a nécessité le package : future
    plan(multiprocess)
    ptm <- proc.time()

    maxtrials <- 3

    census_geo_api <-
      function(key = "XXX",
               state = s,
               geo = "county",
               age = TRUE,
               sex = TRUE) {
        paste(state,'-', geo)
      }


    CensusObj_block_age_sex <- future_lapply(states, function(s) {
      ntrials <- 1
      while (ntrials <= maxtrials) {
        hasError <- tryCatch({
          #simulate random error
          if (runif(1)>0.3) {error("API failed")}
          county <- census_geo_api(key = "XXX", state = s, geo = "county", age = TRUE, sex = TRUE)
          tract  <- census_geo_api(key = "XXX", state = s, geo = "tract",  age = TRUE, sex = TRUE)
          block  <- census_geo_api(key = "XXX", state = s, geo = "block",  age = TRUE, sex = TRUE)
        },
        error = function(e)
          e)

        if (inherits(hasError, "error")) {
          ntrials <- ntrials + 1
        } else { break}
          
      }
      
      if (ntrials > maxtrials) {
        res <- list(state = s, status = 'Error', ntrials = ntrials-1, age = NA, sex = NA, block = NA, tract = NA, county = NA)
      } else  {
        res <- list(state = s, status = 'OK'    , ntrials = ntrials, age = TRUE, sex = TRUE, block = block, tract = tract, county = county)
      }
      res
    }
    )

    CensusObj_block_age_sex[[1]]
    #> $state
    #> [1] "AL"
    #> 
    #> $status
    #> [1] "OK"
    #> 
    #> $ntrials
    #> [1] 3
    #> 
    #> $age
    #> [1] TRUE
    #> 
    #> $sex
    #> [1] TRUE
    #> 
    #> $block
    #> [1] "AL - block"
    #> 
    #> $tract
    #> [1] "AL - tract"
    #> 
    #> $county
    #> [1] "AL - county"

<sup>Created on 2020-08-19 by the [reprex package](https://reprex.tidyverse.org) (v0.3.0)</sup>

Upvotes: 1

Related Questions