Reputation: 33
I am reading a CSV file and loading it into an Avro file in a GCS bucket. The Avro file gets created, but it contains no data. The data is there when I print it, and I checked the buffer as well, but it is also empty.
I tried writer.close() but I am getting this error: "io.UnsupportedOperation: Cannot flush without finalizing upload. Use close() instead."
import json

import avro.schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter
from google.cloud import storage

def load_avro_file(records):
    schema_parsed = avro.schema.parse(json.dumps(schema))
    client = storage.Client()
    bucket = client.get_bucket(BUCKET)
    blob = bucket.blob(DESTINATION_FILE)
    with blob.open(mode='wb') as f:
        writer = DataFileWriter(f, DatumWriter(), schema_parsed)
        for record in records:
            # each record is a namedtuple; convert it to a dict for Avro
            record = dict((f, getattr(record, f)) for f in record._fields)
            print("In here", record)
            writer.append(record)
Upvotes: 3
Views: 826
Reputation: 1057
I was facing a similar problem and couldn't find an answer for it. Maybe you have already solved this, but let me share how I got it working.
Reading the Google Cloud docs for the blob.open method, I found this ignore_flush parameter:
(Optional) For non text-mode writes, makes flush() do nothing instead of raising an error. flush() without closing is not supported by the remote service and therefore calling it normally results in io.UnsupportedOperation. However, that behavior is incompatible with some consumers and wrappers of file objects in Python, such as zipfile.ZipFile or io.TextIOWrapper. Setting ignore_flush will cause flush() to successfully do nothing, for compatibility with those contexts. The correct way to actually flush data to the remote server is to close() (using a context manager, such as in the example, will cause this to happen automatically).
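For illustration, here is a minimal sketch of what the parameter changes (the bucket and object names are hypothetical); without ignore_flush=True, the flush() call below would raise the io.UnsupportedOperation error from the question:

from google.cloud import storage

client = storage.Client()
blob = client.bucket('my-bucket').blob('example.bin')  # hypothetical names

f = blob.open('wb', ignore_flush=True)
f.write(b'example bytes')
f.flush()   # no-op instead of raising io.UnsupportedOperation
f.close()   # close() is what actually finalizes the upload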
Avro needs to open the file in binary mode, so when opening the blob we need to set this parameter to True to avoid errors.
Also, if you don't call the Avro writer's .close() method, the file won't be generated properly, so we need to pass our IO object to the writer without wrapping it in a context manager; closing is handled by Avro itself.
The final solution looks like this:
import google.cloud.storage as gcs
from avro.datafile import DataFileWriter
from avro.io import DatumWriter

gcs_client = gcs.Client()
bucket = gcs_client.bucket(bucketname)
blob = bucket.blob(filename)

# Open the blob in binary mode with ignore_flush=True and let Avro manage it;
# DataFileWriter.close() writes the remaining blocks and closes the blob,
# which finalizes the upload.
writer = DataFileWriter(blob.open('wb', ignore_flush=True), DatumWriter(), schema_parsed)
for record in records:
    writer.append(record)
writer.close()
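As a variation (not from the original answer), you could still get deterministic cleanup by wrapping the writer in contextlib.closing, which calls writer.close() even if an append raises, while the underlying blob file is still closed by Avro:

from contextlib import closing

with closing(DataFileWriter(blob.open('wb', ignore_flush=True),
                            DatumWriter(), schema_parsed)) as writer:
    for record in records:
        writer.append(record)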
Upvotes: 0