Akthem

Reputation: 53

Using parSapply that needs to write to a data frame in parallel

I have a custom function that goes through a time series data frame and extracts a 30-minute sliding window from the original time series. For each window, the function writes the starting and finishing timestamps, together with the window's minimum and maximum, to another data frame.

I then use sapply to apply this function to every starting row across the whole data range.

The sapply version works but is too slow. I want to parallelize it, but when I do, the code returns errors. I attribute this to the function having to write its result to the same data frame from several processes in parallel.

result_df_2 <- data.frame(Start.time=as.POSIXct(character()), finish.time = as.POSIXct(character()), max.value = double(), min.value = double(), stringsAsFactors = FALSE)

sliding_window <- function(sequence, time_row, Query, window_width) {
    sliding_window_1 <- Query[time_row <= (time_row[sequence] + window_width * 60 + 1 * 60) & 
                              time_row > time_row[sequence], ]
    if (nrow(sliding_window_1) >= 1) {
        temp.df <- data.frame(Start.time = sliding_window_1$TIME[1],
                              finish.time = sliding_window_1$TIME[nrow(sliding_window_1)],
                              max.value = max(sliding_window_1$C19.X.AAA.01, na.rm = T),
                              min.value = min(sliding_window_1$C19.X.AAA.01, na.rm = T))
        result_df_2[nrow(result_df_2)+1,] <<- temp.df[1,]
    }
}

sapply(1:(nrow(WBP) - 30), FUN = sliding_window, Query = WBP, time_row = WBP$TIME, window_width = 30)

An answer to the question "Parallel while loop in R" mentioned that a scenario like this can be parallelized. I need your help to figure out how.

Below is the output of dput(WBP[1:10,])

structure(list(TIME = structure(c(1484589600, 1484589660, 1484589720, 
    1484589780, 1484589840, 1484589900, 1484589960, 1484590020, 1484590080, 
    1484590140), class = c("POSIXct", "POSIXt"), tzone = ""), 
    C19.X = c(216.193, 220.204, 218.845, 218.676, 219.194, 219.976, 
    219.894, 219.168, 216.713, 216.551), 
    C19.N = c(214.201, 216.985, 218.15, 217.3, 218.11, 218.194, 
    218.332, 216.679, 215.343, 215.403), 
    C19.X.AA.01 = c(216.193, 220.204, NA, NA, NA, NA, NA, NA, NA, 216.551), 
    C19.X.AAA.01 = c(216.193, 220.204, 219.747375, 219.29075, 218.834125, 
    218.3775, 217.920875, 217.46425, 217.007625, 216.551)), 
    .Names = c("TIME", "C19.X", "C19.N", "C19.X.AA.01", "C19.X.AAA.01"), 
    row.names = c(NA, 10L), class = c("data.table", "data.frame"))

Upvotes: 0

Views: 1058

Answers (1)

twedl

Reputation: 1648

Tip 1: Use "pure" functions without side effects

In your example, you initialize result_df_2 and use the function to modify it. This isn't really what sapply and the like are made for (and it's one reason the code doesn't parallelize well). Instead, make the function return the result you want, and combine all the results into a data.frame afterward. E.g.,

f <- function(x) {
  x / 10 # this returns a value instead of modifying something that already exists
}
result <- sapply(1:5, FUN = f)
data.frame(result)
##   result
## 1    0.1
## 2    0.2
## 3    0.3
## 4    0.4
## 5    0.5

Tip 2: parallelizing can be hard

Child processes don't always have access to objects defined in the parent process. In this case, you get 'result_df_2' not found because that object was never passed to the children. You can avoid that error using the strategy above (but you might run into the same problem in the future with a more complicated function, so that's just FYI). Here's a simple example that uses parSapply:

library(parallel)
cl <- makeCluster(2)
result <- parSapply(cl, 1:5, f)
stopCluster(cl)

data.frame(result)
##   result
## 1    0.1
## 2    0.2
## 3    0.3
## 4    0.4
## 5    0.5
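
If a worker function does need an object from the parent session, one option is to copy it to the workers with clusterExport before the call. The following is only a minimal sketch: scale_factor and g are hypothetical names made up for illustration, and envir = environment() simply means "export from wherever this code is running".

```r
library(parallel)

# Hypothetical object that lives only in the parent session
scale_factor <- 10

# Hypothetical worker function that looks up scale_factor by name
g <- function(x) {
  x / scale_factor
}

cl <- makeCluster(2)
# Copy scale_factor into each worker's global environment;
# without this step the workers may fail with "object 'scale_factor' not found"
clusterExport(cl, "scale_factor", envir = environment())
result <- parSapply(cl, 1:5, g)
stopCluster(cl)

data.frame(result)
```

Passing the object as an explicit function argument (as the sliding_window function does with Query and time_row) avoids the export step altogether, which is usually the simpler design.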

Possible solution for you (but hard to tell without the real data)

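Instead of modifying the data.frame, have the function return temp.df. Call it with lapply or parLapply rather than sapply, since sapply would try to simplify the returned data frames into a matrix, and then use dplyr::bind_rows or the like to convert the returned list of data frames into one single data frame (or however you want the solution to look).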

sliding_window <- function(sequence, time_row, Query, window_width) {
  sliding_window_1 <- Query[time_row <= (time_row[sequence] + window_width * 60 + 1 * 60) & 
                              time_row > time_row[sequence], ]
  if (nrow(sliding_window_1) >= 1) {
    temp.df <- data.frame(Start.time = sliding_window_1$TIME[1],
                          finish.time = sliding_window_1$TIME[nrow(sliding_window_1)],
                          max.value = max(sliding_window_1$C19.X.AAA.01, na.rm = TRUE),
                          min.value = min(sliding_window_1$C19.X.AAA.01, na.rm = TRUE))
  } else {
    # empty window: return a zero-row data.frame with the same columns
    temp.df <- data.frame(Start.time = as.POSIXct(character()), finish.time = as.POSIXct(character()), max.value = double(), min.value = double(), stringsAsFactors = FALSE)
  }
  temp.df  # return the result instead of modifying result_df_2
}
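
To show how the pieces might fit together, here is a hedged sketch that runs a side-effect-free window function with parLapply and combines the results with base R's do.call(rbind, ...) in place of dplyr::bind_rows. Several things are assumptions made for illustration: the small WBP stand-in is rebuilt from the question's dput() output as a plain data.frame with only the two columns the function reads, the time zone is set to UTC, the window width is shortened to 3 minutes so that ten rows are enough, and empty windows return NULL rather than a zero-row data.frame.

```r
library(parallel)

# Stand-in for WBP built from the question's dput() (assumption: plain
# data.frame, UTC time zone, only the columns the function uses)
WBP <- data.frame(
  TIME = as.POSIXct(1484589600 + 60 * 0:9, origin = "1970-01-01", tz = "UTC"),
  C19.X.AAA.01 = c(216.193, 220.204, 219.747375, 219.29075, 218.834125,
                   218.3775, 217.920875, 217.46425, 217.007625, 216.551)
)

# Returns a one-row data.frame for the window that starts after row
# `sequence`, or NULL when the window is empty (rbind drops NULLs)
sliding_window <- function(sequence, time_row, Query, window_width) {
  in_window <- time_row > time_row[sequence] &
    time_row <= time_row[sequence] + window_width * 60 + 1 * 60
  w <- Query[in_window, ]
  if (nrow(w) == 0) return(NULL)
  data.frame(Start.time = w$TIME[1],
             finish.time = w$TIME[nrow(w)],
             max.value = max(w$C19.X.AAA.01, na.rm = TRUE),
             min.value = min(w$C19.X.AAA.01, na.rm = TRUE))
}

cl <- makeCluster(2)
# Everything the function needs is passed as an argument,
# so nothing has to be exported to the workers separately
pieces <- parLapply(cl, seq_len(nrow(WBP) - 1), sliding_window,
                    time_row = WBP$TIME, Query = WBP, window_width = 3)
stopCluster(cl)

# Combine the list of one-row data frames into a single data frame
result_df_2 <- do.call(rbind, pieces)
```

On the real data you would presumably go back to window_width = 30 and the original index range 1:(nrow(WBP) - 30). Whether the parallel version is actually faster depends on how large WBP is, because the whole data frame is sent to the workers.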

Ok, go for it. Good luck and godspeed.

Upvotes: 1
