I am trying to use a COPY statement to push a pandas DataFrame to a Cloud SQL PostgreSQL database from an Airflow DAG.
I have one limitation: I can only use the pg8000 driver.
I am using this as a reference: https://github.com/tlocke/pg8000#copy-from-and-to-a-file (which I found in this thread: https://news.ycombinator.com/item?id=25402430).
Here is my code:
import csv
from io import StringIO

import pg8000.dbapi
import sqlalchemy

# `connector` (Cloud SQL Python Connector) and PG_CONFIG are defined elsewhere in the DAG
def getconn() -> pg8000.dbapi.Connection:
    conn: pg8000.dbapi.Connection = connector.connect(
        PG_CONFIG["host"],
        "pg8000",
        user=PG_CONFIG["user"],
        password=PG_CONFIG["password"],
        db=PG_CONFIG["database"],
    )
    return conn

engine = sqlalchemy.create_engine("postgresql+pg8000://", creator=getconn)
engine.dialect.description_encoding = None

stream_in = StringIO()
csv_writer = csv.writer(stream_in)
csv_writer.writerow([1, "electron"])
csv_writer.writerow([2, "muon"])
csv_writer.writerow([3, "tau"])
stream_in.seek(0)

conn = engine.connect()
conn.execute("CREATE TABLE IF NOT EXISTS temp_table (user_id numeric, user_name text)")
# This call raises the error shown below
conn.execute("COPY temp_table FROM STDIN WITH (FORMAT CSV)", stream=stream_in)
I have tried everything I can think of (using the DELIMITER option, passing TEXT instead of CSV, ...), but I keep getting this error:
Could not determine data type of parameter $1
[2021-10-08 23:24:06,440] {taskinstance.py:1152} ERROR - (pg8000.exceptions.DatabaseError) {'S': 'ERROR', 'V': 'ERROR', 'C': '42P18', 'M': 'could not determine data type of parameter $1', 'F': 'postgres.c', 'L': '1363', 'R': 'exec_parse_message'}
[SQL: COPY winappsx.aa FROM STDIN WITH (FORMAT CSV)]
[parameters: {'stream': <_io.StringIO object at 0x7f86a58d7dc8>}]
(Background on this error at: http://sqlalche.me/e/13/4xp6)
Traceback (most recent call last):
File "/opt/python3.6/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
cursor, statement, parameters, context
File "/opt/python3.6/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
cursor.execute(statement, parameters)
File "/opt/python3.6/lib/python3.6/site-packages/pg8000/dbapi.py", line 454, in execute
statement, vals=vals, input_oids=self._input_oids, stream=stream
File "/opt/python3.6/lib/python3.6/site-packages/pg8000/core.py", line 632, in execute_unnamed
self.handle_messages(context)
File "/opt/python3.6/lib/python3.6/site-packages/pg8000/core.py", line 769, in handle_messages
raise self.error
pg8000.exceptions.DatabaseError: {'S': 'ERROR', 'V': 'ERROR', 'C': '42P18', 'M': 'could not determine data type of parameter $1', 'F': 'postgres.c', 'L': '1363', 'R': 'exec_parse_message'}
I know the connection works because the table gets created properly. The error occurs on the COPY statement.
I suspect the problem is in the way the stream parameter is passed, but I cannot find the correct syntax. This page may help: https://www.kite.com/python/docs/pg8000.Cursor.execute
Thank you for your help!
A friend found the answer ;-)
Instead of executing through a normal SQLAlchemy connection, get the underlying pg8000 DBAPI connection with engine.raw_connection(), as described in https://docs.sqlalchemy.org/en/13/core/connections.html#working-with-raw-dbapi-connections
With the pg8000 connection in hand, I followed this section of the pg8000 examples: https://github.com/tlocke/pg8000#copy-from-and-to-a-file-1. Create a cursor from the pg8000 connection and call cursor.execute() on it. Because that call goes straight to pg8000, it accepts the stream argument. SQLAlchemy's conn.execute() has no stream option: the log in the question shows it treated stream as an ordinary bind parameter ([parameters: {'stream': ...}]) and sent it to the server even though the COPY statement has no placeholder for it, which is consistent with the "could not determine data type of parameter $1" error.
Here is the code:
import csv
from io import StringIO

# `engine` is the SQLAlchemy engine created with getconn() in the question
stream_in = StringIO()
csv_writer = csv.writer(stream_in)
csv_writer.writerow([1, "electron"])
csv_writer.writerow([2, "muon"])
csv_writer.writerow([3, "tau"])
csv_writer.writerow([4, "sean is the best"])
stream_in.seek(0)  # rewind so pg8000 reads the CSV from the beginning
# Get the underlying pg8000 DBAPI connection from the SQLAlchemy engine
connPG8K = engine.raw_connection()
# Get a pg8000 cursor, whose execute() accepts a stream= argument
cursor = connPG8K.cursor()
cursor.execute("CREATE TABLE IF NOT EXISTS temp_table (user_id numeric, user_name text)")
# DELIMITER needs a value; it can also be omitted, since ',' is the CSV default
cursor.execute("COPY temp_table FROM STDIN WITH (FORMAT csv, DELIMITER ',')", stream=stream_in)
connPG8K.commit()
connPG8K.close()  # return the connection to SQLAlchemy's pool
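If this has to run for several DataFrames in the DAG, the same steps could be wrapped in a helper. This is a minimal sketch under assumptions, not part of pg8000 or SQLAlchemy: `rows_to_csv_stream` and `copy_rows` are hypothetical names, and `engine` is assumed to be a SQLAlchemy engine on the pg8000 dialect like the one built with getconn() in the question. The only library calls it relies on are `engine.raw_connection()` and pg8000's `cursor.execute(..., stream=...)`; it also rolls back on failure and always returns the connection to the pool.

```python
import csv
from io import StringIO
from typing import Any, Iterable, Sequence


def rows_to_csv_stream(rows: Iterable[Sequence[Any]]) -> StringIO:
    """Serialize rows into an in-memory CSV file, rewound to the start."""
    buf = StringIO()
    csv.writer(buf).writerows(rows)
    buf.seek(0)  # pg8000 reads the stream from its current position
    return buf


def copy_rows(engine: Any, copy_sql: str, rows: Iterable[Sequence[Any]]) -> None:
    """Run COPY ... FROM STDIN through the engine's raw pg8000 connection.

    Assumes `engine` is a SQLAlchemy Engine on the pg8000 dialect. Only
    engine.raw_connection() is used, so SQLAlchemy never sees `stream`.
    """
    stream = rows_to_csv_stream(rows)
    raw_conn = engine.raw_connection()
    try:
        cursor = raw_conn.cursor()
        # pg8000's DBAPI cursor.execute() takes stream= for COPY FROM STDIN
        cursor.execute(copy_sql, stream=stream)
        raw_conn.commit()
    except Exception:
        raw_conn.rollback()
        raise
    finally:
        raw_conn.close()  # hands the connection back to SQLAlchemy's pool
```

For a pandas DataFrame `df`, `df.itertuples(index=False, name=None)` yields plain tuples that could be passed as `rows`. The csv module writes None as an empty unquoted field, which COPY in CSV format reads as NULL by default.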
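In a COPY option list, DELIMITER takes a single-character string literal such as ','. If the table, schema, or column names come from DAG configuration rather than being hard-coded, the statement could be assembled by a small builder that quotes identifiers and always emits a valid DELIMITER literal. This is a hypothetical sketch (`quote_ident` and `build_copy_sql` are illustrative names, not pg8000 functions). Quoted identifiers are case-sensitive in PostgreSQL, so the names passed in must match their stored case (normally lowercase).

```python
from typing import Optional, Sequence


def quote_ident(name: str) -> str:
    """Quote a PostgreSQL identifier, doubling any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def build_copy_sql(
    table: str,
    columns: Sequence[str] = (),
    schema: Optional[str] = None,
    delimiter: str = ",",
) -> str:
    """Build a COPY ... FROM STDIN statement for CSV input."""
    if len(delimiter) != 1:
        raise ValueError("COPY DELIMITER must be a single character")
    target = quote_ident(table)
    if schema:
        target = quote_ident(schema) + "." + target
    if columns:
        target += " (" + ", ".join(quote_ident(c) for c in columns) + ")"
    # The DELIMITER value is a string literal, so single quotes are doubled
    delim_literal = "'" + delimiter.replace("'", "''") + "'"
    return f"COPY {target} FROM STDIN WITH (FORMAT csv, DELIMITER {delim_literal})"
```

For example, `build_copy_sql("temp_table", ["user_id", "user_name"])` produces `COPY "temp_table" ("user_id", "user_name") FROM STDIN WITH (FORMAT csv, DELIMITER ',')`, which can then be passed to pg8000's `cursor.execute(..., stream=...)`.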