Reputation: 29
I'm doing an SQL-style join of two large, fixed-size (lat, long) coordinate datasets by nearest-neighbor search. Currently I'm using dplyr and data.table to do this. How can I optimize and parallelize my code for absolute runtime?
Previous attempts include native Python, pandas, and multiprocessing, all of which ended up being very slow. My current solution, which uses data.table to construct a table of nearest neighbors and dplyr to join based on that table, is the quickest but is still too slow.
library(dplyr)
library(data.table)
library(geosphere)
source <- data.table(lat = runif(1e3), long = runif(1e3)) %>% mutate(nrow = row_number())
dest <- data.table(lat = runif(5e4), long = runif(5e4)) %>% mutate(ind = row_number())
dest_mat <- as.matrix(dest[, c('long', 'lat')])
setDT(source)
# function that returns the index of the closest matching point in dest
mindist_ind <- function(source_lat, source_long) { return(which.min(distHaversine(c(source_long, source_lat), dest_mat))) }
nn_inds <- source[, j = list(ind = mindist_ind(lat, long)), by = 1:nrow(source)] # slowest line, gets index of nearest match in dest
nn_matches <- inner_join(nn_inds, dest, by = 'ind') # join final back to dest in order to get all nearest matches
sourcedest_matches <- inner_join(source, nn_matches, by = 'nrow') # join nearest matches to source by index
The full source file is ~89 million rows, and dest is roughly ~50k rows.
While this is the quickest approach I've managed, timings at smaller source sizes suggest the full 89-million-row source file would take an estimated 17-18 days to run, which is far too long. I'm running this on an r4.16xlarge AWS EC2 instance with 488 GB RAM, 32 cores, and 64 vCPUs. How can I optimize/parallelize this code to run faster?
Upvotes: 1
Views: 228
Reputation: 5898
I'm assuming the code you've provided in your question isn't actually what you want. Your code calculates the distance between pairwise rows of source and dest, recycling source to match the length of dest. What you probably want, and what this answer provides, is to find the nearest point in dest for every point in source (see my comments on your question).
Calculating distance matrices is computationally intensive. Assuming R packages are approximately equally efficient at calculating distance matrices, really the only way to speed this up is to parallelize over the distance matrix calculation. It's unfortunate that the matrix with more rows is the reference points, because parallelization can only occur over subsets of the source points (i.e., you need to consider all dest points to find the nearest dest point to any given source point).
library(parallel)
library(sp)
# non-parallel version
x2 <- copy(source)
# full distance matrix: one row per source point, one column per dest point
temp <- spDists(as.matrix(x2[, .(long, lat)]), dest_mat, longlat = TRUE)
# for each source row, attach the coordinates of its nearest dest point
system.time(
  final2 <- x2[, c("long_dest", "lat_dest") := as.data.table(dest_mat[apply(temp, 1, which.min), , drop = FALSE])]
)
# parallel version: split source into chunks and process each chunk on a separate worker
source_l <- split(source, rep(1:10, each = 100))  # 10 chunks of 100 rows for this toy example
cl <- makeCluster(getOption("cl.cores", 4))
clusterExport(cl, "dest_mat")  # this is slow but I don't think there's a way to get around it
system.time(
  v <- parLapply(cl = cl, X = source_l, fun = function(x) {
    library(sp)
    library(data.table)
    temp <- spDists(as.matrix(x[, .(long, lat)]), dest_mat, longlat = TRUE)
    x[, c("long_dest", "lat_dest") := as.data.table(dest_mat[apply(temp, 1, which.min), , drop = FALSE])]
    x
  })
)
stopCluster(cl)
final <- rbindlist(v)  # combine the chunk results back into one table
identical(final[order(nrow)], final2)  # sanity check against the non-parallel result
You'll need to play around with whether using more than 32 processes actually speeds things up; hyperthreading can be a mixed bag, and it's not always easy to predict whether it will help. Unfortunately, there's no guarantee you'll have enough RAM to run the optimal number of processes: this approach is not only slow but also memory-intensive, since each worker holds its own copy of dest_mat plus the distance matrix for its chunk. If you get errors indicating that you've run out of memory, you'll need to decrease the number of processes or rent an EC2 machine with more memory.
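As a rough sketch of how you might size the split for your real source table (n_workers here is just a placeholder for whatever process count you settle on):
n_workers <- 32  # tune this; benchmark 32 vs 64 and back off if memory becomes an issue
chunk_id  <- cut(seq_len(nrow(source)), breaks = n_workers, labels = FALSE)  # contiguous, roughly equal chunks
source_l  <- split(source, chunk_id)  # one chunk per worker instead of the hard-coded 10 x 100 split
cl <- makeCluster(n_workers)
# then clusterExport(cl, "dest_mat") and parLapply over source_l as above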
Finally, I'll note that which.min returns the index of the first minimum if there are ties, so in the case of tied distances the result will depend on the order of rows in dest_mat.
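For example:
which.min(c(5, 1, 1, 3))  # returns 2, the index of the first of the tied minima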
Upvotes: 1