user_v27

Reputation: 423

Dataframe to PostgreSQL DB

I query 4 hours of data from a source PLC MS SQL database, process it with Python, and write the result to the main PostgreSQL table.

Because each hourly run re-fetches the previous 3 hours, the rows being written to the main Postgres table overlap with data already inserted. The duplicates violate the primary key, which aborts the transaction and raises a Python error.
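To make the overlap concrete, here is a minimal pandas-only sketch (column names `datetime` and `tagid` are taken from the table schema below; the sample values are invented): two consecutive hourly pulls of a 4-hour window share 3 hours of rows, and `drop_duplicates` on the key columns collapses them.

```python
import pandas as pd

# Simulate two hourly pulls of a 4-hour window: the second pull re-fetches
# 3 hours already written, so the combined frame contains duplicate keys.
pull_1 = pd.DataFrame({
    "datetime": pd.date_range("2023-01-01 00:00", periods=4, freq="h"),
    "tagid": "T1",
    "value": ["1", "2", "3", "4"],
})
pull_2 = pd.DataFrame({
    "datetime": pd.date_range("2023-01-01 01:00", periods=4, freq="h"),
    "tagid": "T1",
    "value": ["2", "3", "4", "5"],
})

combined = pd.concat([pull_1, pull_2], ignore_index=True)
# Keep one row per primary-key combination (datetime, tagid)
deduped = combined.drop_duplicates(subset=["datetime", "tagid"])
print(len(combined), len(deduped))  # 8 rows in, 5 unique keys out
```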

So,

  1. Create a temp PostgreSQL table (without any key) every hour
  2. Copy the pandas dataframe into the temp table
  3. Insert rows from the temp table into the main PostgreSQL table
  4. Drop the temp table
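Step 2 can be checked in isolation without a database: a sketch of the tab-separated, header-less buffer that psycopg2's `copy_from()` streams into the temp table (sample rows are invented; the five column names come from the table definition below).

```python
import io

import pandas as pd

df = pd.DataFrame({
    "datetime": ["2023-01-01 00:00:00", "2023-01-01 01:00:00"],
    "tagid": ["T1", "T1"],
    "mc": ["M1", "M1"],
    "value": ["10", "11"],
    "quality": ["good", "good"],
})

# Serialise the frame into the tab-separated, header-less format that
# psycopg2's copy_from() expects to stream into the temp table.
buf = io.StringIO()
df.to_csv(buf, sep="\t", header=False, index=False)
buf.seek(0)  # rewind, or copy_from() would read nothing

# Round-trip to confirm the buffer lines up with the temp table's 5 columns
check = pd.read_csv(buf, sep="\t", header=None,
                    names=["datetime", "tagid", "mc", "value", "quality"])
print(check.shape)  # (2, 5)
```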

This Python script runs hourly via Windows Task Scheduler.

Below is my code:

import io

import pandas as pd
from sqlalchemy import create_engine

engine = create_engine('postgresql://postgres:postgres@host:port/dbname?gssencmode=disable')
conn = engine.raw_connection()
cur = conn.cursor()

cur.execute("""CREATE TABLE public.table_temp
(
    datetime timestamp without time zone NOT NULL,
    tagid text COLLATE pg_catalog."default" NOT NULL,
    mc text COLLATE pg_catalog."default" NOT NULL,
    value text COLLATE pg_catalog."default",
    quality text COLLATE pg_catalog."default"
)

TABLESPACE pg_default;

ALTER TABLE public.table_temp
    OWNER to postgres;""")

output = io.StringIO()
df.to_csv(output, sep='\t', header=False, index=False)
output.seek(0)  # rewind the buffer before COPY reads it
cur.copy_from(output, 'table_temp', null="")
cur.execute("""INSERT INTO public.table_main SELECT * FROM table_temp ON CONFLICT DO NOTHING;""")
cur.execute("""DROP TABLE table_temp CASCADE;""")
conn.commit()

I would like to know if there is a more efficient/faster way to do this.

Upvotes: 2

Views: 7125

Answers (1)

phil

Reputation: 423

If I'm correct in assuming that the data is already in the dataframe, you should just be able to do:

engine = create_engine('postgresql://postgres:postgres@host:port/dbname?gssencmode=disable')
df = df.drop_duplicates(subset=None)  # drop_duplicates returns a copy, so reassign; replace None with the list of columns that form the primary key, e.g. ['column_name1', 'column_name2']
df.to_sql('table_main', engine, if_exists='append', index=False)  # index=False keeps the dataframe index out of the table
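A runnable sketch of this dedupe-then-append pattern, using an in-memory SQLite engine purely as a stand-in for PostgreSQL (table and column names are illustrative; the pre-dedupe replaces `ON CONFLICT`, which SQLite's `to_sql` path does not use here):

```python
import pandas as pd
from sqlalchemy import create_engine

# In-memory SQLite stands in for the PostgreSQL engine, for illustration only
engine = create_engine("sqlite://")

df = pd.DataFrame({
    "datetime": ["2023-01-01 00:00", "2023-01-01 01:00", "2023-01-01 01:00"],
    "tagid": ["T1", "T1", "T1"],
    "value": ["1", "2", "2"],
})

# Drop in-frame duplicates on the key columns, then append in one call;
# index=False keeps the frame's index out of the table.
df = df.drop_duplicates(subset=["datetime", "tagid"])
df.to_sql("table_main", engine, if_exists="append", index=False)

rows = pd.read_sql("SELECT * FROM table_main", engine)
print(len(rows))  # 2
```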

Edit due to comment:
If that's the case, you have the right idea. You can make it more efficient by using to_sql to insert the data into the temp table first, like so:

engine = create_engine('postgresql://postgres:postgres@host:port/dbname?gssencmode=disable')
conn = engine.raw_connection()
cur = conn.cursor()

df.to_sql('table_temp', engine, if_exists='replace', index=False)
cur.execute("""INSERT INTO public.table_main SELECT * FROM table_temp ON CONFLICT DO NOTHING;""")
# cur.execute("""DROP TABLE table_temp CASCADE;""")  # Optional: if_exists='replace' already drops and recreates the table on the next run
conn.commit()

Upvotes: 3
