Nik-D

Reputation: 41

Parallel processing when querying SQL using dplyr

I've got a very large dataset (~150 GB) stored in an SQL database, and I need to query certain rows based on an identifier (id) from another data frame (df). Using the standard approach below, this takes about 4 minutes.

library(DBI)
library(dplyr)
library(dbplyr)

# open the SQLite database and create a lazy reference to the table
dbconn <- DBI::dbConnect(RSQLite::SQLite(), "My_database.db")
src_dbi(dbconn)
tbl <- tbl(dbconn, "My_table")

# keep only rows whose id appears in df$id, then pull them into R
database_selection <- tbl %>%
    filter(id %in% local(df$id)) %>%
    collect()
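
For reference, the SQL that dbplyr builds for this filter can be previewed with show_query() before collecting (a quick sketch using the same filter as above; as far as I understand, the local id vector gets translated into an IN clause):

# sketch: preview the generated SQL without pulling any data
tbl %>%
    filter(id %in% local(df$id)) %>%
    show_query()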

I'm trying to exploit my 48-core processor to do this faster, but I'm running into issues: the parallel version doesn't complete even after about 30 minutes, at which point I terminate it. I'm unclear whether there are overheads I'm not understanding that make setting this up in parallel very costly, or whether there's simply an issue with my code.

My approach is based on the solution to this question: Parallel processing and querying SQL with dplyr or pool: MySQL server has gone away, but using sockets and parLapply since I'm on a Windows machine.

Specifically:

my_function <- function(ids) {
  # create a connection and close it on exit
  dbconn <- DBI::dbConnect(RSQLite::SQLite(), "My_database.db")
  on.exit(DBI::dbDisconnect(dbconn))
  
  # create a lazy reference to the table
  tbl <- tbl(dbconn, "My_table")

  # filter rows matching the supplied ids and pull them into R
  my_selection <- tbl %>%
    filter(id %in% local(ids)) %>%
    collect()

  return(my_selection)
}

library(parallel)

system.time({
  # one worker per core, leaving one core free
  numCores <- detectCores() - 1
  cl <- makeCluster(numCores, type = "PSOCK")

  # load the required packages on each worker
  clusterEvalQ(cl, {
    library(RSQLite)
    library(dplyr)
    library(dbplyr)
  })

  # one task per id: each element of df$id is passed to my_function separately
  database_selection <- parLapply(cl = cl, df$id, my_function)
  stopCluster(cl)
})
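
One thing I notice (and may be misunderstanding) is that parLapply() returns a list with one tibble per id, rather than the single tibble I get from collect() in the sequential version, so presumably the results still need to be combined afterwards, along the lines of:

# sketch: stack the per-id results into one data frame
database_selection <- dplyr::bind_rows(database_selection)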


Any suggestions would be much appreciated.

Upvotes: 1

Views: 197

Answers (0)
