Reputation: 45
I am reading data in chunks with pandas.read_sql_query and appending each chunk to a Parquet file, but I get errors.
Using pyarrow.parquet:
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

for chunk in pd.read_sql_query(query, conn, chunksize=10000):
    table_data = pa.Table.from_pandas(chunk)  # convert the DataFrame to an Arrow table
    pq.write_table(table=table_data, where='file.parquet',
                   use_deprecated_int96_timestamps=True,
                   coerce_timestamps='ms', allow_truncated_timestamps=True)
Getting the following error:
File "pyarrow\_parquet.pyx", line 1427, in pyarrow._parquet.ParquetWriter.__cinit__
File "pyarrow\error.pxi", line 120, in pyarrow.lib.check_status
pyarrow.lib.ArrowNotImplementedError: Unhandled type for Arrow to Parquet schema conversion: duration[ns]
Using fastparquet:
import os
import pandas as pd
from fastparquet import write

for chunk in pd.read_sql_query(query, conn, chunksize=10000):
    with open(os.path.join(download_location, file_name + '.parquet'), mode="a+") as f:
        write(filename=f.name, data=chunk, append=True)
Getting the following error:
raise ValueError("Can't infer object conversion type: %s" % head)
ValueError: Can't infer object conversion type: 0 2021-09-06
Is there a way to write pandas DataFrames to a Parquet file in append mode without running into these datetime column issues?
Upvotes: 0
Views: 3929
Reputation: 45
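A workaround is to cast every column of each pandas DataFrame chunk to str and write the chunks to the Parquet file through a single ParquetWriter. Note that this stores every column as a string, so the original column types are not preserved.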
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

query = f'select * from `{table["table_name"]}`'
pqwriter = None
for i, chunk in enumerate(pd.read_sql_query(query, conn, chunksize=10000)):
    all_columns = list(chunk)  # list of all column headers
    # convert every column to string
    chunk[all_columns] = chunk[all_columns].astype(str)
    table_schema = chunk.dtypes.astype(str).to_dict()
    for k, v in table_schema.items():
        if v == 'object':
            table_schema[k] = pa.string()
    # build a pyarrow schema with string fields
    fields = [pa.field(x, y) for x, y in table_schema.items()]
    new_schema = pa.schema(fields)
    if i == 0:
        # open the writer once, on the first chunk
        pqwriter = pq.ParquetWriter(where=updated_path, schema=new_schema, compression='snappy')
    table = pa.Table.from_pandas(df=chunk, schema=new_schema, preserve_index=False, safe=False)
    pqwriter.write_table(table)
if pqwriter is not None:
    pqwriter.close()  # finalize the Parquet file footer
Upvotes: 0