Ryan Garnett

Reputation: 261

Iterate over multiple files in data wrangling pipeline

I am working with a large number (2,000+) of large zip files (~300MB each) of data from here. Each file has the same schema and structure. I am looking to iterate over a list of zip files and perform the following:

My approach was to pipe mapply into different data wrangling functions (e.g. clean_names and group_by), which are then piped into write_dataset (see code below). However, this approach is not performing the way I would like. It unzips all the files and imports them into a single csv first, and only then performs the data wrangling and writes the data. This would be fine for a few datasets, but when processing thousands, memory will quickly become an issue, as each file has ~7 million rows.

zip_files <- dput(list.files("~/data/raw", pattern = ".zip", full.names = TRUE))

file_names <- dput(
  lapply(zip_files, unzip, list = TRUE) |>
    dplyr::bind_rows() |>
    dplyr::pull(Name)
)

mapply(unzip, zip_files, file_names) |>
  readr::read_csv() |>
  janitor::clean_names("all_caps") |>
  dplyr::group_by(YEAR = lubridate::year(BASE_DATE_TIME),
                  MONTH = lubridate::month(BASE_DATE_TIME),
                  DAY = lubridate::day(BASE_DATE_TIME)) |>
  arrow::write_dataset("~/data/parquet/ais_points",
                       format = "parquet",
                       max_rows_per_file = 1000000)

I am wondering what the best approach is to

Upvotes: 0

Views: 197

Answers (1)

margusl

Reputation: 17434

Iterate through the list of zip files one at a time, with reading and processing shifted from readr to arrow so that each file is handled as an Arrow table. This limits the available tools and libraries to what arrow supports, but the list of functions currently mapped is quite impressive: https://arrow.apache.org/docs/r/reference/acero.html

Tested with 4 files; the resulting dataset had 30,495,179 rows.

library(arrow)
library(dplyr)
library(cli)
library(stringr)

ais_zips <- list.files("./", "zip$")

for (ais_zip in ais_zips){
  # date from filename
  ymd_lst <- str_extract_all(ais_zip, "\\d+")[[1]] %>% as.integer()
  cli_progress_step("unzip {ais_zip}")
  extr <- unzip(ais_zip)
  cli_progress_step("read_csv_arrow {extr}")
  # read as arrow table
  atbl <- arrow::read_csv_arrow(extr, as_data_frame = FALSE)
  cli_progress_step("process")
  atbl <- atbl %>% 
    mutate(year  = ymd_lst[1],
           month = ymd_lst[2],
           day   = ymd_lst[3]) %>% 
    rename_with(\(x) janitor::make_clean_names(x, "all_caps"))
    # ...
    # some additional wrangling
    # ...
  cli_progress_step("write_dataset")
  atbl %>% 
    group_by(YEAR, MONTH, DAY) %>% 
    write_dataset("./pq", format = "parquet")
  cli_progress_step("remove {extr}")
  file.remove(extr)
}
#> ✔ unzip AIS_2022_01_01.zip [11.9s]
#> ✔ read_csv_arrow ./AIS_2022_01_01.csv [12.2s]
#> ✔ process [706ms]
#> ✔ write_dataset [9.4s]
#> ✔ remove ./AIS_2022_01_01.csv [84ms]
#> ...
arrow::open_dataset("./pq") %>% glimpse()
#> FileSystemDataset with 4 Parquet files
#> 30,495,179 rows x 20 columns
#> $ MMSI                   <int64> 368084090, 368140160, 366941830, 316005971, 316…
#> $ BASE_DATE_TIME <timestamp[ms]> 2022-01-01 02:00:00, 2022-01-01 02:00:00, 2022-…
#> $ LAT                   <double> 29.93174, 30.33475, 29.30919, 46.50268, 46.5032…
#> $ LON                   <double> -89.99243, -87.14429, -94.79702, -84.35674, -84…
#> $ SOG                   <double> 6.0, 0.0, 0.0, 2.4, 0.3, 0.0, 0.0, 0.0, 4.7, 0.…
#> $ COG                   <double> 296.2, 312.0, 180.2, 258.6, 61.9, 215.3, 360.0,…
#> $ HEADING               <double> 299, 87, 511, 257, 511, 511, 511, 511, 511, 511…
#> $ VESSEL_NAME           <string> "LARRY B WHIPPLE", "TWISTED ANGEL", "SAN PATRIC…
#> $ IMO                   <string> NA, "IMO0000000", NA, "IMO9084047", "IMO8745333…
#> $ CALL_SIGN             <string> "WDK7401", "WDL5339", "WCX6675", "CFP2004", "VC…
#> $ VESSEL_TYPE            <int64> 57, 36, 31, 31, 31, 60, 36, 37, 36, 30, 60, 36,…
#> $ STATUS                 <int64> 12, NA, 5, 0, 0, 0, NA, NA, NA, NA, 0, NA, NA, …
#> $ LENGTH                 <int64> 23, 12, 18, 34, 24, 38, 13, 10, 10, 0, 35, 0, 0…
#> $ WIDTH                  <int64> 10, 7, 7, 10, 5, 13, 4, 4, 4, 0, NA, 0, 0, 6, 2…
#> $ DRAFT                 <double> 3.0, NA, NA, 5.3, 3.0, NA, NA, NA, NA, NA, NA, …
#> $ CARGO                  <int64> 57, NA, 57, 99, 50, 69, NA, NA, NA, NA, 31, NA,…
#> $ TRANSCEIVER_CLASS     <string> "A", "B", "A", "A", "A", "A", "B", "B", "B", "B…
#> $ YEAR                   <int32> 2022, 2022, 2022, 2022, 2022, 2022, 2022, 2022,…
#> $ MONTH                  <int32> 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,…
#> $ DAY                    <int32> 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,…

Created on 2023-06-01 with reprex v2.0.2
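
One caveat about the "date from filename" step: extracting every run of digits works for bare names like AIS_2022_01_01.zip, but if the zip list is built with full.names = TRUE (as in the question), digits in a directory name would be picked up too. A minimal base R sketch of a stricter parser follows; parse_ais_date is a hypothetical helper name, and the YYYY_MM_DD pattern is an assumption based on the file names shown in the output above.

```r
# Hypothetical helper (not part of the answer above): extract year, month and
# day from a file name such as "AIS_2022_01_01.zip".
# Assumes the name contains a YYYY_MM_DD date; stops with an error otherwise.
parse_ais_date <- function(path) {
  # Look only at the file name, so digits in directory names are ignored
  fname <- basename(path)
  m <- regmatches(
    fname,
    regexec("([0-9]{4})_([0-9]{2})_([0-9]{2})", fname)
  )[[1]]
  # m holds the full match plus three capture groups when the pattern matches
  if (length(m) != 4) {
    stop("no YYYY_MM_DD date found in file name: ", fname)
  }
  setNames(as.integer(m[-1]), c("year", "month", "day"))
}

ymd <- parse_ais_date("~/data2/raw/AIS_2022_01_01.zip")
```

Inside the loop, ymd[["year"]], ymd[["month"]] and ymd[["day"]] could then stand in for ymd_lst[1], ymd_lst[2] and ymd_lst[3].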

Upvotes: 1
