d_rapa
d_rapa

Reputation: 13

Parallelization with data.table

I have the following problem. I have a piece-wise linear function described by (xPoints, yPoints) and want to compute fast--I have to do it over and over again--the implied y-value for a long list of x's, where x could fall outside the range of xPoints. I have coded a function f_pwl that computes the implied y-value, but it is slow, so I was trying to parallelize its call. But it is actually slower than using data.table := syntax. I will appreciate suggestions to speed things up either by improving my f_pwl function, or by implementing an efficient parallelization, as I have access to 20 cores to speed things up.

Here is a sample code.

    # libraries
    require(data.table) # for fread, work with large data
    require(abind)      # for abind()
    require(foreach) # for parallel processing, used with doParallel
    require(doParallel) # for parallel processing, used with foreach

    f_pwl <- function(x)  {
      temp <- as.vector( rep(NA, length = length(x)), mode = "double" )
      for (i in seq(from = 1, to = length(x), by = 1))  {
        if (x[i] > max(xPoints) | x[i] < min(xPoints)) {
          # nothing to do, temp[i] <- NA
        } else if (x[i] == max(xPoints)) {
          # value equal max(yPoints)
          temp[i] <- max(yPoints)
        } else {
          # value is f_pwl(x)
          xIndexVector = as.logical( x[i] >= xPoints  &  abind(xPoints[2:length(xPoints)], max(xPoints)) > x[i] )
          xIndexVector_plus1 = shift( xIndexVector, n = 1, fill = FALSE, type = "lag" )
          alpha_j = (xPoints[xIndexVector_plus1] - x[i])/(xPoints[xIndexVector_plus1] - xPoints[xIndexVector])
          temp[i] <- alpha_j %*% yPoints[xIndexVector] + (1-alpha_j) %*% yPoints[xIndexVector_plus1]
        }
      } # end for i
      as.vector( temp, mode = "double" )
    }


    ## Main program
    xPoints <- c(4, 9, 12, 15, 18, 21)
    yPoints <- c(1, 2, 3, 4, 5, 6)

    x <- rnorm(1e4, mean = 12, sd = 5)

    dt <- as.data.table( x )
    dt[ , c("y1", "y2", "y3") := as.vector( mode = "double", NA ) ]

    # data.table := command
    system.time({
      dt[, y2 := f_pwl( x ) ]
    })

    # mapply
    system.time({
      dt[ , y1 := mapply( f_pwl, x ), by=.I  ]
    })

    # parallel
    system.time({
      #setup parallel backend to use many processors
      cores=detectCores()
      cl <- makeCluster(cores[1]-1, type="FORK") #not to overload your computer
      registerDoParallel(cl)
      dt$y3 <- foreach(i=1:nrow(dt), .combine=cbind) %dopar% {
        tempY <- f_pwl( dt$x[i] )
        tempY
      }
      #stop cluster
      stopCluster(cl)
    })

    summary( dt[ , .(y1-y2, y1-y3, y2-y3)] )   

Upvotes: 1

Views: 1454

Answers (1)

chinsoon12
chinsoon12

Reputation: 25225

First, calculate and store the alpha_j's.

Then, sort DT by x first and cut it into the relevant intervals before performing your linear interpolation

alpha <- c(NA, diff(yPoints) / diff(xPoints))

DT[order(x), 
    y := alpha[.GRP] * (x - xPoints[.GRP-1L]) + yPoints[.GRP-1L], 
    by=cut(x, xPoints)]

Please let me know how it performs.

data:

library(data.table)

## Main program
set.seed(27L)
xPoints <- c(4, 9, 12, 15, 18, 21)
yPoints <- c(1, 2, 3, 4, 5, 6)
DT <- data.table(x=rnorm(1e4, mean=12, sd=5))

check:

f_pwl <- function(x)  {
    temp <- as.vector( rep(NA, length = length(x)), mode = "double" )
    for (i in seq(from = 1, to = length(x), by = 1))  {
        if (x[i] > max(xPoints) | x[i] < min(xPoints)) {
            # nothing to do, temp[i] <- NA
        } else if (x[i] == max(xPoints)) {
            # value equal max(yPoints)
            temp[i] <- max(yPoints)
        } else {
            # value is f_pwl(x)
            xIndexVector = as.logical( x[i] >= xPoints  &  abind(xPoints[2:length(xPoints)], max(xPoints)) > x[i] )
            xIndexVector_plus1 = shift( xIndexVector, n = 1, fill = FALSE, type = "lag" )
            alpha_j = (xPoints[xIndexVector_plus1] - x[i])/(xPoints[xIndexVector_plus1] - xPoints[xIndexVector])
            temp[i] <- alpha_j %*% yPoints[xIndexVector] + (1-alpha_j) %*% yPoints[xIndexVector_plus1]
        }
    } # end for i
    as.vector( temp, mode = "double" )
}
system.time({
    DT[, yOP := f_pwl( x ) ]
})

DT[abs(y-yOP) > 1e-6]
#Empty data.table (0 rows) of 3 cols: x,y,yOP

Upvotes: 2

Related Questions