I have a variety of very large (~4 GB each) CSV files in different formats, produced by data recorders from more than 10 manufacturers, and I am consolidating them all into BigQuery. To load them on a daily basis I want to first stage the files in Cloud Storage, determine the schema, and then load into BigQuery. Because some of the files carry additional header information (anywhere from 2 to ~30 lines), I have written my own functions to determine the most likely header row and the schema from a sample of each file (~100 lines), which I can then use in the job_config when loading the files to BQ.
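To give an idea of the load step itself, here is roughly what it looks like once the header row and schema have been determined (the table, bucket, and schema values below are placeholders rather than my real ones):

from google.cloud import bigquery

# Placeholder results from the header/schema detection described above.
detected_schema = [
    bigquery.SchemaField("timestamp", "TIMESTAMP"),
    bigquery.SchemaField("value", "FLOAT"),
]
header_row = 3  # number of leading lines to skip before the data rows start

bq_client = bigquery.Client()
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    schema=detected_schema,
    skip_leading_rows=header_row,
)
load_job = bq_client.load_table_from_uri(
    "gs://my-bucket/recorder_file.csv",       # file already staged in Cloud Storage
    "my-project.my_dataset.recorder_data",    # destination table
    job_config=job_config,
)
load_job.result()  # wait for the load to finish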
The header and schema detection works fine when I go from local storage straight to BQ, because I can use a context manager and then Python's csv module, specifically the Sniffer and reader objects. However, there does not seem to be an equivalent way to use a context manager directly on a file in Cloud Storage, and I do not want to bypass Cloud Storage in case any of these files are interrupted while loading into BQ.
What I can get to work:
# initialise variables
import csv

with open(csv_file, newline='', encoding=encoding) as datafile:
    # Sniff the dialect from the first chunk of the file
    dialect = csv.Sniffer().sniff(datafile.read(chunk_size))
    datafile.seek(0)  # rewind so the reader starts from the first line
    reader = csv.reader(datafile, dialect)

    # Collect a ~100-row sample for the header/schema investigation
    sample_rows = []
    row_num = 0
    for row in reader:
        sample_rows.append(row)
        row_num += 1
        if row_num > 100:
            break

sample_rows
# Carry out schema and header investigation...
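To give an idea of what the header and schema investigation is doing, here is a very rough sketch of one possible header-row heuristic (purely illustrative, not the actual detection code):

from collections import Counter

def guess_header_row(sample_rows):
    # Illustrative assumption: the header is the first row whose field count
    # matches the field count of the majority of the sampled rows.
    field_counts = [len(row) for row in sample_rows]
    most_common_count = Counter(field_counts).most_common(1)[0][0]
    for i, row in enumerate(sample_rows):
        if len(row) == most_common_count:
            return i  # index of the likely header row
    return 0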
With Google Cloud Storage I have attempted to use download_as_string and download_to_file, which return binary representations of the data, but I cannot get the csv module to work with any of it. I tried .decode('utf-8'), which returns one very long string full of \r\n sequences, and then splitlines() to get a list of lines, but the csv functions still produce a dialect and reader that split every entry into single characters.
Has anyone managed to find a workaround to use the csv module with files stored in Cloud Storage without downloading the whole file?
Upvotes: 2
Views: 2076
After having a look at the csv source code on GitHub, I managed to use Python's io and csv modules together to solve this problem. io.BytesIO and io.TextIOWrapper were the two key pieces. Probably not a common use case, but I thought I would post the answer here to save some time for anyone who needs it.
import csv
import io
from google.cloud import storage

# Set up the storage client and create a blob object for the csv file you are trying to read from GCS
# (bucket_name and blob_name are placeholders).
storage_client = storage.Client()
blob = storage_client.bucket(bucket_name).blob(blob_name)

content = blob.download_as_string(start=0, end=10240)  # Read a chunk of bytes that will include all header data and the start of the recorded data.
bytes_buffer = io.BytesIO(content)
wrapped_text = io.TextIOWrapper(bytes_buffer, encoding=encoding, newline=newline)
dialect = csv.Sniffer().sniff(wrapped_text.read())
wrapped_text.seek(0)  # rewind so the reader starts from the beginning of the sample
reader = csv.reader(wrapped_text, dialect)
# Do what you will with the reader object
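For example, to collect the same ~100-row sample that the local-file version in the question builds (variable names here are just for illustration):

# Collect a sample of rows for the header/schema investigation.
sample_rows = []
for row_num, row in enumerate(reader):
    sample_rows.append(row)
    if row_num >= 100:
        break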
Upvotes: 7