Reputation: 167
This question is similar to other problems with very large data in R, but I can't find an example of how to merge/join and then perform calculations on two dfs (as opposed to reading in lots of dataframes and using mclapply to do the calculations). Here the problem is not loading the data (takes ~20 min but they do load), but rather the merging & summarising.
I've tried all the data.table approaches I could find, different types of joins, and ff, and I still run into the vecseq limit of 2^31 rows. Now I'm trying to use multidplyr to do it in parallel, but can't figure out where the error is coming from.
Dataframes: species_data — a df with ~65 million rows and cols c("id", "species_id"); lookup — a df with ~17 million rows and cols c("id", "cell_id", "rgn_id"). Not all ids in the lookup appear in species_data.
## make sample dataframes:
## Sample lookup table for the reprex.
## `id` is made character here because species_data$id is built with
## sprintf() and is therefore character; joining a numeric key to a
## character key makes left_join(by = "id") fail to match.
lookup <- data.frame(id = as.character(seq(2001, 2500, by = 1)),
cell_id = seq(1, 500, by = 1),
rgn_id = seq(801, 1300, by = 1))
library(stringi)
## Random species codes of the form "ABC-12345".
## The literal "-" was originally passed as `pattern = "-"`; sprintf() has
## no `pattern` argument -- the name was silently ignored and the value
## used positionally -- so the misleading name is dropped.
species_id <- sprintf("%s%s%s", stri_rand_strings(n = 1000, length = 3, pattern = "[A-Z]"),
"-",
stri_rand_strings(1000, length = 5, '[1-9]'))
## Sample ids drawn from the same 2001-2500 range as the lookup table.
## The original built 3-character ids (e.g. "239") that could never equal
## the 4-digit lookup ids, so the join in the reprex matched nothing.
## Kept as character, matching the type sprintf() produced before.
id <- as.character(sample(2001:2500, size = 1000, replace = TRUE))
species_data <- data.frame(species_id, id)
Merge and join the dfs with multidplyr:
library(tidyverse)
## NOTE(review): package installation should not run unconditionally inside
## an analysis script -- install once interactively instead, and keep only
## the library() calls here.
install.packages("devtools")
library(devtools)
devtools::install_github("hadley/multidplyr")
library(multidplyr)
library(parallel)
## NOTE(review): this chain fails for three separate reasons:
##  1. `cluster` is never created anywhere above (multidplyr needs e.g.
##     cluster <- new_cluster(n) first) -- hence "object 'cluster' not found".
##  2. In current multidplyr, partition() does not accept grouping columns;
##     you group_by() first and then call partition(cluster) -- hence
##     "unused argument (species_id)".
##  3. Inside a pipe, left_join(species_data, lookup, by = "id") receives the
##     piped data AS WELL as the two named tables (three data frames in
##     total); it should be just left_join(lookup, by = "id").
species_summary <- species_data %>%
# partition the species data by species id
partition(species_id, cluster = cluster) %>%
left_join(species_data, lookup, by = "id") %>%
dplyr::select(-id) %>%
group_by(species_id) %>%
## total number of cells each species occurs in
mutate(tot_count_cells = n_distinct(cell_id)) %>%
ungroup() %>%
dplyr::select(c(cell_id, species_id, rgn_id, tot_count_cells)) %>%
group_by(rgn_id, species_id) %>%
## number of cells each species occurs in each region
summarise(count_cells_eez = n_distinct(cell_id)) %>%
collect() %>%
as_tibble()
## Error in partition(., species_id, cluster = cluster) : unused argument (species_id)
## If I change to:
species_summary <- species_data %>%
group_by(species_id) %>%
partition(cluster = cluster) %>% ...
## I get: "Error in worker_id(data, cluster) : object 'cluster' not found"
This is my first attempt at parallel and big data and I'm struggling to diagnose the errors.
Thanks!
Upvotes: 2
Views: 1292
Reputation: 3472
First I load dplyr and multidplyr
library(dplyr)
#>
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#>
#> filter, lag
#> The following objects are masked from 'package:base':
#>
#> intersect, setdiff, setequal, union
library(multidplyr)
## Create a 3-worker multidplyr cluster; one of the machine's 4 cores is
## left free for the main R session / OS.
my_clusters <- new_cluster(3) # I have 4 cores
Then I create the same sample data that you propose:
library(stringi)
## Lookup table: character ids (so they join against the character
## species_data$id), sequential cell ids, and random 3-digit region codes.
lookup <- tibble(
  id = as.character(seq(2001, 2500, by = 1)),
  cell_id = seq(1, 500, by = 1),
  ## stri_rand_strings() already returns a character vector; the original
  ## wrapped it in a no-op sprintf("%s", ...), removed here.
  rgn_id = stri_rand_strings(n = 500, length = 3, pattern = "[0-9]")
)
## Random species codes like "ABC-12345".
## The literal "-" was originally passed as `pattern = "-"`; sprintf() has
## no `pattern` parameter -- the name was silently ignored and the value
## consumed positionally -- so the misleading name is dropped.
species_id <- sprintf(
  "%s%s%s",
  stri_rand_strings(n = 1000, length = 3, pattern = "[A-Z]"),
  "-",
  stri_rand_strings(n = 1000, length = 5, "[1-9]")
)
## 3-character ids: "2", then a digit in 0-4, then a digit in 0-9.
## NOTE(review): these can never equal lookup$id ("2001".."2500"), so every
## join below misses and the results are all NA -- presumably kept only as
## a structural reprex; confirm this is intentional.
id <- sprintf(
  "%s%s%s",
  stri_rand_strings(n = 1000, length = 1, pattern = "[2]"),
  stri_rand_strings(n = 1000, length = 1, pattern = "[0-4]"),
  stri_rand_strings(n = 1000, length = 1, pattern = "[0-9]")
)
species_data <- tibble(species_id, id)
Check the result
species_data
#> # A tibble: 1,000 x 2
#> species_id id
#> <chr> <chr>
#> 1 CUZ-98293 246
#> 2 XDG-61673 234
#> 3 WFZ-94338 230
#> 4 UIH-97549 226
#> 5 AGE-35257 229
#> 6 BMD-75361 249
#> 7 MJB-78799 226
#> 8 STS-15141 225
#> 9 RXD-18645 245
#> 10 SKZ-58666 243
#> # ... with 990 more rows
lookup
#> # A tibble: 500 x 3
#> id cell_id rgn_id
#> <chr> <dbl> <chr>
#> 1 2001 1 649
#> 2 2002 2 451
#> 3 2003 3 532
#> 4 2004 4 339
#> 5 2005 5 062
#> 6 2006 6 329
#> 7 2007 7 953
#> 8 2008 8 075
#> 9 2009 9 008
#> 10 2010 10 465
#> # ... with 490 more rows
Now I can run the code using a multidplyr approach. I divide the dplyr code into two steps according to the two group_by() calls.
## Step 1: attach cell/region info to each species record, then count the
## distinct cells each species occupies overall.
## group_by() comes BEFORE partition() so multidplyr keeps all rows of a
## given species on the same worker -- required for a correct per-group
## n_distinct() inside mutate().
first_step <- species_data %>%
left_join(lookup, by = "id") %>%
select(-id) %>%
group_by(species_id) %>%
partition(my_clusters) %>%
mutate(tot_count_cells = n_distinct(cell_id)) %>%
collect() %>%
ungroup()
first_step
#> # A tibble: 1,000 x 4
#> species_id cell_id rgn_id tot_count_cells
#> <chr> <dbl> <chr> <int>
#> 1 UIH-97549 NA <NA> 1
#> 2 BMD-75361 NA <NA> 1
#> 3 STS-15141 NA <NA> 1
#> 4 RXD-18645 NA <NA> 1
#> 5 HFI-78676 NA <NA> 1
#> 6 KVP-45194 NA <NA> 1
#> 7 SGW-29988 NA <NA> 1
#> 8 WBI-79521 NA <NA> 1
#> 9 MFY-86277 NA <NA> 1
#> 10 BHO-37621 NA <NA> 1
#> # ... with 990 more rows
and
## Step 2: regroup by (region, species) and count the distinct cells each
## species occupies per region.
## A fresh group_by() + partition() is needed because the grouping keys
## changed: all rows of one (rgn_id, species_id) pair must sit on a single
## worker for summarise(n_distinct(...)) to be correct.
second_step <- first_step %>%
group_by(rgn_id, species_id) %>%
partition(my_clusters) %>%
summarise(count_cells_eez = n_distinct(cell_id)) %>%
collect() %>%
ungroup()
second_step
#> # A tibble: 1,000 x 3
#> rgn_id species_id count_cells_eez
#> <chr> <chr> <int>
#> 1 <NA> ABB-24645 1
#> 2 <NA> ABY-98559 1
#> 3 <NA> AEQ-42462 1
#> 4 <NA> AFO-58569 1
#> 5 <NA> AKQ-44439 1
#> 6 <NA> AMF-23978 1
#> 7 <NA> ANF-49159 1
#> 8 <NA> APD-85367 1
#> 9 <NA> AQH-64126 1
#> 10 <NA> AST-77513 1
#> # ... with 990 more rows
Created on 2020-03-21 by the reprex package (v0.3.0)
Upvotes: 4