LuminosityXVII

Reputation: 133

Efficiently filter large dataset by time and location window repeatedly, relative to each point

I have a data set consisting of about 55 million records, split across many CSVs. Each record contains a datetime, a latitude, and a longitude, representing a lightning event and the location at which it occurred. I need to add a field, wherein for each event I need to list the time in seconds since the last event that occurred within 5 nautical miles of the current one. The code I've written below works, technically, but I had to stop it after it spent more than an hour on the first CSV.

Though I wrote this in R, I am willing to make use of any language or tool that can accomplish my objective without requiring days or weeks to process the data set. I've attempted SQLite as well, but so far on that front I've only frustrated myself in trying to get it to do the math at all.

Also note that during this search some entries will return NA because there is no previous event that meets the criteria. This is fine; I want NA in that case.

A sample of the CSV data follows the code, and I've made a file containing one day's data available for testing at: https://www.dropbox.com/s/i02jl6bswq0zlel/2019.05.02.TL.csv?dl=1

(I'm using taxicab distances for now in the code below, and the 0.08335 and 0.0965 values approximate 5 nmi in lat/long values based on the average latitude of all recorded events. If there's a method that will do Pythagorean distances quickly then that would be wonderful.)
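For illustration, a fast Pythagorean check is possible with an equirectangular approximation: one degree of latitude is about 60 nmi, and a degree of longitude shrinks with the cosine of the latitude. The helper below is only a sketch, with dist_nmi as an illustrative name rather than something in my code:

#Sketch only: planar (Pythagorean) distance in nautical miles via an
#equirectangular approximation (1 degree of latitude ~ 60 nmi; longitude
#degrees scaled by the cosine of the mean latitude)
dist_nmi <- function(lat1, lon1, lat2, lon2) {
  dlat <- (lat2 - lat1) * 60
  dlon <- (lon2 - lon1) * 60 * cos((lat1 + lat2) / 2 * pi / 180)
  sqrt(dlat^2 + dlon^2)
}
#e.g. keep only events where dist_nmi(...) <= 5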

Code:

library(dplyr)

#Get list of all relevant CSVs
fList = list.files(pattern = "*.TL.csv")

#Reads in the first CSV; haven't implemented looping through files yet 
t1 = read.csv(as.character(fList[1]), colClasses=c(DateTime="character"))
t1 = t1[order(t1$DateTime),]  #Sort by DateTime
rownames(t1) = NULL  #Reset index

#Used for filtering the current row out of the search
index = c(1:nrow(t1))

#Create empty column
t1$TimeGap = NA

for (i in 2:nrow(t1)) {
  temp = max(filter(t1, (DateTime <= t1$DateTime[i]) &
                        (Latitude >= t1$Latitude[i]-0.08335) &
                        (Latitude <= t1$Latitude[i]+0.08335) &
                        (Longitude >= t1$Longitude[i]-0.0965) &
                        (Longitude <= t1$Longitude[i]+0.0965) &
                        (index != i))$DateTime)
  t1$TimeGap[i] = as.numeric(difftime(as.POSIXct(t1$DateTime[i],format="%Y-%m-%d %H:%M:%OS"),
                                      as.POSIXct(temp,format="%Y-%m-%d %H:%M:%OS"),
                                      units="secs"))
}

CSV data sample:

"DateTime","Latitude","Longitude"
"2019-05-02 14:43:37.833",26.9517,-81.4851
"2019-05-02 14:43:37.857",26.9674,-81.4758
"2019-05-02 14:43:37.988",26.9802,-81.4698
"2019-05-02 14:45:41.512",27.0024,-81.4612
"2019-05-02 15:22:59.614",27.295,-81.1728
"2019-05-02 15:24:06.284",27.3444,-81.1213
"2019-05-02 15:24:51.607",27.3306,-81.146
"2019-05-02 15:26:52.130",27.5441,-81.1099
"2019-05-02 15:26:52.131",27.3214,-81.1758
"2019-05-02 15:26:52.131",27.3326,-81.1614
"2019-05-02 15:26:52.134",27.5396,-81.0952
"2019-05-02 15:26:52.134",27.5377,-81.1069
"2019-05-02 15:26:52.156",27.517,-81.1147
"2019-05-02 15:26:52.167",27.5377,-81.0962
"2019-05-02 15:28:59.356",27.5156,-81.1152
"2019-05-02 15:28:59.357",27.519,-81.1092
"2019-05-02 15:28:59.359",27.406,-81.174
"2019-05-02 15:28:59.362",27.4081,-81.1489
"2019-05-02 15:28:59.362",27.508,-81.1472
"2019-05-02 15:28:59.364",27.5183,-81.1497
"2019-05-02 15:28:59.417",27.5338,-81.1712
"2019-05-02 15:31:39.021",27.4052,-81.1956
"2019-05-02 15:31:39.027",27.4381,-81.1837
"2019-05-02 15:31:39.027",27.5141,-81.159
"2019-05-02 15:31:39.027",27.417,-81.1631
"2019-05-02 15:31:39.027",27.5439,-81.1326
"2019-05-02 15:31:39.048",27.4809,-81.1691
"2019-05-02 15:31:39.048",27.4666,-81.1561
"2019-05-02 15:31:39.048",27.4666,-81.1561
"2019-05-02 15:31:39.048",27.401,-81.1679

Upvotes: 0

Views: 112

Answers (1)

Oliver

Reputation: 8572

This sounds like a situation where the (relatively new) package disk.frame will excel. It is similar to data.table, but designed with very large amounts of data in mind. Without having used it myself, but drawing on my data.table knowledge, and assuming all your data is in file.path(getwd(), 'data'), you could likely achieve your goal with something like:

library(disk.frame)
# Setup from the "Ingesting Data" vignette
if(interactive()) { 
  setup_disk.frame()
  # Highly recommended; it is put inside interactive() for CRAN because
  # changing user options is not allowed on CRAN
  options(future.globals.maxSize = Inf)  
} else {
  setup_disk.frame(2)
}
# Import csv data from the shared folder
disk.f <- csv_to_disk.frame(list.files(file.path(getwd(), 'data'), full.names = TRUE))

Once imported, we should be able to do the calculations using standard data.table syntax:

disk.f[, `:=`(LatitudeMax = Latitude + 0.08335,
              LatitudeMin = Latitude - 0.08335,
              LongitudeMax = Longitude + 0.0965, 
              LongitudeMin = Longitude - 0.0965,
              rowid = seq_len(.N),
              DateTime = as.POSIXct(DateTime, format = "%Y-%m-%d %H:%M:%OS"))]
# Perform self join. I hope I got the direction right here.
calc.disk.f <- 
  disk.f[disk.f[, .(Latitude.i = Latitude, 
                    Longitude.i = Longitude, 
                    DateTime.i = DateTime, 
                    rowid.i = rowid)], 
         on = .(DateTime >= DateTime.i,
                LongitudeMax >= Longitude.i,
                LongitudeMin <= Longitude.i, 
                LatitudeMax >= Latitude.i,
                LatitudeMin <= Latitude.i,
                rowid != rowid.i), 
         allow.cartesian = TRUE][, .(TimeGap = difftime(DateTime, 
                                                        max(DateTime.i), 
                                                        units = 'secs')), 
                                 by = rowid]
# Merge result into original frame.
disk.f <- calc.disk.f[disk.f, on = 'rowid']
# Clean up calc.disk.f
delete(calc.disk.f)

# Save result, or convert to a regular data.table/data.frame/tibble
## Not sure whether this writes csv, fst, or some other format.
write_disk.frame(disk.f, outdir = file.path(getwd(), 'data', 'result.df')) 

Disclaimer!

I have not been able to test the above code, so it is likely riddled with typos and possibly a wrong-sided join (should my aggregation group by rowid or rowid.i? I may have turned it one way or the other), and the self-join itself has not been tested in disk.frame (creating a copy of the disk.frame may be necessary). I might create a more reproducible example, but more likely this can serve as a starting point and the ground for a proper solution.

Note that disk.frame also has a dplyr-like interface, which could be used in case the data.table self-join syntax doesn't work out; a rough sketch follows.
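As an untested sketch of that interface (assuming the chunk-wise dplyr verbs and collect() behave as documented), something of this shape would apply per-chunk operations lazily and then materialize the result:

library(disk.frame)
library(dplyr)
# dplyr verbs on a disk.frame run lazily, one chunk at a time; collect()
# binds the processed chunks into an ordinary in-memory data frame.
disk.f %>%
  mutate(DateTime = as.POSIXct(DateTime, format = "%Y-%m-%d %H:%M:%OS")) %>%
  filter(DateTime >= as.POSIXct("2019-05-02 15:00:00")) %>%
  collect()

A per-row self-join would still need extra care here, since each verb only sees one chunk at a time.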

Update 1:

It seems that disk.frame's data.table syntax is only a partial wrapper around the data.table universe, so the join I suggested above does not work as expected. Using foverlaps is currently my second option for a fast subsetting method.
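For reference, a rough and untested sketch of the foverlaps idea on a single CSV with plain data.table (column names follow the question's sample; the window values and the post-filtering are my assumptions):

library(data.table)

# Treat each event's +/- 5 nmi latitude band as a range interval and each
# event itself as a zero-width "point" interval, then overlap-join the two.
dt <- fread("2019.05.02.TL.csv")
dt[, `:=`(rowid = .I,
          DateTime = as.POSIXct(DateTime, format = "%Y-%m-%d %H:%M:%OS"))]

win <- dt[, .(rowid, DateTime, Longitude,
              LatStart = Latitude - 0.08335, LatEnd = Latitude + 0.08335)]
pts <- dt[, .(rowid.i = rowid, DateTime.i = DateTime, Longitude.i = Longitude,
              LatStart = Latitude, LatEnd = Latitude)]
setkey(pts, LatStart, LatEnd)

# All pairs overlapping in latitude; longitude and time are filtered afterwards
cand <- foverlaps(win, pts, type = "any", nomatch = NULL)
gaps <- cand[rowid != rowid.i &
               DateTime.i <= DateTime &
               abs(Longitude.i - Longitude) <= 0.0965,
             .(TimeGap = as.numeric(difftime(DateTime[1], max(DateTime.i),
                                             units = "secs"))),
             by = rowid]
# Events with no qualifying neighbour are simply absent from gaps, so joining
# back onto dt leaves their TimeGap as NA, as the question wants.
result <- gaps[dt, on = "rowid"]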

Upvotes: 1
