I'm automating some query extractions using Python and pyodbc, converting the results to Parquet format, and sending them to AWS S3.
My script works fine so far, but I've run into a problem. I have a schema, let's call it SCHEMA_A, that contains several tables: TABLE_1, TABLE_2, ..., TABLE_N.
All the tables in that schema are accessible with the same credentials.
So I'm using a script like this one to automate the task.
import sys
import time

import pandas as pd
import pyodbc


def get_stream(cursor, batch_size=100000):
    # Yield rows in batches until fetchmany() returns an empty list
    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:
            break
        yield rows


cnxn = pyodbc.connect(driver='pyodbc driver here',
                      host='host name',
                      database='schema name',
                      user='user name',
                      password='password')
print('Connection established ...')
cursor = cnxn.cursor()
print('Initializing cursor ...')

if len(sys.argv) > 1:
    table_name = sys.argv[1]
    cursor.execute('SELECT * FROM {}'.format(table_name))
else:
    sys.exit()

print('Query fetched ...')
row_batch = get_stream(cursor)
print('Getting Iterator ...')
cols = [col[0] for col in cursor.description]

print('Initializing batch data frame ...')
frames = []
start_time = time.time()
for rows in row_batch:
    # DataFrame.append was removed in pandas 2.0: collect the batches and concatenate once
    frames.append(pd.DataFrame.from_records(rows, columns=cols))
    print("--- Batch inserted in %s seconds ---" % (time.time() - start_time))
    start_time = time.time()
df = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame(columns=cols)
I run similar code inside Airflow tasks, and it works fine for all the other tables. However, two tables, let's call them TABLE_I and TABLE_II, raise the following error when I call cursor.fetchmany(batch_size):
ERROR - ('ODBC SQL type -151 is not yet supported. column-index=16 type=-151', 'HY106')
Traceback (most recent call last):
File "/home/ubuntu/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1112, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/home/ubuntu/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/home/ubuntu/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1310, in _execute_task
result = task_copy.execute(context=context)
File "/home/ubuntu/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 117, in execute
return_value = self.execute_callable()
File "/home/ubuntu/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 128, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/home/ubuntu/prea-ninja-airflow/jobs/plugins/extract/fetch.py", line 58, in fetch_data
for rows in row_batch:
File "/home/ubuntu/prea-ninja-airflow/jobs/plugins/extract/fetch.py", line 27, in stream
row = cursor.fetchmany(batch_size)
Inspecting those tables with SQLElectron and querying the first few rows, I realized that both TABLE_I and TABLE_II have a column called 'Geolocalizacao'. When I look up the data type of that column in SQL Server with:
SELECT DATA_TYPE
FROM INFORMATION_SCHEMA.COLUMNS
WHERE
TABLE_NAME = 'TABLE_I' AND
COLUMN_NAME = 'Geolocalizacao';
It yields:
DATA_TYPE
geography
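Since the column is geography, one option that avoids decoding bytes in Python (not taken from the linked answer) is to have SQL Server convert the column to text before pyodbc sees it, using the geography method STAsText(), which returns Well-Known Text as nvarchar (it omits Z and M values; AsTextZM() keeps them). A minimal sketch, assuming the column list comes from the INFORMATION_SCHEMA.COLUMNS query above; build_select and fetch_columns are hypothetical helper names, and the spatial columns are simply wrapped while everything else is selected as-is:

```python
# Data types that pyodbc reports as ODBC type -151 and that have an STAsText() method
SPATIAL_TYPES = {"geography", "geometry"}


def fetch_columns(cursor, table_name):
    """Return (column_name, data_type) pairs in table order (pyodbc uses ? placeholders)."""
    cursor.execute(
        "SELECT COLUMN_NAME, DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS "
        "WHERE TABLE_NAME = ? ORDER BY ORDINAL_POSITION",
        table_name,
    )
    return [(row[0], row[1]) for row in cursor.fetchall()]


def build_select(table_name, columns):
    """Build a SELECT that converts spatial columns to Well-Known Text on the server."""
    parts = []
    for name, data_type in columns:
        # Bracket-quote the identifier, escaping any closing bracket
        quoted = "[" + name.replace("]", "]]") + "]"
        if data_type.lower() in SPATIAL_TYPES:
            parts.append(f"{quoted}.STAsText() AS {quoted}")
        else:
            parts.append(quoted)
    return "SELECT {} FROM {}".format(", ".join(parts), table_name)
```

In the script, `cursor.execute(build_select(table_name, fetch_columns(cursor, table_name)))` would take the place of the `SELECT *`, and the Geolocalizacao column would then arrive as a plain string such as `POINT (...)`, which is easy to write to Parquet.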
Searching here on Stack Overflow, I found this solution: python pyodbc SQL Server Native Client 11.0 cannot return geometry column
According to its author, it works after adding:
import struct


def unpack_geometry(raw_bytes):
    # adapted from SSCLRT information at
    # https://learn.microsoft.com/en-us/openspecs/sql_server_protocols/ms-ssclrt/dc988cb6-4812-4ec6-91cd-cce329f6ecda
    tup = struct.unpack('<i2b3d', raw_bytes)
    # tup contains: (unknown, Version, Serialization_Properties, X, Y, SRID)
    return tup[3], tup[4], tup[5]
and then, right after creating the connection:
cnxn.add_output_converter(-151, unpack_geometry)
But it doesn't work for the GEOGRAPHY data type. When I use this code (with import struct added to the script), I get the following error:
Traceback (most recent call last):
File "benchmark.py", line 79, in <module>
for rows in row_batch:
File "benchmark.py", line 39, in get_stream
row = cursor.fetchmany(batch_size)
File "benchmark.py", line 47, in unpack_geometry
tup = struct.unpack('<i2b3d', raw_bytes)
struct.error: unpack requires a buffer of 30 bytes
The values in this column follow this template:
{"srid":4326,"version":1,"points":[{}],"figures":[{"attribute":1,"pointOffset":0}],"shapes":[{"parentOffset":-1,"figureOffset":0,"type":1}],"segments":[]}
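The "30 bytes" message comes from the fixed format string: struct.calcsize('<i2b3d') is 4 + 1 + 1 + 3×8 = 30, but geography values vary in length. Going by my reading of the MS-SSCLRT spec linked in that code, a value starts with the SRID (4-byte little-endian int, so the field labelled "unknown" above), a version byte and a serialization-properties byte. A single point then takes just two doubles (22 bytes in total), while other shapes store a point count, the points, a figure count, the figures, a shape count and the shapes, which is the same srid/version/points/figures/shapes layout as the template. The sketch below decodes version-1 values into a dict with those keys; parse_geography is a hypothetical name, the "lat"/"long" keys are my own choice, the latitude-first order is my understanding of how geography points are stored, and it has not been checked against real rows, so compare it with a known value first:

```python
import struct

# Serialization-property flags (version 1), as I read MS-SSCLRT
HAS_Z = 0x01
HAS_M = 0x02
IS_SINGLE_POINT = 0x08
IS_SINGLE_LINE_SEGMENT = 0x10


def parse_geography(raw_bytes):
    """Decode SQL Server GEOGRAPHY bytes into a dict shaped like the template above."""
    if raw_bytes is None:  # defensive: NULL column values
        return None
    srid, version, props = struct.unpack_from("<iBB", raw_bytes, 0)
    offset = 6

    # Single points and single line segments omit the point count
    if props & IS_SINGLE_POINT:
        num_points = 1
    elif props & IS_SINGLE_LINE_SEGMENT:
        num_points = 2
    else:
        (num_points,) = struct.unpack_from("<i", raw_bytes, offset)
        offset += 4

    # Each point is two little-endian doubles (latitude first, by assumption)
    coords = struct.unpack_from("<" + "d" * (2 * num_points), raw_bytes, offset)
    offset += 16 * num_points
    points = [{"lat": coords[2 * i], "long": coords[2 * i + 1]} for i in range(num_points)]

    # Optional Z and M arrays follow all the points
    for flag, key in ((HAS_Z, "z"), (HAS_M, "m")):
        if props & flag:
            values = struct.unpack_from("<" + "d" * num_points, raw_bytes, offset)
            offset += 8 * num_points
            for point, value in zip(points, values):
                point[key] = value

    if props & (IS_SINGLE_POINT | IS_SINGLE_LINE_SEGMENT):
        # Figures and shapes are implied rather than stored (type 1 = Point, 2 = LineString)
        figures = [{"attribute": 1, "pointOffset": 0}]
        shape_type = 1 if props & IS_SINGLE_POINT else 2
        shapes = [{"parentOffset": -1, "figureOffset": 0, "type": shape_type}]
    else:
        (num_figures,) = struct.unpack_from("<i", raw_bytes, offset)
        offset += 4
        figures = []
        for _ in range(num_figures):
            attribute, point_offset = struct.unpack_from("<Bi", raw_bytes, offset)
            offset += 5
            figures.append({"attribute": attribute, "pointOffset": point_offset})
        (num_shapes,) = struct.unpack_from("<i", raw_bytes, offset)
        offset += 4
        shapes = []
        for _ in range(num_shapes):
            parent, figure, shape_type = struct.unpack_from("<iiB", raw_bytes, offset)
            offset += 9
            shapes.append({"parentOffset": parent, "figureOffset": figure, "type": shape_type})

    # Version-2 curve segments are not decoded in this sketch
    return {"srid": srid, "version": version, "points": points,
            "figures": figures, "shapes": shapes, "segments": []}
```

It would be registered like the original converter, `cnxn.add_output_converter(-151, parse_geography)`. Keep in mind that -151 covers every CLR-typed column on that connection (geometry included, where the values are x/y rather than latitude/longitude), and you may want to json.dumps the resulting dicts before writing Parquet.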
I honestly don't know how to adapt the code to this structure. Can someone help? Everything works fine for the other tables, but these two tables with this column are giving me a lot of headaches.
Hi, this is what I did:
from binascii import hexlify


def _handle_geometry(geometry_value):
    return f"0x{hexlify(geometry_value).decode().upper()}"
and then, on the connection:
cnxn.add_output_converter(-151, _handle_geometry)
This returns the value as a hex string, the same way SSMS displays it.