Pj-

Reputation: 440

pyodbc ERROR - ('ODBC SQL type -151 is not yet supported. column-index=16 type=-151', 'HY106')

I'm working on automating some query extraction using Python and pyodbc, then converting the results to Parquet format and sending them to AWS S3.

My script solution has been working fine so far, but I have run into a problem. I have a schema, let's call it SCHEMA_A, and inside it several tables: TABLE_1, TABLE_2, ..., TABLE_N.

All the tables inside that schema are accessible with the same credentials.

So I'm using a script like this one to automate the task.

import sys
import time

import pandas as pd
import pyodbc


def get_stream(cursor, batch_size=100000):
    # Yield rows in batches until the cursor is exhausted.
    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:
            break
        yield rows


cnxn = pyodbc.connect(driver='pyodbc driver here',
                      host='host name',
                      database='schema name',
                      user='user name',
                      password='password')
print('Connection established ...')

cursor = cnxn.cursor()
print('Initializing cursor ...')

if len(sys.argv) > 1:
    table_name = sys.argv[1]
    cursor.execute('SELECT * FROM {}'.format(table_name))
else:
    exit()
print('Query fetched ...')

row_batch = get_stream(cursor)
print('Getting Iterator ...')

cols = cursor.description
cols = [col[0] for col in cols]

print('Initializing batch data frame ...')
df = pd.DataFrame(columns=cols)

start_time = time.time()
for rows in row_batch:
    tmp = pd.DataFrame.from_records(rows, columns=cols)
    df = pd.concat([df, tmp], ignore_index=True)
    print("--- Batch inserted in %s seconds ---" % (time.time() - start_time))
    start_time = time.time()
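
After all the batches are collected, the DataFrame is written to Parquet and uploaded to S3. A minimal sketch of that step, assuming a local staging file, placeholder bucket and key names (not from the original script), and that a Parquet engine (pyarrow or fastparquet) and boto3 are installed:

import boto3

# Write the accumulated DataFrame to a local Parquet file.
df.to_parquet('/tmp/table_extract.parquet', index=False)

# Upload the file to S3; bucket and key names are placeholders.
s3 = boto3.client('s3')
s3.upload_file('/tmp/table_extract.parquet', 'my-example-bucket', 'exports/table_extract.parquet')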

I run code similar to this inside Airflow tasks, and it works just fine for all other tables. But I have two tables, let's call them TABLE_I and TABLE_II, that yield the following error when I execute cursor.fetchmany(batch_size):

ERROR - ('ODBC SQL type -151 is not yet supported.  column-index=16  type=-151', 'HY106')
Traceback (most recent call last):
  File "/home/ubuntu/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1112, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/ubuntu/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/ubuntu/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1310, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/ubuntu/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 117, in execute
    return_value = self.execute_callable()
  File "/home/ubuntu/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 128, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/ubuntu/prea-ninja-airflow/jobs/plugins/extract/fetch.py", line 58, in fetch_data
    for rows in row_batch:
  File "/home/ubuntu/prea-ninja-airflow/jobs/plugins/extract/fetch.py", line 27, in stream
    row = cursor.fetchmany(batch_size)

Inspecting those tables with SQLElectron and querying the first few lines, I realized that both TABLE_I and TABLE_II have a column called 'Geolocalizacao'. When I use SQL Server to find the DATA TYPE of that column with:

SELECT DATA_TYPE 
FROM INFORMATION_SCHEMA.COLUMNS
WHERE 
     TABLE_NAME = 'TABLE_I' AND 
     COLUMN_NAME = 'Geolocalizacao';

It yields:

DATA_TYPE
geography
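
For inspection, the column can also be read as WKT text through the same cursor, which sidesteps the driver's handling of the raw type. A small sketch, assuming the standard STAsText() spatial method and the table/column names above:

# Inspection sketch: convert the geography value to WKT on the server side,
# so pyodbc only ever sees a plain string (table/column names as above).
cursor.execute('SELECT TOP 10 Geolocalizacao.STAsText() AS wkt FROM TABLE_I')
for (wkt,) in cursor.fetchall():
    print(wkt)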

Searching here on Stack Overflow I found this solution: python pyodbc SQL Server Native Client 11.0 cannot return geometry column

According to that user's description, it seems to work fine by adding:

def unpack_geometry(raw_bytes):
    # adapted from SSCLRT information at
    #   https://learn.microsoft.com/en-us/openspecs/sql_server_protocols/ms-ssclrt/dc988cb6-4812-4ec6-91cd-cce329f6ecda
    tup = struct.unpack('<i2b3d', raw_bytes)
    # tup contains: (unknown, Version, Serialization_Properties, X, Y, SRID)
    return tup[3], tup[4], tup[5]

and then:

cnxn.add_output_converter(-151, unpack_geometry)

after creating the connection. But it is not working for the GEOGRAPHY data type; when I use this code (with import struct added to the script), it gives me the following error:

Traceback (most recent call last):
  File "benchmark.py", line 79, in <module>
    for rows in row_batch:
  File "benchmark.py", line 39, in get_stream
    row = cursor.fetchmany(batch_size)
  File "benchmark.py", line 47, in unpack_geometry
    tup = struct.unpack('<i2b3d', raw_bytes)
struct.error: unpack requires a buffer of 30 bytes

An example of the values in this column follows this template:

{"srid":4326,"version":1,"points":[{}],"figures":[{"attribute":1,"pointOffset":0}],"shapes":[{"parentOffset":-1,"figureOffset":0,"type":1}],"segments":[]}

I honestly don't know how to adapt the code for this structure. Can someone help me? It has been working fine for all the other tables, but these two tables with this column are giving me a lot of headache.

Upvotes: 2

Views: 2711

Answers (1)

Benks

Reputation: 21

Hi, this is what I have done:

from binascii import hexlify

def _handle_geometry(geometry_value):
    # Return the raw bytes as an uppercase hex string prefixed with 0x,
    # matching how SSMS displays geography/geometry values.
    return f"0x{hexlify(geometry_value).decode().upper()}"

and then on connection:

cnxn.add_output_converter(-151, _handle_geometry)

This will return the value as a hex string, the same way SSMS displays it.
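
For completeness, a minimal sketch of how this plugs into a script like the one in the question, using the _handle_geometry function above (the connection details are placeholders):

cnxn = pyodbc.connect('DSN=placeholder;UID=user;PWD=password')  # placeholder connection details
cnxn.add_output_converter(-151, _handle_geometry)

cursor = cnxn.cursor()
cursor.execute('SELECT TOP 5 Geolocalizacao FROM TABLE_I')
for row in cursor.fetchall():
    print(row[0])  # prints the geography value as a hex string instead of raising HY106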

Upvotes: 2
