Kaden Young

Reputation: 11

How to convert SQLAlchemy datatypes to PyArrow dtypes? Or SQL Server datatypes to PyArrow?

I am working on writing a daily load from SQL Server to a Parquet file (while also leaving options to export to CSV or JSON). I am using pyodbc, SQLAlchemy, pandas, and PyArrow to do this.

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import sys
import argparse
import sqlalchemy as sa
import os
...
connection_string = f'mssql+pyodbc://{server}/{database}?driver=ODBC+Driver+17+for+SQL+Server&trusted_connection=yes&read_only=true'
engine = sa.create_engine(connection_string)
...
sql = sa.text(f"""
    SELECT MAIN.*
        , etl_dest.ETL_PROCESS_EXECUTION_IDENTIFIER as [ETL_EVENT_PROCESS_EXECUTION_IDENTIFIER]
    FROM {database}.{schema}.{table} MAIN
    INNER JOIN {database}.audit.ETL_EVENT_DESTINATION etl_dest on MAIN.ETL_EVENT_DESTINATION_IDENTIFIER = etl_dest.ETL_EVENT_DESTINATION_IDENTIFIER  
    INNER JOIN {database}.Audit.ETL_PROCESS_EXECUTION etl_proc on etl_dest.ETL_PROCESS_EXECUTION_IDENTIFIER = etl_proc.ETL_PROCESS_EXECUTION_IDENTIFIER  
    INNER JOIN {database}.Audit.ETL_BATCH_EXECUTION etl_batch on etl_proc.ETL_BATCH_EXECUTION_IDENTIFIER = etl_batch.ETL_BATCH_EXECUTION_IDENTIFIER  
    WHERE
        etl_proc.ETL_BATCH_EXECUTION_IDENTIFIER = :batch_identifier
    """)

Retrieve the table schema using SQLAlchemy

# inspector = sa.inspect(engine)
# columns_info = inspector.get_columns(table, schema=schema)
# columns_info.append({'name': 'RWB_ETL_EVENT_PROCESS_EXECUTION_IDENTIFIER', 'type': sa.INTEGER(), 'nullable': False, 'default': None, 'autoincrement': False, 'comment': None})

# Establish connection to the main database
with engine.connect().execution_options(stream_results=True) as connection:
    # Execute the query and fetch results in chunks
    # TODO: if someone implements a sqlalchemy to pyarrow types conversion, we could likely increase performance here by specifying the dtype=
    chunks = pd.read_sql(sql, connection, params={"batch_identifier": batch_identifier}, chunksize=10000, dtype_backend='pyarrow')
    # Write the chunks to the output file
    if file_format == 'parquet':
        writer = None
        for chunk in chunks:
            batch = pa.RecordBatch.from_pandas(chunk)
            if writer is None:
                writer = pq.ParquetWriter(output_file, batch.schema)
            writer.write_batch(batch)
        if writer is not None:
            writer.close()
    elif file_format == 'csv':
        header = True
        for chunk in chunks:
            chunk.to_csv(output_file, index=False, sep=delimiter, mode='a', header=header)
            header = False
    elif file_format == 'json':
        for chunk in chunks:
            chunk.to_json(output_file, orient='records', lines=True, force_ascii=False, mode='a')
    else:
        raise ValueError(f"Unsupported file format: {file_format}")

Basically, my problem boils down to this: I want to get a table definition out of SQL Server in PyArrow data types. A secondary option would be NumPy data types. I would use this to optimize the pandas read out of SQL Server and hold the types constant between chunks.
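The direction I am considering (a minimal sketch, not something I have fully tested) is to translate the SQLAlchemy types coming back from inspector.get_columns() into a pyarrow schema by hand. The type table below is an assumption and would need to be extended for every SQL Server type I actually use; the decimal precision/scale is a placeholder:

import sqlalchemy as sa
import pyarrow as pa

# Sketch of a SQLAlchemy -> PyArrow mapping; the entries and the decimal
# precision/scale are assumptions, not a complete mapping.
_SA_TO_ARROW = [
    (sa.BigInteger, pa.int64()),
    (sa.SmallInteger, pa.int16()),
    (sa.Integer, pa.int32()),
    (sa.Float, pa.float64()),
    (sa.Numeric, pa.decimal128(38, 10)),   # placeholder precision/scale
    (sa.Boolean, pa.bool_()),
    (sa.DateTime, pa.timestamp('ns')),
    (sa.Date, pa.date32()),
    (sa.LargeBinary, pa.binary()),
    (sa.String, pa.string()),
]

def sqlalchemy_to_arrow_schema(columns_info):
    """Build a pyarrow.Schema from the dicts returned by inspector.get_columns()."""
    fields = []
    for col in columns_info:
        arrow_type = pa.string()            # fall back to string for unmapped types
        for sa_type, pa_type in _SA_TO_ARROW:
            if isinstance(col['type'], sa_type):
                arrow_type = pa_type
                break
        fields.append(pa.field(col['name'], arrow_type, nullable=col['nullable']))
    return pa.schema(fields)

The more specific types are listed before the generic ones because, for example, sa.Float subclasses sa.Numeric and sa.BigInteger subclasses sa.Integer, so the isinstance checks have to try them first.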

If dtype_backend is not set to 'pyarrow', I can get a mismatch between the file schema and the table schema when writing to Parquet (the schema inferred from one chunk does not match the schema the writer was created with).
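Building on the sketch above, the Parquet branch could be pinned to the explicit schema instead of inferring it from the first chunk (again untested, and it assumes columns_info already contains the extra column added by the join):

arrow_schema = sqlalchemy_to_arrow_schema(columns_info)

with engine.connect().execution_options(stream_results=True) as connection:
    chunks = pd.read_sql(
        sql,
        connection,
        params={"batch_identifier": batch_identifier},
        chunksize=10000,
        # The same mapping could also drive the dtype= hint mentioned in the TODO:
        # dtype={f.name: pd.ArrowDtype(f.type) for f in arrow_schema}  (pandas >= 2.0)
        dtype_backend='pyarrow',
    )
    with pq.ParquetWriter(output_file, arrow_schema) as writer:
        for chunk in chunks:
            # Converting against the fixed schema keeps an all-NULL column in one
            # chunk from changing the inferred type between chunks.
            batch = pa.RecordBatch.from_pandas(chunk, schema=arrow_schema,
                                               preserve_index=False)
            writer.write_batch(batch)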

Upvotes: 1

Views: 106

Answers (0)
