Marco De Virgilis

Reputation: 1087

Complete dataframe in sparklyr

I am trying to replicate the tidyr::complete() function in sparklyr. I have a data frame with some missing rows that I need to fill in. In dplyr/tidyr I can do:

library(dplyr)
library(tidyr)
library(lubridate)  # for as_date()

data <- tibble(
  "id" = c(1,1,2,2),
  "dates" = c("2020-01-01", "2020-01-03", "2020-01-01", "2020-01-03"),
  "values" = c(3,4,7,8))

# A tibble: 4 x 3
     id dates      values
  <dbl> <chr>       <dbl>
1     1 2020-01-01      3
2     1 2020-01-03      4
3     2 2020-01-01      7
4     2 2020-01-03      8

data %>% 
  mutate(dates = as_date(dates)) %>% 
  group_by(id) %>% 
  complete(dates = seq.Date(min(dates), max(dates), by="day"))

# A tibble: 6 x 3
# Groups:   id [2]
     id dates      values
  <dbl> <date>      <dbl>
1     1 2020-01-01      3
2     1 2020-01-02     NA
3     1 2020-01-03      4
4     2 2020-01-01      7
5     2 2020-01-02     NA
6     2 2020-01-03      8

However, the complete() function does not exist in sparklyr:

data_spark %>% 
  mutate(dates = as_date(dates)) %>% 
  group_by(id) %>% 
  complete(dates = seq.Date(min(dates), max(dates), by="day"))

Error in UseMethod("complete_") : 
no applicable method for 'complete_' applied to an object of class "c('tbl_spark', 'tbl_sql', 'tbl_lazy', 'tbl')"

Is there a way to define a UDF, or otherwise achieve a similar result?

Thank you

Upvotes: 4

Views: 991

Answers (2)

Paul

Reputation: 9087

Here's a method that does all of the work in Spark.

library(sparklyr)

sc <- spark_connect(master = "local")

data <- tibble(
  id = c(1, 1, 2, 2),
  dates = c("2020-01-02", "2020-01-04", "2020-01-01", "2020-01-03"),
  values = c(1, 2, 3, 4)
)

data_spark <- copy_to(sc, data)

We need to generate all combinations of dates and id. To do this, we need to know the total number of days and the first date.

days_info <-
  data_spark %>%
  summarise(
    first_date = min(dates),
    total_days = datediff(max(dates), min(dates))
  ) %>%
  collect()
days_info
#> # A tibble: 1 x 2
#>   first_date total_days
#>   <chr>           <int>
#> 1 2020-01-01          3

sdf_seq() can be used to generate a sequence of integers in Spark. From that sequence we can build all combinations of dates and id.

dates_id_combinations <- 
  sdf_seq(
    sc,
    from = 0,
    to = days_info$total_days,
    repartition = 1
  ) %>%
  transmute(
    # `id` here is the sequence column generated by sdf_seq(),
    # not the `id` column from the data
    dates = date_add(local(days_info$first_date), id),
    join_by = TRUE
  ) %>%
  full_join(data_spark %>% distinct(id) %>% mutate(join_by = TRUE)) %>%
  select(dates, id)
dates_id_combinations
#> # Source: spark<?> [?? x 2]
#>   dates         id
#>   <date>     <dbl>
#> 1 2020-01-01     1
#> 2 2020-01-01     2
#> 3 2020-01-02     1
#> 4 2020-01-02     2
#> 5 2020-01-03     1
#> 6 2020-01-03     2
#> 7 2020-01-04     1
#> 8 2020-01-04     2

full_join() the original data frame with the combination data frame, then filter based on the min/max date for each group.

data_spark %>%
  group_by(id) %>%
  mutate(first_date = min(dates), last_date = max(dates)) %>%
  full_join(dates_id_combinations) %>%
  filter(dates >= min(first_date), dates <= max(last_date)) %>%
  arrange(id, dates) %>%
  select(id, dates)
#> # Source:     spark<?> [?? x 2]
#> # Groups:     id
#> # Ordered by: id, dates
#>      id dates     
#>   <dbl> <chr>     
#> 1     1 2020-01-02
#> 2     1 2020-01-03
#> 3     1 2020-01-04
#> 4     2 2020-01-01
#> 5     2 2020-01-02
#> 6     2 2020-01-03

Upvotes: 0

bcarlsen

Reputation: 1441

Under the hood, tidyr::complete essentially performs a full join followed by an optional NA fill. You can replicate its effect by using sdf_copy_to to create a new Spark data frame containing a single column, a seq.Date sequence between your start and end dates, and then performing a full_join between that and your dataset.
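A minimal sketch of that approach, assuming the `sc` connection and `data_spark` table from the question (note this completes over the global date range; to restrict each id to its own min/max dates, filter afterwards as in the other answer):

```r
library(sparklyr)
library(dplyr)

# Pull the overall date range back to the driver (a one-row result)
date_range <- data_spark %>%
  summarise(first = min(dates), last = max(dates)) %>%
  collect()

# Build the full calendar locally and copy it into Spark
calendar <- tibble(
  dates = as.character(seq.Date(as.Date(date_range$first),
                                as.Date(date_range$last), by = "day")),
  join_by = TRUE
)
calendar_spark <- sdf_copy_to(sc, calendar, overwrite = TRUE)

# Cross the calendar with the distinct ids, then full-join the data;
# missing (id, dates) combinations come back with NA in `values`
completed <- data_spark %>%
  distinct(id) %>%
  mutate(join_by = TRUE) %>%
  full_join(calendar_spark, by = "join_by") %>%
  select(-join_by) %>%
  full_join(data_spark, by = c("id", "dates")) %>%
  arrange(id, dates)
```

The `join_by = TRUE` helper column is just a device for expressing a cross join through dplyr's join verbs, since sparklyr translates these to Spark SQL joins.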

Upvotes: 1
