Reputation: 347
I know how to open a connection and read chunks of data with read.table [EDIT: fread does not allow connections], delete some rows, and collect the resulting data sequentially in a list. But is there any other way to optimize this, so that chunks can be read with fread and processed simultaneously?
I am using Windows.
So far, from what I have gathered online, I could split the large csv file into multiple smaller csv files using Cygwin's split and then use parLapply to fread all of them, roughly as in the sketch below.
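Roughly what I have in mind (file names and the split size are placeholders, and the header line in the first piece would still need handling):
# beforehand, from Cygwin:  split -l 1000000 big.csv chunk_
library(data.table)
library(parallel)
files <- list.files(pattern = "^chunk_") # the pieces produced by split
cl <- makeCluster(detectCores() - 1)
clusterEvalQ(cl, library(data.table))
chunks <- parLapply(cl, files, function(f) fread(f, header = FALSE))
stopCluster(cl)
DT <- rbindlist(chunks)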
Would you guys have a better idea?
Upvotes: 2
Views: 1613
Reputation: 4340
I like your solution and your timing tests, but I wish I understood the problem more clearly. Is the problem that you don't have enough memory to fread the entire file, or that you want to read and process the data faster by parallelizing?
If the issue is filesize > memory, but it would be possible to fit just the rows and columns that you want in memory, then I recommend using awk to make a smaller csv with only those rows and columns, and then fread that. awk processes the file row by row, so memory won't be an issue. Here is example awk code that skips rows with an empty first field and writes columns 1, 2, and 4 to smaller.csv.
awk -F',' 'BEGIN{OFS=","}{if($1!="")print $1,$2,$4}' big.csv > smaller.csv
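If you'd rather not write an intermediate file, fread can also read from a shell command via its cmd argument (assuming awk is on your PATH and your shell accepts the single-quoted program; under plain Windows cmd.exe the quoting needs adjusting):
library(data.table)
DT <- fread(cmd = "awk -F',' 'BEGIN{OFS=\",\"}{if($1!=\"\")print $1,$2,$4}' big.csv")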
If the issue is speed, my guess is that the fastest option is to fread the entire file once and then parallelize the processing afterwards using, e.g., parLapply or the simpler mclapply.
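A minimal sketch of that pattern with parLapply (mclapply relies on forking, so it runs sequentially on Windows). The per-block summary is only a placeholder for whatever processing you actually need, and big.csv stands in for your file:
library(data.table)
library(parallel)
DT <- fread("big.csv") # one fast read of the whole file
blocks <- split(seq_len(nrow(DT)), cut(seq_len(nrow(DT)), 4, labels = FALSE)) # 4 blocks of row indices
cl <- makeCluster(4)
clusterEvalQ(cl, library(data.table))
clusterExport(cl, "DT") # note: this copies DT to every worker
res <- parLapply(cl, blocks, function(i) {
  DT[i, lapply(.SD, mean), .SDcols = sapply(DT, is.numeric)] # placeholder per-block processing
})
stopCluster(cl)
out <- rbindlist(res)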
Upvotes: 2
Reputation: 347
Here is an attempt to parallelize fread calls over chunks of data. This solution draws heavily on elements from
TryCatch with parLapply (Parallel package) in R
require(data.table)
require(dplyr)
require(parallel)
gc()
#=========================================================================
# generating test data
#=========================================================================
set.seed(1)
m <- matrix(rnorm(1e5),ncol=2)
csv <- data.frame(x=1:1e2,m)
names(csv) <- c(letters[1:3])
head(csv)
write.csv(csv,"test.csv")
#=========================================================================
# defining function to read chunks of data with fread: fread_by_chunks
#=========================================================================
fread_by_chunks <- function(filepath, counter, ChunkSize, ...) {
chunk <- as.character({(counter-1)/ChunkSize}+1)
print(paste0("Working on chunk ", chunk, "..."))
DT <- tryCatch(fread(filepath,
skip=counter,
nrows=ChunkSize,
...),
error=function(e) message(conditionMessage(e)))
# This condition checks that no errors occurred
if(!class(DT)[1]=="data.table"){
DT <- data.table(cbind(chunk=chunk,is.empty="YES"))
# Just in case files are still empty even though no error
} else if(nrow(DT)==0){
DT <- data.table(cbind(chunk=chunk,is.empty="YES"))
# Apply any row filter here using column indexes, e.g. DT[DT[[1]] > 0], since the columns are not named and the automatic names (V1, V2, ...) do not work.
} else {
DT[,chunk := chunk]
DT[,is.empty := "NO"]
}
return(DT)
}
#=========================================================================
# testing fread_by_chunks
#=========================================================================
ChunkSize = 1000
n_rows = 60000 # test.csv has 50e3 data rows; we want to test that the code does not break when asked for more rows than exist.
## Here you have to guess how many rows the dataset contains. Guess a large
## number to make sure all the lines get read: when the guess exceeds the
## actual number of rows, the extra chunks come back with is.empty == "YES",
## and you just delete those rows afterwards. If no such rows appear, you
## cannot be sure you have read all the rows from the csv file.
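## Alternative to guessing (a sketch): count the rows exactly first with a
## cheap single-column read, at the cost of one extra pass over the file:
## n_rows <- nrow(fread("test.csv", select = 1L, header = FALSE))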
counter <- c(0, seq(ChunkSize, n_rows, ChunkSize)) + 1
start_time <- Sys.time()
test <- lapply(counter, function(x) {fread_by_chunks(filepath = "test.csv", counter = x, ChunkSize = ChunkSize, header = F, fill = T, blank.lines.skip=T, select=c(1,2,4))})
Sys.time() - start_time
##Time difference of 0.2528741 secs
# binding chunks
test <- bind_rows(test)
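# data.table alternative to dplyr's bind_rows (fill = TRUE pads the "is.empty" chunks):
# test <- rbindlist(test, fill = TRUE)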
#=========================================================================
# parallelizing fread_by_chunks
#=========================================================================
no_cores <- detectCores() - 1 # 3 cores, 2.8 GHz
cl <- makeCluster(no_cores)
clusterExport(cl, c("ChunkSize", "counter", "fread_by_chunks", "n_rows"))
clusterEvalQ(cl, library(data.table)) # load data.table on every worker
start_time <- Sys.time()
test <- parLapply(cl, counter, function(x) {fread_by_chunks(filepath = "test.csv", counter = x, ChunkSize = 1000, header = F, fill = T, blank.lines.skip=T, select=c(1,2,4))})
Sys.time() - start_time
##Time difference of 0.162251 secs
stopCluster(cl)
test <- bind_rows(test)
# just calling fread once, without chunks. It obviously takes a lot less time, but it only works if all the data fits in memory.
start_time <- Sys.time()
test <- fread("test.csv",
              skip=0,
              header=F,
              fill = T,
              blank.lines.skip=T,
              select=c(1,2,4))
Sys.time() - start_time
#Time difference of 0.006005049 secs
Upvotes: 3