leslie roberson

Reputation: 167

How to join, group and summarise large dataframes in R with multidplyr and parallel

This question is similar to other questions about very large data in R, but I can't find an example of how to merge/join two dfs and then perform calculations on the result (as opposed to reading in lots of dataframes and using mclapply to do the calculations). Here the problem is not loading the data (it takes ~20 min, but they do load), but rather the merging and summarising.

I've tried all the data.table approaches I could find, different types of joins, and ff, and I still run into the vecseq limit of 2^31 rows. Now I'm trying to use multidplyr to do it in parallel, but I can't figure out where the error is coming from.

Dataframes:
species_data # df with ~65 million rows, cols <- c("id", "species_id")
lookup # df with ~17 million rows, cols <- c("id", "cell_id", "rgn_id")

Not all ids in the lookup appear in the species_data.

## make sample dataframes:

lookup <- data.frame(id = seq(2001,2500, by = 1), 
                     cell_id = seq(1,500, by = 1), 
                     rgn_id = seq(801,1300, by = 1))

library(stringi)

species_id <- sprintf("%s%s%s", stri_rand_strings(n = 1000, length = 3, pattern = "[A-Z]"),
                      pattern = "-",
                      stri_rand_strings(1000, length = 5, '[1-9]'))

id <- sprintf("%s%s%s", stri_rand_strings(n = 1000, length = 1, pattern = "[2]"),
                    stri_rand_strings(n = 1000, length = 1, pattern = "[0-4]"),
                    stri_rand_strings(n = 1000, length = 1, pattern = "[0-9]"))

species_data <- data.frame(species_id, id)

merge and join dfs with multidplyr

library(tidyverse)
install.packages("devtools")
library(devtools)
devtools::install_github("hadley/multidplyr") 
library(multidplyr)
library(parallel)

species_summary <- species_data %>%
  # partition the species data by species id
  partition(species_id, cluster = cluster) %>%
  left_join(species_data, lookup, by = "id") %>%
  dplyr::select(-id) %>%
  group_by(species_id) %>%
  ## total number of cells each species occurs in
  mutate(tot_count_cells = n_distinct(cell_id)) %>%
  ungroup() %>%
  dplyr::select(c(cell_id, species_id, rgn_id, tot_count_cells)) %>%
  group_by(rgn_id, species_id) %>% 
  ## number of cells each species occurs in each region
  summarise(count_cells_eez = n_distinct(cell_id)) %>% 
  collect() %>%
  as_tibble()
## Error in partition(., species_id, cluster = cluster) : unused argument (species_id)

## If I change to:
species_summary <- species_data %>%
  group_by(species_id) %>%
  partition(cluster = cluster) %>% ...
## I get: Error in worker_id(data, cluster) : object 'cluster' not found

This is my first attempt at parallel and big data and I'm struggling to diagnose the errors.

Thanks!

Upvotes: 2

Views: 1292

Answers (1)

agila

Reputation: 3472

First, I load dplyr and multidplyr and create a cluster:

library(dplyr)
#> 
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#> 
#>     filter, lag
#> The following objects are masked from 'package:base':
#> 
#>     intersect, setdiff, setequal, union
library(multidplyr)
my_clusters <- new_cluster(3) # I have 4 cores
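The second error in the question ("object 'cluster' not found") points to the cluster object never having been created before partition() was called, which is what new_cluster() does above. Below is a minimal sketch of sizing the cluster from the machine instead of hard-coding 3. The names n_cores, max_workers and n_workers are only illustrative, and both leaving one core free and the cap of 4 workers are assumptions, not multidplyr requirements:

```r
# detectCores() can return NA on some platforms, so fall back to 1
n_cores <- parallel::detectCores()
if (is.na(n_cores)) n_cores <- 1L

# Arbitrary cap (assumption): each worker holds its own share of the data,
# so more workers is not automatically better
max_workers <- 4L

# Leave one core for the main R session, but always use at least one worker
n_workers <- min(max_workers, max(1L, n_cores - 1L))

# Create the workers only if multidplyr is installed
if (requireNamespace("multidplyr", quietly = TRUE)) {
  my_clusters <- multidplyr::new_cluster(n_workers)
}
```

On a machine with 4 cores this gives the same 3 workers as above.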

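Then I create the same sample data as in the question. Note that I store id in lookup as character, so that it has the same type as id in species_data, and that rgn_id is now a random three-digit string: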

library(stringi)
lookup <- tibble(
  id = as.character(seq(2001, 2500, by = 1)),
  cell_id = seq(1, 500, by = 1),
  rgn_id = sprintf("%s", stri_rand_strings(n = 500, length = 3, pattern = "[0-9]"))
)

species_id <- sprintf(
  "%s%s%s", 
  stri_rand_strings(n = 1000, length = 3, pattern = "[A-Z]"),
  pattern = "-",
  stri_rand_strings(n = 1000, length = 5, "[1-9]")
)
id <- sprintf(
  "%s%s%s", 
  stri_rand_strings(n = 1000, length = 1, pattern = "[2]"),
  stri_rand_strings(n = 1000, length = 1, pattern = "[0-4]"),
  stri_rand_strings(n = 1000, length = 1, pattern = "[0-9]")
)

species_data <- tibble(species_id, id)

Check the result:

species_data
#> # A tibble: 1,000 x 2
#>    species_id id   
#>    <chr>      <chr>
#>  1 CUZ-98293  246  
#>  2 XDG-61673  234  
#>  3 WFZ-94338  230  
#>  4 UIH-97549  226  
#>  5 AGE-35257  229  
#>  6 BMD-75361  249  
#>  7 MJB-78799  226  
#>  8 STS-15141  225  
#>  9 RXD-18645  245  
#> 10 SKZ-58666  243  
#> # ... with 990 more rows
lookup
#> # A tibble: 500 x 3
#>    id    cell_id rgn_id
#>    <chr>   <dbl> <chr> 
#>  1 2001        1 649   
#>  2 2002        2 451   
#>  3 2003        3 532   
#>  4 2004        4 339   
#>  5 2005        5 062   
#>  6 2006        6 329   
#>  7 2007        7 953   
#>  8 2008        8 075   
#>  9 2009        9 008   
#> 10 2010       10 465   
#> # ... with 490 more rows

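Now I can run the code with multidplyr. I split the dplyr pipeline into two steps, one for each group_by(). In each step the data are grouped first and then partitioned, so that all rows of a group end up on the same worker: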

first_step <- species_data %>% 
  left_join(lookup, by = "id") %>% 
  select(-id) %>% 
  group_by(species_id) %>% 
  partition(my_clusters) %>% 
  mutate(tot_count_cells = n_distinct(cell_id)) %>% 
  collect() %>% 
  ungroup()
first_step
#> # A tibble: 1,000 x 4
#>    species_id cell_id rgn_id tot_count_cells
#>    <chr>        <dbl> <chr>            <int>
#>  1 UIH-97549       NA <NA>                 1
#>  2 BMD-75361       NA <NA>                 1
#>  3 STS-15141       NA <NA>                 1
#>  4 RXD-18645       NA <NA>                 1
#>  5 HFI-78676       NA <NA>                 1
#>  6 KVP-45194       NA <NA>                 1
#>  7 SGW-29988       NA <NA>                 1
#>  8 WBI-79521       NA <NA>                 1
#>  9 MFY-86277       NA <NA>                 1
#> 10 BHO-37621       NA <NA>                 1
#> # ... with 990 more rows
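The NA values in cell_id and rgn_id above are worth a second look. The sample id values in species_data have three characters (for example "246"), while the lookup ids run from "2001" to "2500", so no key finds a partner in the join and every species ends up with a single distinct cell_id (NA). Below is a minimal sketch of how one might check for this before running the full pipeline, using a few rows copied from the printed tibbles. The names unmatched and key_lengths are only illustrative:

```r
library(dplyr)

# A few rows copied from the printed sample data
species_data <- tibble(
  species_id = c("CUZ-98293", "XDG-61673", "WFZ-94338"),
  id = c("246", "234", "230")
)
lookup <- tibble(
  id = as.character(2001:2003),
  cell_id = c(1, 2, 3),
  rgn_id = c("649", "451", "532")
)

# Rows of species_data whose id has no partner in lookup
unmatched <- anti_join(species_data, lookup, by = "id")
nrow(unmatched)

# Comparing the key lengths makes the mismatch visible
key_lengths <- list(
  species_data = range(nchar(species_data$id)),
  lookup = range(nchar(lookup$id))
)
key_lengths
```

If unmatched contains every row, as it does for this sample, the counts computed afterwards describe the missing values rather than real cells.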

The second step then counts the cells per region and species:

second_step <- first_step %>% 
    group_by(rgn_id, species_id) %>% 
    partition(my_clusters) %>% 
    summarise(count_cells_eez = n_distinct(cell_id)) %>% 
    collect() %>% 
    ungroup()
second_step
#> # A tibble: 1,000 x 3
#>    rgn_id species_id count_cells_eez
#>    <chr>  <chr>                <int>
#>  1 <NA>   ABB-24645                1
#>  2 <NA>   ABY-98559                1
#>  3 <NA>   AEQ-42462                1
#>  4 <NA>   AFO-58569                1
#>  5 <NA>   AKQ-44439                1
#>  6 <NA>   AMF-23978                1
#>  7 <NA>   ANF-49159                1
#>  8 <NA>   APD-85367                1
#>  9 <NA>   AQH-64126                1
#> 10 <NA>   AST-77513                1
#> # ... with 990 more rows

Created on 2020-03-21 by the reprex package (v0.3.0)
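A note on the vecseq limit mentioned in the question: that error is usually raised when a join would return more than 2^31 rows, which can happen when id is repeated in both tables. Below is a rough sketch of how the size of the left join could be estimated from per-key counts before actually running it. The toy values and the names n_species, n_lookup and expected_rows are hypothetical and only serve the illustration:

```r
library(dplyr)

# Toy data (hypothetical values): id "a" is repeated in both tables
species_data <- tibble(
  species_id = c("s1", "s2", "s3", "s4"),
  id = c("a", "a", "b", "c")
)
lookup <- tibble(
  id = c("a", "a", "b"),
  cell_id = c(1, 2, 3),
  rgn_id = c("r1", "r2", "r1")
)

# Number of rows per key in each table
n_species <- count(species_data, id, name = "n_species")
n_lookup <- count(lookup, id, name = "n_lookup")

# A left join returns n_species * n_lookup rows for a matched key and
# n_species rows for an unmatched key; as.numeric() avoids integer overflow
expected_rows <- n_species %>%
  left_join(n_lookup, by = "id") %>%
  mutate(n_lookup = coalesce(n_lookup, 1L)) %>%
  summarise(rows = sum(as.numeric(n_species) * n_lookup)) %>%
  pull(rows)
expected_rows
```

If expected_rows comes out far larger than nrow(species_data), duplicated ids in lookup are a likely cause, and it may be worth deduplicating or summarising lookup before the join.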

Upvotes: 4
