sfinnie

Reputation: 9952

Stream processing large csv file in R

I need to make a couple of relatively simple changes to a very large csv file (c. 8.5GB). I initially tried various reader functions: read.csv, readr::read_csv, data.table::fread. However, they all run out of memory.

I'm thinking I need to use a stream processing approach instead: read a chunk, update it, write it, repeat. I found this answer which is on the right lines; however, I don't know how to terminate the loop (I'm relatively new to R).

So I have 2 questions:

  1. What's the right way to make the while loop work?
  2. Is there a better way (for some definition of 'better')? For example, is there some way to do this using dplyr & pipes?

Current code as follows:

src_fname <- "testdata/model_input.csv"
tgt_fname <- "testdata/model_output.csv"

#Changes needed in file: rebase identifiers, set another col to constant value
rebase_data <- function(data, offset) {
  data$'Unique Member ID' <- data$'Unique Member ID' - offset
  data$'Client Name' <- "TestClient2"
  return(data)
}

CHUNK_SIZE <- 1000
src_conn <- file(src_fname, "r")
data <- read.csv(src_conn, nrows = CHUNK_SIZE, check.names=FALSE)
cols <- colnames(data)
offset <- data$'Unique Member ID'[1] - 1

data <- rebase_data(data, offset)
#1st time through, write the headers
tgt_conn <- file(tgt_fname, "w")
write.csv(data,tgt_conn, row.names=FALSE)

#loop over remaining data
end <- FALSE
while(end == FALSE) {
  data <- read.csv(src_conn, nrows = CHUNK_SIZE, check.names=FALSE, col.names = cols)
  data <- rebase_data(data, offset)
  #write.csv doesn't support col.names=FALSE; so use write.table which does
  write.table(data, tgt_conn, row.names=FALSE, col.names=FALSE, sep=",")
  # ??? How to test for EOF and set end = TRUE if so  ???
  # This doesn't work, presumably because nrow() != CHUNK_SIZE on final loop?
  if (nrow(data) < CHUNK_SIZE) {
    end <- TRUE
  }

}
close(src_conn)
close(tgt_conn)

Thanks for any pointers.

Upvotes: 7

Views: 3289

Answers (3)

Quar

Reputation: 1082

Sorry to poke a 2-year-old thread, but with readr::read_csv_chunked (loaded along with dplyr when loading the tidyverse), we can now also do something like this:

require(tidyverse)

## For non-exploratory code, as @antoine-sac suggested, use:
# require(readr)  # for function `read_csv_chunked` and `read_csv`
# require(dplyr)  # for the pipe `%>%` thus less parentheses

src_fname = "testdata/model_input.csv"
tgt_fname = "testdata/model_output.csv"

CHUNK_SIZE = 1000

offset = read_csv(src_fname, n_max=1)$comm_code %>% as.numeric() - 1 

rebase.chunk = function(df, pos) {
  df$comm_code = df$comm_code %>% as.numeric() - offset
  df$'Client Name' = "TestClient2"
  is.append = pos > 1
  df %>% write_csv(
    tgt_fname,
    append=is.append
  )
}

read_csv_chunked(
  src_fname, 
  callback=SideEffectChunkCallback$new(rebase.chunk), 
  chunk_size = CHUNK_SIZE,
  progress = T    # optional, show progress bar
)

Here the tricky part is setting is.append based on the parameter pos, which gives the starting row number of the data frame df within the original file. In readr::write_csv, when append = FALSE the header (column names) is written to the file; otherwise it is not.
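If it helps to see what pos looks like, here is a minimal, purely illustrative sketch (the file name "small.csv" and the tiny chunk_size are made up) that just prints where each chunk starts:

# Illustrative sketch: the callback receives (df, pos); pos is the row number
# at which the chunk starts, so it equals 1 only for the first chunk, which
# is why is.append = (pos > 1) skips the header on every later chunk.
readr::read_csv_chunked(
  "small.csv",
  callback = readr::SideEffectChunkCallback$new(function(df, pos) {
    message("chunk starts at row ", pos, " -> write header: ", pos == 1)
  }),
  chunk_size = 2
)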

Upvotes: 4

asachet

Reputation: 6921

Try this out:

library("chunked")

read_chunkwise(src_fname, chunk_size = CHUNK_SIZE) %>%
  rebase_data(offset) %>%
  write_chunkwise(tgt_fname)

You may need to fiddle a bit with the colnames to get exactly what you want.

(Disclaimer: haven't tried the code)

Note that there is no vignette with the package, but the standard usage is described on GitHub: https://github.com/edwindj/chunked/
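For comparison, here is a hedged sketch of that documented usage, assuming chunked's dplyr-verb interface (mutate is applied chunk by chunk) and reusing the question's paths, offset and column names; depending on how read_chunkwise normalises the headers, the backticked names may need adjusting:

# Sketch only: applies the question's two changes via mutate, chunk by chunk.
library(chunked)
library(dplyr)

read_chunkwise(src_fname, chunk_size = CHUNK_SIZE) %>%
  mutate(`Unique Member ID` = `Unique Member ID` - offset,
         `Client Name` = "TestClient2") %>%
  write_chunkwise(tgt_fname)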

Upvotes: 2

sfinnie

Reputation: 9952

OK I found a solution, as follows:

src_fname <- "testdata/model_input.csv"
tgt_fname <- "testdata/model_output.csv"

CHUNK_SIZE <- 20000

#Changes needed in file: rebase identifiers, set another col to constant value
rebase_data <- function(data, offset) {
  data$'Unique Member ID' <- data$'Unique Member ID' - offset
  data$'Client Name' <- "TestClient2"
  return(data)
}

#--------------------------------------------------------
# Get the structure first to speed things up
#--------------------------------------------------------
structure <- read.csv(src_fname, nrows = 2, check.names = FALSE)
cols <- colnames(structure)
offset <- structure$'Unique Member ID'[1] - 1

#Open the input & output files for reading & writing
src_conn <- file(src_fname, "r")
tgt_conn <- file(tgt_fname, "w")

lines_read <- 0
end <- FALSE
read_header <- TRUE
write_header <- TRUE
while(end == FALSE) {
  data <- read.csv(src_conn, nrows = CHUNK_SIZE, check.names=FALSE, col.names = cols, header = read_header)
  if (nrow(data) > 0) {
    lines_read <- lines_read + nrow(data)
    print(paste0("lines read this chunk: ", nrow(data), ", lines read so far: ", lines_read))
    data <- rebase_data(data, offset)
    #write.csv doesn't support col.names=FALSE; so use write.table which does
    write.table(data, tgt_conn, row.names=FALSE, col.names=write_header, sep = ",")
  }
  if (nrow(data) < CHUNK_SIZE) {
    end <- TRUE
  }
  read_header <- FALSE
  write_header <- FALSE
}
close(src_conn)
close(tgt_conn)
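One caveat: if the row count happens to be an exact multiple of CHUNK_SIZE, the final read.csv finds no lines left on the connection and errors with "no lines available in input" rather than returning zero rows. Wrapping the read in tryCatch is one possible guard (a sketch, untested against the original data); the existing nrow() checks then treat the empty result as the last chunk and end the loop:

# Sketch: treat an exhausted connection as an empty chunk instead of an error.
# Note this is blunt: a genuine parse error would be swallowed here as well.
data <- tryCatch(
  read.csv(src_conn, nrows = CHUNK_SIZE, check.names = FALSE,
           col.names = cols, header = read_header),
  error = function(e) data.frame()
)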

Upvotes: 0
