Reputation: 411
I have a data set with ~4 million rows that I need to loop over. The structure is that IDs are repeated, and rows within the same ID depend on each other, but the data is independent across IDs. For each ID, row [i+1] depends on row [i]. Here is a reproducible example. I realize this example is not practical in terms of the inner functions, but it simply demonstrates the structure I have.
set.seed(123)
id1 = rep(1,5)
id2 = rep(2,5)
id3 = rep(3,5)
ids = c(id1,id2,id3)
month = rep(seq(1,5),3)
x = round(rnorm(15,2,5))
y = rep(0,15)
df = as.data.frame(cbind(ids,month,x,y))
for (i in 1:nrow(df)){
  if(i>1 && df[i,1]==df[i-1,1]){
    #Main functions go here
    df[i,4] = df[i-1,4]^2+df[i,3]
  } else {
    df[i,4] = 1
  }
}
The issue is that, in reality, 1,000 iterations of the real function take ~90 seconds, so 4 million rows would take days. Running it this way isn't feasible for me. However, the IDs are independent and don't need to run together. My question is: is there a way to run this type of loop in parallel? A very non-elegant solution would be to split the file into 50 sections without splitting an ID and simply run the same code on the 50 sub-files (see the sketch below). I figure there should be a way to code this, though.
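For reference, here is a rough sketch of that split-by-ID idea without writing out 50 files, using parallel::parLapply() over split(df, df$ids). run_id() is a hypothetical stand-in for the real per-ID functions; its body is just the toy recursion above:

library(parallel)

# hypothetical per-ID worker: takes one ID's rows and fills y sequentially
run_id <- function(d) {
  d$y[1] <- 1
  for (i in seq_len(nrow(d))[-1]) {
    d$y[i] <- d$y[i - 1]^2 + d$x[i]
  }
  d
}

cl <- makeCluster(detectCores() - 1)
res <- parLapply(cl, split(df, df$ids), run_id)   # one chunk per ID
stopCluster(cl)
df_done <- do.call(rbind, res)                    # reassemble the full data frame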
EDIT: Added a month column to show why the rows are dependent on each other. To address two comments below:
1) There are actually 6-7 lines of functions to run. Could I use ifelse() with multiple functions?
2) The desired output would be the full data frame. In reality there are more columns but I need each row in a data frame.
ids month x y
1 1 1 -1 1
2 1 2 1 2
3 1 3 10 14
4 1 4 2 198
5 1 5 3 39207
6 2 1 11 1
7 2 2 4 5
8 2 3 -4 21
9 2 4 -1 440
10 2 5 0 193600
11 3 1 8 1
12 3 2 4 5
13 3 3 4 29
14 3 4 3 844
15 3 5 -1 712335
EDIT2: I've tried applying the foreach() package from another post, but it doesn't seem to work. The code below runs, but I think the issue is the way rows are distributed among cores. If each row is sent sequentially to a different core, then the same ID will never end up on the same core.
library(foreach)
library(doParallel)

set.seed(123)
id1 = rep(1,5)
id2 = rep(2,5)
id3 = rep(3,5)
ids = c(id1,id2,id3)
month = rep(seq(1,5),3)
x = round(rnorm(15,2,5))
y = rep(0,15)
df = as.data.frame(cbind(ids,month,x,y))

#setup parallel backend to use many processors
cores = detectCores()
cl <- makeCluster(cores[1]-1) #not to overload your computer
registerDoParallel(cl)

finalMatrix <- foreach(i=1:nrow(df), .combine=cbind) %dopar% {
  for (i in 1:nrow(df)){
    if(i>1 && df[i,1]==df[i-1,1]){
      #Main functions go here
      df[i,4] = df[i-1,4]^2+df[i,3]
    } else {
      df[i,4] = 1
    }
  }
}

#stop cluster
stopCluster(cl)
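For completeness, here is a rough sketch of what I think the fix would need to look like: iterating over whole IDs instead of individual rows, so each ID's rows stay on one worker (untested beyond the toy example; the loop body is just the toy recursion above):

library(foreach)
library(doParallel)

cl <- makeCluster(detectCores() - 1)
registerDoParallel(cl)

# one list element per ID, so a whole ID is shipped to a worker at once
result <- foreach(d = split(df, df$ids), .combine = rbind) %dopar% {
  d$y[1] <- 1
  for (i in seq_len(nrow(d))[-1]) {
    d$y[i] <- d$y[i - 1]^2 + d$x[i]
  }
  d
}

stopCluster(cl)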
Upvotes: 3
Views: 803
Reputation: 4242
melt/dcast from data.table

As discussed in the comments above, this solution is very specific to the use case in the example, but it might be applicable to your use case.

Using matrix operations and the dcast.data.table and melt.data.table functions from the data.table package to make fast transitions from a long to a wide format and back is pretty efficient. All things considered, the bigger constraint with these methods will likely be how much RAM you have available rather than processing time.
library(data.table)
set.seed(123)
id1 = rep(1,5)
id2 = rep(2,5)
id3 = rep(3,5)
ids = c(id1,id2,id3)
month = rep(seq(1,5),3)
x = round(rnorm(15,2,5))
# y = rep(0,15) ## no need to pre-define y with this method
df = as.data.frame(cbind(ids,month,x))
setDT(df) ## Convert to data.table by reference
wide <- dcast.data.table(df, month ~ ids, value.var = "x") ## pivot to 'wide' format
mat <- data.matrix(wide[,-c("month")]) ## Convert to matrix
print(mat)
gives
1 2 3
[1,] -1 11 8
[2,] 1 4 4
[3,] 10 -4 4
[4,] 2 -1 3
[5,] 3 0 -1
Then operating on it as a matrix:
mat[1,] <- 1 ## fill the first row with 1's as in your example
for (i in 2:nrow(mat)){
  mat[i,] = mat[i-1L,]^2 + mat[i,]
}
print(mat)
gives
1 2 3
[1,] 1 1 1
[2,] 2 5 5
[3,] 14 21 29
[4,] 198 440 844
[5,] 39207 193600 712335
Next, melt back to a long format and then join back to the original data on the key columns ids and month:
yresult <- as.data.table(mat) ## convert back to data.table format
yresult[,month := wide[,month]] ## Add back the month column
ylong <- melt.data.table(yresult,
                         id.vars = "month",
                         variable.factor = FALSE,
                         variable.name = "ids",
                         value.name = "y") ## Pivot back to 'long' format
ylong[,ids := as.numeric(ids)] ## reclass ids to match input ids
setkey(ylong, ids, month) ## set keys for join on 'ids' and 'month'
setkey(df, ids,month)
merge(df,ylong) ## join data.table with the result
yields the final result:
ids month x y
1: 1 1 -1 1
2: 1 2 1 2
3: 1 3 10 14
4: 1 4 2 198
5: 1 5 3 39207
6: 2 1 11 1
7: 2 2 4 5
8: 2 3 -4 21
9: 2 4 -1 440
10: 2 5 0 193600
11: 3 1 8 1
12: 3 2 4 5
13: 3 3 4 29
14: 3 4 3 844
15: 3 5 -1 712335
To test and illustrate scaling, the function testData below generates a data set by cross-joining a given number of ids and a given number of months. Then the function testFunc performs the recursive row-wise matrix operations.
testData <- function(id_count, month_count) {
  id_vector <- as.numeric(seq_len(id_count))
  months_vector <- seq_len(month_count)
  df <- CJ(ids = id_vector, month = months_vector)
  df[, x := rnorm(.N,0,0.1)]
  return(df)
}

testFunc <- function(df) {
  wide <- dcast.data.table(df, month ~ ids, value.var = "x")
  mat <- data.matrix(wide[,-c("month")])
  mat[1,] <- 1
  for (i in 2:nrow(mat)){
    mat[i,] = mat[i-1L,]^2 + mat[i,]
  }
  yresult <- as.data.table(mat)
  yresult[, month := wide[,month]]
  ylong <- melt.data.table(yresult,
                           id.vars = "month",
                           variable.factor = FALSE,
                           variable.name = "ids",
                           value.name = "y")
  ylong[, ids := as.numeric(ids)]
  setkey(ylong, ids, month)
  setkey(df, ids, month)
  merge(df, ylong)
}
Testing with 90,000 ids and 45 months:

foo <- testData(90000,45)
system.time({
  testFunc(foo)
})

   user  system elapsed 
  8.186   0.013   8.201 

Run-time comes in under 10 seconds with a single thread.
Scaling up to 100,000 ids and 1,000 months: this three-column input data.table is ~1.9GB.

foo <- testData(1e5,1e3)
system.time({
  testFunc(foo)
})

   user  system elapsed 
  52.790   4.046  57.031 
A single-threaded run-time of less than a minute seems pretty manageable, depending on how many times this needs to be run. As always, this could be sped up further by improvements to my code or by converting the recursive portion to C++ using Rcpp, but avoiding the mental overhead of learning C++ and switching between languages in your workflow is always nice!
Upvotes: 1
Reputation: 11728
So, simply recode your loop with Rcpp (put this in a .cpp file and compile it with Rcpp::sourceCpp()):
#include <Rcpp.h>
using namespace Rcpp;

// [[Rcpp::export]]
NumericVector fill_y(const NumericVector& x) {
  int n = x.length();
  NumericVector y(n); y[0] = 1;
  for (int i = 1; i < n; i++) {
    y[i] = pow(y[i - 1], 2) + x[i];
  }
  return y;
}
And, to apply it on each group, use dplyr:

library(dplyr)

df %>%
  group_by(ids) %>%
  mutate(y2 = fill_y(x))
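If you are already using data.table (as in the other answer), the same grouped call can also be done by reference; a minimal sketch, assuming fill_y has been compiled as above:

library(data.table)
setDT(df)                           # convert to data.table in place
df[, y2 := fill_y(x), by = ids]     # run the compiled recursion once per ID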
I think this should be fast enough so that you don't need parallelism.
Actually, I ran it on @Val's testdat and it took only 2 seconds (on an old computer).
Tell me if it's okay. Otherwise, I'll make a parallel version.
Upvotes: 3
Reputation: 7023
Here's a solution using foreach. It's hard to say how it would work in your real-life example, but at least it works with the test data ...
First I generate some testdata:
# function to generate testdata
genDat <- function(id){
  # observations per id, fixed or random
  n <- 50
  #n <- round(runif(1,5,1000))
  return(
    data.frame(id=id, month=rep(1:12,ceiling(n/12))[1:n], x=round(rnorm(n,2,5)), y=rep(0,n))
  )
}

# generate testdata
testdat <- do.call(rbind, lapply(1:90000, genDat))
> head(testdat)
id month x y
1 1 1 7 0
2 1 2 6 0
3 1 3 -9 0
4 1 4 3 0
5 1 5 -9 0
6 1 6 8 0
> str(testdat)
'data.frame': 4500000 obs. of 4 variables:
$ id : int 1 1 1 1 1 1 1 1 1 1 ...
$ month: int 1 2 3 4 5 6 7 8 9 10 ...
$ x : num 7 6 -9 3 -9 8 -4 13 0 5 ...
$ y : num 0 0 0 0 0 0 0 0 0 0 ...
So the testdata has ~ 4.5 million rows with 90k unique ids.
Now, since your calculations are independent between the IDs, the idea would be to ship off the data with unique IDs to each core ... this would ultimately also get rid of the necessity for an if or ifelse condition.
To do this, I first generate a matrix with start and stop row indices, to split the dataset by unique ID:
id_len <- rle(testdat$id)
ixmat <- cbind(c(1,head(cumsum(id_len$lengths)+1,-1)),cumsum(id_len$lengths))
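As a quick illustration (not part of the benchmark), running the same two lines on the question's 15-row df gives one start/stop row pair per ID:

id_len <- rle(df$ids)
cbind(c(1,head(cumsum(id_len$lengths)+1,-1)),cumsum(id_len$lengths))
#      [,1] [,2]
# [1,]    1    5
# [2,]    6   10
# [3,]   11   15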
This matrix can then be passed to foreach to run the specific parts in parallel. In this example I modify your calculations slightly to avoid astronomical values leading to Inf.
library(parallel)
library(doParallel)
library(iterators)

cl <- makeCluster(parallel::detectCores())
registerDoParallel(cl) #create a cluster

r <- foreach (i = iter(ixmat, by='row')) %dopar% {
  x <- testdat$x[i[1,1]:i[1,2]]
  y <- testdat$y[i[1,1]:i[1,2]]
  y[1] <- 1
  for(j in 2:length(y)){
    #y[j] <- (y[j-1]^2) + x[j] ##gets INF
    y[j] <- y[j-1] + x[j]
  }
  return(y)
}

parallel::stopCluster(cl)
Finally you could replace the values in the original dataframe:
testdat$y <- unlist(r)
As for the time, the foreach loop runs in about 40 seconds on my 8 core machine.
Upvotes: 1