Coder123

Reputation: 344

How to put SELECT * Postgres/Redshift query results into a dictionary (column/value)

I am trying to collect the results of a Postgres/Redshift query into a dictionary that maps each column name to its value.

So if the result of my select * from ___ statement is:

field1 | field2 | field3
value1 | value2 | value3

How do I put the results into a dictionary, i.e. field1: value1, field2: value2, etc.?

Here is my Airflow script:

#Import Modules

from datetime import datetime, timedelta
from airflow import DAG
from paramiko.config import SSH_PORT
from airflow.hooks.base_hook import BaseHook
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.postgres_hook import PostgresHook
from sshtunnel import SSHTunnelForwarder, create_logger
from io import StringIO
import logging
from distutils.util import execute
from contextlib import closing
import paramiko
import MySQLdb as sql
import psycopg2
import psycopg2.extensions
from psycopg2.extras import RealDictCursor
import psycopg2.extras
import operator
import itertools
from query_tools import fetch, execute




def get_etl():
    pg_hook = PostgresHook(postgres_conn_id="postgres_default", schema='schema1')
    connection = pg_hook.get_conn()
    col_query = "select * from schema.table"
    cursor = connection.cursor()
    cursor.execute(col_query)
    # fetchall() returns a list of tuples: values only, no column names.
    ff = cursor.fetchall()
    connection.commit()
    connection.close()


# Define Default Arguments

default_args = {
    'owner': 'm',
    'depends_on_past': False,
    'start_date': datetime(2019,12,15),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1)
}


#Instantiate DAG instance

with DAG('try_me', description='This is by Maliva', default_args=default_args, schedule_interval='@hourly', catchup=False) as dag:
    t1 = PythonOperator(task_id='new_one', python_callable=get_etl)

When I run this script, it outputs only the row values (value1, value2, value3), without the column names.

Any ideas or suggestions for getting the column name associated with each value?
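The behaviour described above is standard Python DB-API 2.0 (PEP 249) behaviour rather than anything specific to Airflow: `fetchall()` returns each row as a plain tuple of values, and the column names are exposed separately through `cursor.description`. The sketch below illustrates this with the standard-library `sqlite3` module as a stand-in, because it needs no Postgres/Redshift server. The table `t` is hypothetical, and psycopg2 cursors are assumed to behave the same way for these two attributes.

```python
import sqlite3

# In-memory SQLite database used as a stand-in for Postgres/Redshift.
connection = sqlite3.connect(":memory:")
cursor = connection.cursor()

# Hypothetical table mirroring the field1/field2/field3 example.
cursor.execute("CREATE TABLE t (field1 TEXT, field2 TEXT, field3 TEXT)")
cursor.execute("INSERT INTO t VALUES ('value1', 'value2', 'value3')")

cursor.execute("SELECT * FROM t")
rows = cursor.fetchall()

# Each row carries only the values ...
print(rows)  # [('value1', 'value2', 'value3')]

# ... while the names live in cursor.description: one sequence per
# column, with the column name as its first item.
column_names = [col[0] for col in cursor.description]
print(column_names)  # ['field1', 'field2', 'field3']

connection.close()
```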

Upvotes: 3

Views: 2736

Answers (1)

GiovaniSalazar

Reputation: 2094

You can read the column names from cursor.description and zip them with each row:

from airflow.hooks.postgres_hook import PostgresHook

pg_hook = PostgresHook(postgres_conn_id="postgres_default", schema='schema1')
connection = pg_hook.get_conn()
col_query = "select * from schema.table"
cursor = connection.cursor()
cursor.execute(col_query)
# Build one {column_name: value} dict per row.
# cursor.description has one entry per column; item 0 is the column name.
column_names = [col[0] for col in cursor.description]
data = [dict(zip(column_names, row)) for row in cursor.fetchall()]
print(data)

connection.commit()
connection.close()
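psycopg2 can also build the dictionaries itself. The question already imports `RealDictCursor` from `psycopg2.extras`; passing it as `cursor_factory` makes each fetched row a dict-like `RealDictRow` keyed by column name. A minimal sketch, assuming `connection` is a psycopg2 connection such as the one returned by `PostgresHook.get_conn()`; the helper name `fetch_as_dicts` is hypothetical.

```python
from typing import Any, Dict, List


def fetch_as_dicts(connection: Any, query: str) -> List[Dict[str, Any]]:
    """Run `query` and return one {column: value} dict per row.

    `connection` is assumed to be a psycopg2 connection, e.g. the one
    returned by PostgresHook.get_conn().
    """
    # Imported inside the function so that defining the helper
    # does not itself require psycopg2 to be installed.
    from psycopg2.extras import RealDictCursor

    # RealDictCursor returns each row keyed by column name; the
    # with-block closes the cursor when done.
    with connection.cursor(cursor_factory=RealDictCursor) as cursor:
        cursor.execute(query)
        # Convert to plain dicts (e.g. for logging or returning from a task).
        return [dict(row) for row in cursor.fetchall()]
```

Usage would look like `data = fetch_as_dicts(pg_hook.get_conn(), "select * from schema.table")`. With either approach, a Python dict holds each key only once, so if the query returns two columns with the same name (for example a select * over a join), only one of them survives; aliasing the columns in the query avoids that.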

Upvotes: 3
