Kevin Zheng

Reputation: 97

Sparklyr fill NA/NULL in spark dataframe

I would like to assign NA/NULL values in a Spark data frame to their nearest neighbours. I come from an R background, so I use sparklyr, but I can't figure out a way to do it.

Here is an example code:

 set.seed(1)
 example <- data.frame(ID = 1:10, Cat = letters[1:5],
                       Numb = sample(c(NA, NA, NA, NA, 1:10), 10))
     ID Cat Numb
  1   1   a   NA
  2   2   b    1
  3   3   c    3
  4   4   d    6
  5   5   e   NA
  6   6   a    5
  7   7   b    4
  8   8   c    9
  9   9   d   10
  10 10   e   NA 

So I would like to fill the Numb column: ID 1's NA with ID 2's Numb (1), ID 5's with either ID 4's or ID 6's value (6 or 5), and ID 10's with ID 9's value (10). This can be done in R easily. Is there any way to do it in Spark through sparklyr?

Here is my R solution:

example$Numb1 <- example$Numb[c(1,1:(nrow(example)-1))]
example$Numb2 <- example$Numb[c(2:(nrow(example)), nrow(example))]
example$Merge <- ifelse(is.na(example$Numb), ifelse(is.na(example$Numb1), 
example$Numb2, example$Numb1), example$Numb)

    ID Cat Numb Numb1 Numb2 Merge
1   1   a   NA    NA     1     1
2   2   b    1    NA     3     1
3   3   c    3     1     6     3
4   4   d    6     3    NA     6
5   5   e   NA     6     5     6
6   6   a    5    NA     4     5
7   7   b    4     5     9     4
8   8   c    9     4    10     9
9   9   d   10     9    NA    10
10 10   e   NA    10    NA    10

Of course, things get more complicated if there are multiple NA values in consecutive rows, so other approaches are welcome too.
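For reference, the consecutive-NA case can be expressed in base R as well. This is just a sketch (`fill_nearest` is an illustrative name, not an existing function): it replaces every NA with the value at the closest non-NA position, breaking ties toward the earlier row.

```r
# Fill each NA with the nearest non-NA neighbour; handles runs of
# consecutive NAs (pure base R, no Spark required).
fill_nearest <- function(x) {
  idx <- which(!is.na(x))          # positions of known values
  if (length(idx) == 0) return(x)  # nothing to fill from
  # for every position, find the index of the closest non-NA value
  nearest <- sapply(seq_along(x), function(i) idx[which.min(abs(idx - i))])
  x[nearest]
}

fill_nearest(c(NA, 1, 3, 6, NA, 5, 4, 9, 10, NA))
# [1]  1  1  3  6  6  5  4  9 10 10
```

This runs locally on an ordinary vector, so it does not answer the Spark part of the question, but it pins down the intended semantics.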

But for sparklyr, I have no clue what I can do.

Upvotes: 2

Views: 2377

Answers (1)

Jaime Caffarel

Reputation: 2469

Here is a partially working solution using a SQL query and the mutate function from the dplyr package. It does not handle multiple NA values in consecutive rows, since it is a translation of your base R solution, but it might be a useful starting point for more complete approaches.

I've used the LAG and LEAD functions of HiveQL to shift your column down and up. This involves creating an auxiliary Spark table (example2) that holds the "Numb1" and "Numb2" columns. Once the auxiliary table has been created, you can create the "Merged" column with mutate.

library(DBI)
library(sparklyr)
library(dplyr)

set.seed(1)
exampleDF <- data.frame(ID = 1:10, Cat = letters[1:5],
                        Numb = sample(c(NA, NA, NA, NA, 1:10), 10))

# Connection to Spark and creation of the table to test.
sc <- spark_connect("local")
example <- copy_to(sc, exampleDF)  

# Create a Spark table with columns Numb1 and Numb2
DBI::dbSendQuery(sc, "CREATE TABLE example2 AS (SELECT ID, Cat, Numb, LAG(Numb, 1) over (PARTITION BY 1 ORDER BY ID) AS Numb1,
             LEAD(Numb, 1) over (PARTITION BY 1 ORDER BY ID) AS Numb2 FROM exampledf)")

# Load the auxiliary table as a Spark DataFrame
ex2 <- tbl(sc, "example2")

# Mutate in order to create the Merged column
res <- ex2 %>%
  mutate(Merged = ifelse(is.na(Numb), ifelse(is.na(Numb1), Numb2, Numb1), Numb))

res

# Source:   lazy query [?? x 6]
# Database: spark_connection
      id   cat  numb numb1 numb2 Merged
   <int> <chr> <int> <int> <int>  <int>
 1     1     a    NA    NA     1      1
 2     2     b     1    NA     3      1
 3     3     c     3     1     6      3
 4     4     d     6     3    NA      6
 5     5     e    NA     6     5      6
 6     6     a     5    NA     4      5
 7     7     b     4     5     9      4
 8     8     c     9     4    10      9
 9     9     d    10     9    NA     10
10    10     e    NA    10    NA     10

As a side note, you can also avoid the mutate call (and the nested ifelses) with the COALESCE function, which returns its first non-NULL argument. I think this would be much more efficient.

DBI::dbGetQuery(sc, "SELECT ID, Cat, Numb, COALESCE(Numb, Numb1, Numb2) AS Merged FROM example2")
   ID Cat Numb Merged
1   1   a   NA      1
2   2   b    1      1
3   3   c    3      3
4   4   d    6      6
5   5   e   NA      6
6   6   a    5      5
7   7   b    4      4
8   8   c    9      9
9   9   d   10     10
10 10   e   NA     10
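The COALESCE idea can also be extended to runs of consecutive NAs by pulling the last and first known values across the whole window instead of only the adjacent rows. This is an untested sketch, assuming your Spark SQL / HiveQL dialect supports the ignore-nulls form `LAST_VALUE(col, TRUE)` and `FIRST_VALUE(col, TRUE)`:

```r
# Sketch: fill runs of consecutive NAs directly from the original table.
# LAST_VALUE(Numb, TRUE) skips NULLs, so it yields the most recent
# non-NULL value before the current row; FIRST_VALUE(Numb, TRUE) over the
# following rows is the backward-fill fallback for leading NAs.
DBI::dbGetQuery(sc, "
  SELECT ID, Cat, Numb,
         COALESCE(Numb,
                  LAST_VALUE(Numb, TRUE)  OVER (ORDER BY ID
                    ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING),
                  FIRST_VALUE(Numb, TRUE) OVER (ORDER BY ID
                    ROWS BETWEEN 1 FOLLOWING AND UNBOUNDED FOLLOWING))
           AS Merged
  FROM exampledf")
```

Note that this prefers the previous non-NULL value over the next one, rather than the strictly nearest neighbour, which may or may not match your requirements.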

I hope this helps.

EDITED

If you want to avoid SQL entirely, you can also do it with dplyr functions:

example %>% arrange(ID) %>%
    mutate(Numb1 = lag(Numb, 1)) %>%
    mutate(Numb2 = lead(Numb, 1L)) %>%
    mutate(Merged = ifelse(is.na(Numb), ifelse(is.na(Numb1), Numb2, Numb1), Numb))
# Source:     lazy query [?? x 6]
# Database:   spark_connection
# Ordered by: ID
      ID   Cat  Numb Numb1 Numb2 Merged
   <int> <chr> <int> <int> <int>  <int>
 1     1     a    NA    NA     1      1
 2     2     b     1    NA     3      1
 3     3     c     3     1     6      3
 4     4     d     6     3    NA      6
 5     5     e    NA     6     5      6
 6     6     a     5    NA     4      5
 7     7     b     4     5     9      4
 8     8     c     9     4    10      9
 9     9     d    10     9    NA     10
10    10     e    NA    10    NA     10
# ... with more rows

I had some trouble chaining the two consecutive mutate calls (that's why I used a mixed SQL-dplyr approach in the first place), and I ended up opening an issue on sparklyr.

Upvotes: 1
