Reputation: 344
I am trying to collect the results of a Postgres/Redshift query into a dictionary using the column names and their values.
So if the result of my select * from ___ statement is:
field1 | field2 | field3
value1 | value2 | value3
How do I put the results into a dictionary: field1: value1, field2: value2, etc.?
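For the example row above, the goal would be something like:
{'field1': 'value1', 'field2': 'value2', 'field3': 'value3'}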
Here is my Airflow Script:
#Import Modules
from datetime import datetime, timedelta
from airflow import DAG
from paramiko.config import SSH_PORT
from airflow.hooks.base_hook import BaseHook
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.postgres_hook import PostgresHook
from sshtunnel import SSHTunnelForwarder, create_logger
from io import StringIO
import logging
from distutils.util import execute
from contextlib import closing
import paramiko
import MySQLdb as sql
import psycopg2
import psycopg2.extensions
from psycopg2.extras import RealDictCursor
import psycopg2.extras
import operator
import itertools
from query_tools import fetch, execute
def get_etl():
    pg_hook = PostgresHook(postgres_conn_id="postgres_default", schema='schema1')
    connection = pg_hook.get_conn()
    col_query = "select * from schema.table"
    cursor = connection.cursor()
    cursor.execute(col_query)
    ff = cursor.fetchall()
    connection.commit()
    connection.close()
# Identify default arguments
default_args = {
    'owner': 'm',
    'depends_on_past': False,
    'start_date': datetime(2019, 12, 15),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1)
}
#Instantiate DAG instance
with DAG('try_me', description='This is by Maliva', default_args=default_args, schedule_interval='@hourly', catchup=False) as dag:
    t1 = PythonOperator(task_id='new_one', python_callable=get_etl)
When I run this script, it outputs only the record values: value1, value2, value3.
Any ideas or suggestions on how to get the column name associated with each value?
Upvotes: 3
Views: 2736
Reputation: 2094
You can zip the column names from cursor.description with each row:
pg_hook = PostgresHook(postgres_conn_id="postgres_default", schema='schema1')
connection = pg_hook.get_conn()
col_query = "select * from schema.table"
cursor = connection.cursor()
cursor.execute(col_query)
# fetchall to dictionary
desc = cursor.description
column_names = [col[0] for col in desc]
data = [dict(zip(column_names, row)) for row in cursor.fetchall()]
print(data)
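# For the example row above, data would look like:
# [{'field1': 'value1', 'field2': 'value2', 'field3': 'value3'}]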
#ff = cursor.fetchall()
connection.commit()
connection.close()
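As an alternative (a minimal sketch that reuses the connection and col_query from above, plus the RealDictCursor the question already imports), you can let psycopg2 build the dictionaries for you:
from psycopg2.extras import RealDictCursor
# cursor_factory=RealDictCursor makes each fetched row a dict keyed by column name
cursor = connection.cursor(cursor_factory=RealDictCursor)
cursor.execute(col_query)
data = cursor.fetchall()  # list of dict-like RealDictRow objects
print(data)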
Upvotes: 3