Reputation: 53
I have a custom function that goes through a time series data frame and extracts a 30-minute sliding window from the original time series. For each window, the function writes the starting and finishing timestamps, together with the minimum and maximum of the window, to another data frame.
An sapply call then applies this function to every starting row across the whole data range.
The sapply version works but is too slow. I want to parallelize it, but when I do so the code returns errors. I attribute this to the function writing its result to the same data frame from several processes at once.
result_df_2 <- data.frame(Start.time=as.POSIXct(character()), finish.time = as.POSIXct(character()), max.value = double(), min.value = double(), stringsAsFactors = FALSE)
sliding_window <- function(sequence, time_row, Query, window_width) {
sliding_window_1 <- Query[time_row <= (time_row[sequence] + window_width * 60 + 1 * 60) &
time_row > time_row[sequence], ]
if (nrow(sliding_window_1) >= 1) {
temp.df <- data.frame(Start.time = sliding_window_1$TIME[1],
finish.time = sliding_window_1$TIME[nrow(sliding_window_1)],
max.value = max(sliding_window_1$C19.X.AAA.01, na.rm = TRUE),
min.value = min(sliding_window_1$C19.X.AAA.01, na.rm = TRUE))
result_df_2[nrow(result_df_2)+1,] <<- temp.df[1,]
}
}
sapply(1:(nrow(WBP) - 30), FUN = sliding_window, Query = WBP, time_row = WBP$TIME, window_width = 30)
An answer to the question "Parallel while loop in R" mentioned that a scenario like this can be parallelized. I need your help to know how.
Below is the output of dput(WBP[1:10,])
structure(list(TIME = structure(c(1484589600, 1484589660, 1484589720,
1484589780, 1484589840, 1484589900, 1484589960, 1484590020, 1484590080,
1484590140), class = c("POSIXct", "POSIXt"), tzone = ""), C19.X = c(216.193,
220.204, 218.845, 218.676, 219.194, 219.976, 219.894, 219.168,
216.713, 216.551), C19.N = c(214.201, 216.985, 218.15, 217.3,
218.11, 218.194, 218.332, 216.679, 215.343, 215.403), C19.X.AA.01 = c(216.193,
220.204, NA, NA, NA, NA, NA, NA, NA, 216.551), C19.X.AAA.01 = c(216.193,
220.204, 219.747375, 219.29075, 218.834125, 218.3775, 217.920875,
217.46425, 217.007625, 216.551)), .Names = c("TIME", "C19.X",
"C19.N", "C19.X.AA.01", "C19.X.AAA.01"), row.names = c(NA, 10L
), class = c("data.table", "data.frame"))
Upvotes: 0
Views: 1058
Reputation: 1648
In your example, you initialize result_df_2 and use the function to change it. This isn't really what sapply and the like are made for (and it's one reason the code doesn't parallelize well). Instead, try to make the function return the result you want, and put all the answers in a data.frame afterward. E.g.,
f <- function(x) {
x / 10 # this returns a value instead of modifying something that already exists
}
result <- sapply(1:5, FUN = f)
data.frame(result)
## result
## 1 0.1
## 2 0.2
## 3 0.3
## 4 0.4
## 5 0.5
The parallel version fails because the child processes don't always have access to things defined in the parent process. In this case, you get 'result_df_2' not found because the children weren't passed that object. Even if they had been, each child would modify its own copy rather than the data frame in your session. You can avoid that error using the strategy above (but you might run into the same problem in the future if you have a more complicated function, so that's just FYI). Here's a simple example that uses parSapply:
library(parallel)
cl <- makeCluster(2)
result <- parSapply(cl, 1:5, f)
stopCluster(cl)
data.frame(result)
## result
## 1 0.1
## 2 0.2
## 3 0.3
## 4 0.4
## 5 0.5
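If a function really does need an object from the parent session, one option is to copy that object to the workers first. The following is a minimal sketch using clusterExport from the parallel package; the names scale_factor and g are hypothetical and only serve the illustration, and the cluster size of 2 is arbitrary.

```r
library(parallel)

# Hypothetical variable that exists only in the parent session.
scale_factor <- 10

# Hypothetical function that depends on that variable.
g <- function(x) {
  x / scale_factor
}

cl <- makeCluster(2)
# Copy scale_factor into each worker's global environment;
# without this step the workers cannot find it.
clusterExport(cl, "scale_factor")
result <- parSapply(cl, 1:5, g)
stopCluster(cl)

data.frame(result)
```

Note that this only gives the workers a copy to read from. Changes a worker makes to its copy never travel back to the parent, which is why returning values is still the better approach here.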
Instead of modifying the data.frame, have the function return temp.df and then use dplyr::bind_rows or the like to combine the returned list of data frames into one single data frame (or however you want the solution to look).
sliding_window <- function(sequence, time_row, Query, window_width) {
sliding_window_1 <- Query[time_row <= (time_row[sequence] + window_width * 60 + 1 * 60) &
time_row > time_row[sequence], ]
if (nrow(sliding_window_1) >= 1) {
temp.df <- data.frame(Start.time = sliding_window_1$TIME[1],
finish.time = sliding_window_1$TIME[nrow(sliding_window_1)],
max.value = max(sliding_window_1$C19.X.AAA.01, na.rm = TRUE),
min.value = min(sliding_window_1$C19.X.AAA.01, na.rm = TRUE))
}
else {
temp.df <- data.frame(Start.time=as.POSIXct(character()), finish.time = as.POSIXct(character()), max.value = double(), min.value = double(), stringsAsFactors = FALSE)
}
temp.df
}
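Putting the pieces together might look roughly like the sketch below. It makes several assumptions: the WBP data frame here is synthetic (one reading per minute for two hours) and stands in for the real data, the cluster size of 2 is arbitrary, and the function is a condensed variant of the one above that returns NULL for an empty window. It uses parLapply rather than parSapply so that the results stay a plain list of data frames, and base do.call(rbind, ...) in place of dplyr::bind_rows to avoid an extra dependency.

```r
library(parallel)

# Synthetic stand-in for WBP: one reading per minute for two hours.
WBP <- data.frame(
  TIME = as.POSIXct("2017-01-16 18:00:00", tz = "UTC") + 60 * (0:119),
  C19.X.AAA.01 = seq(216, 220, length.out = 120)
)

# Condensed variant of the function above: it returns a one-row
# data frame (or NULL for an empty window) and touches no globals.
sliding_window <- function(sequence, time_row, Query, window_width) {
  in_window <- time_row > time_row[sequence] &
    time_row <= time_row[sequence] + window_width * 60 + 1 * 60
  w <- Query[in_window, ]
  if (nrow(w) == 0) return(NULL)
  data.frame(Start.time = w$TIME[1],
             finish.time = w$TIME[nrow(w)],
             max.value = max(w$C19.X.AAA.01, na.rm = TRUE),
             min.value = min(w$C19.X.AAA.01, na.rm = TRUE))
}

cl <- makeCluster(2)
# Extra named arguments are forwarded to sliding_window on each worker.
pieces <- parLapply(cl, seq_len(nrow(WBP) - 30), sliding_window,
                    time_row = WBP$TIME, Query = WBP, window_width = 30)
stopCluster(cl)

# Drop empty windows, then stack the one-row data frames.
pieces <- Filter(Negate(is.null), pieces)
result_df_2 <- do.call(rbind, pieces)
head(result_df_2)
```

Because Query and time_row are passed as arguments, they are shipped to the workers with the call, so no separate export step is needed. For a large data frame that transfer has a cost, so it is worth timing this against the plain lapply version before assuming it is faster.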
Ok, go for it. Good luck and godspeed.
Upvotes: 1