Reputation: 1087
I am trying to replicate the tidyr::complete function in sparklyr. I have a dataframe with some missing rows and I need to fill them in. In dplyr/tidyr I can do:
data <- tibble(
  "id" = c(1,1,2,2),
  "dates" = c("2020-01-01", "2020-01-03", "2020-01-01", "2020-01-03"),
  "values" = c(3,4,7,8))
# A tibble: 4 x 3
id dates values
<dbl> <chr> <dbl>
1 1 2020-01-01 3
2 1 2020-01-03 4
3 2 2020-01-01 7
4 2 2020-01-03 8
data %>%
  mutate(dates = as_date(dates)) %>%
  group_by(id) %>%
  complete(dates = seq.Date(min(dates), max(dates), by="day"))
# A tibble: 6 x 3
# Groups: id [2]
id dates values
<dbl> <date> <dbl>
1 1 2020-01-01 3
2 1 2020-01-02 NA
3 1 2020-01-03 4
4 2 2020-01-01 7
5 2 2020-01-02 NA
6 2 2020-01-03 8
However, the complete function does not exist in sparklyr.
data_spark %>%
  mutate(dates = as_date(dates)) %>%
  group_by(id) %>%
  complete(dates = seq.Date(min(dates), max(dates), by="day"))
Error in UseMethod("complete_") :
no applicable method for 'complete_' applied to an object of class "c('tbl_spark', 'tbl_sql', 'tbl_lazy', 'tbl')"
Is there a way to set a UDF or to achieve a similar result?
Thank you
Upvotes: 4
Views: 991
Reputation: 9087
Here's a method that does all of the work in Spark.
library(sparklyr)
library(dplyr)  # provides tibble(), %>% and the verbs used below

sc <- spark_connect(master = "local")

data <- tibble(
  id = c(1, 1, 2, 2),
  dates = c("2020-01-02", "2020-01-04", "2020-01-01", "2020-01-03"),
  values = c(1, 2, 3, 4)
)

data_spark <- copy_to(sc, data)
We need to generate all combinations of dates and id. To do this, we need to know the total number of days and the first date.
days_info <-
  data_spark %>%
  summarise(
    first_date = min(dates),
    total_days = datediff(max(dates), min(dates))
  ) %>%
  collect()
days_info
#> # A tibble: 1 x 2
#> first_date total_days
#> <chr> <int>
#> 1 2020-01-01 3
sdf_seq can be used to generate a sequence in Spark. This can be used to get the combinations of dates and id.
dates_id_combinations <-
  sdf_seq(
    sc,
    from = 0,
    to = days_info$total_days,
    repartition = 1
  ) %>%
  transmute(
    dates = date_add(local(days_info$first_date), id),
    join_by = TRUE
  ) %>%
  full_join(data_spark %>% distinct(id) %>% mutate(join_by = TRUE)) %>%
  select(dates, id)
dates_id_combinations
#> # Source: spark<?> [?? x 2]
#> dates id
#> <date> <dbl>
#> 1 2020-01-01 1
#> 2 2020-01-01 2
#> 3 2020-01-02 1
#> 4 2020-01-02 2
#> 5 2020-01-03 1
#> 6 2020-01-03 2
#> 7 2020-01-04 1
#> 8 2020-01-04 2
full_join the original data frame and the combination data frame. Then filter based on the min/max date for each group.
data_spark %>%
  group_by(id) %>%
  mutate(first_date = min(dates), last_date = max(dates)) %>%
  full_join(dates_id_combinations) %>%
  filter(dates >= min(first_date), dates <= max(last_date)) %>%
  arrange(id, dates) %>%
  select(id, dates)
#> # Source: spark<?> [?? x 2]
#> # Groups: id
#> # Ordered by: id, dates
#> id dates
#> <dbl> <chr>
#> 1 1 2020-01-02
#> 2 1 2020-01-03
#> 3 1 2020-01-04
#> 4 2 2020-01-01
#> 5 2 2020-01-02
#> 6 2 2020-01-03
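The result above lists only id and dates. Here is a minimal sketch of the same join-then-filter idea that also carries values through, so the filled-in rows show up with a missing value. It is written against a local tibble so it can be run without a cluster. The names all_dates, grid, ranges and completed are illustrative, and it is an assumption (not tested here) that the same dplyr verbs translate unchanged on a tbl_spark.

```r
library(dplyr)

data <- tibble(
  id = c(1, 1, 2, 2),
  dates = as.Date(c("2020-01-02", "2020-01-04", "2020-01-01", "2020-01-03")),
  values = c(1, 2, 3, 4)
)

# Every day between the overall first and last date, plus a dummy join key.
all_dates <- tibble(
  dates = seq.Date(min(data$dates), max(data$dates), by = "day"),
  join_by = TRUE
)

# Cross join of ids and dates via the dummy key.
# Newer dplyr versions may warn about a many-to-many join here;
# that is expected for a cross join.
grid <- data %>%
  distinct(id) %>%
  mutate(join_by = TRUE) %>%
  inner_join(all_dates, by = "join_by") %>%
  select(id, dates)

# First and last observed date per id.
ranges <- data %>%
  group_by(id) %>%
  summarise(first_date = min(dates), last_date = max(dates))

# Keep only the dates inside each id's own range, then attach the values.
completed <- grid %>%
  inner_join(ranges, by = "id") %>%
  filter(dates >= first_date, dates <= last_date) %>%
  left_join(data, by = c("id", "dates")) %>%
  select(id, dates, values) %>%
  arrange(id, dates)

completed
```

Computing the per-id ranges in a separate summarise and joining them onto the grid avoids relying on grouped window functions surviving the join, which makes the filter step easier to reason about.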
Upvotes: 0
Reputation: 1441
Under the hood, tidyr::complete just performs a full join followed by an optional NA fill. You can replicate its effects by using sdf_copy_to to create a new sdf with a single column holding the seq.Date between your start and end date, and then performing a full_join between that and your dataset.
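A rough sketch of that idea, using the data from the question. Several choices here are assumptions rather than part of the answer: dates are kept as ISO strings on the Spark side (so min/max and the join compare plain strings), a dummy join_by column is used to pair every id with every date, and the table names "data" and "all_dates" are illustrative. This version fills the overall date range for every id; a per-id range would need an extra filter.

```r
library(sparklyr)
library(dplyr)

sc <- spark_connect(master = "local")

data <- tibble(
  id = c(1, 1, 2, 2),
  dates = c("2020-01-01", "2020-01-03", "2020-01-01", "2020-01-03"),
  values = c(3, 4, 7, 8)
)
data_spark <- copy_to(sc, data, "data", overwrite = TRUE)

# Fetch only the overall start and end date to the driver.
bounds <- data_spark %>%
  summarise(
    start = min(dates, na.rm = TRUE),
    end = max(dates, na.rm = TRUE)
  ) %>%
  collect()

# Build the date sequence locally and copy it to Spark as ISO strings.
all_dates <- tibble(
  dates = format(seq.Date(as.Date(bounds$start), as.Date(bounds$end), by = "day")),
  join_by = TRUE
)
dates_spark <- sdf_copy_to(sc, all_dates, "all_dates", overwrite = TRUE)

# Pair every id with every date, then attach the observed values.
completed <- data_spark %>%
  distinct(id) %>%
  mutate(join_by = TRUE) %>%
  inner_join(dates_spark, by = "join_by") %>%
  select(id, dates) %>%
  left_join(data_spark, by = c("id", "dates")) %>%
  arrange(id, dates)

result <- collect(completed)
result

spark_disconnect(sc)
```

Rows that exist only in the date grid come back with a missing values entry, which can then be filled on the Spark side if needed.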
Upvotes: 1