Reputation:
First, the Python code should connect to an Oracle database and store the retrieved data using the pandas library. Then, a connection to the relevant PostgreSQL database will be established. A table with the required attributes will be created. This part works correctly.
However, the final step, which involves populating the table with values, doesn't work. The reason for this is that the code is not able to convert the sdo_geometry to the wkb/wkb- forma, which is needed to store in postgres
import pandas as pd
import psycopg2
from psycopg2 import sql
from psycopg2 import OperationalError, ProgrammingError, IntegrityError
oracledb.init_oracle_client(lib_dir=r"E:\Oracle\product\19.3.0\client_x64\bin")
# Verbindung zur Oracle-Datenbank herstellen
try:
oracle_conn = oracledb.connect(user='', password='', host='ws2', port=, service_name='pgis.stadt.winroot.net')
print("Oracle connection established.")
except oracledb.DatabaseError as e:
print(f"Error connecting to Oracle: {e}")
raise
# Abfrage der Tabellenstruktur
schema_name = "WT_RP"
table_name = "W_RP_V_DSP_RP_REGIONAL_L"
query = f"""
SELECT column_name, data_type
FROM all_tab_columns
WHERE table_name = '{table_name.upper()}'
AND owner = '{schema_name.upper()}'
"""
try:
columns_df = pd.read_sql(query, con=oracle_conn)
if columns_df.empty:
print("No columns found for the specified table.")
else:
print("Column information retrieved from Oracle.")
except Exception as e:
print(f"Error retrieving column information from Oracle: {e}")
oracle_conn.close()
raise
# Überprüfen und Bereinigen der Spaltennamen
print("Original column names:", columns_df.columns)
columns_df.columns = columns_df.columns.str.strip().str.lower()
print("Cleaned column names:", columns_df.columns)
# Abfrage der Daten
data_query = f"SELECT * FROM {table_name}"
try:
data_df = pd.read_sql(data_query, con=oracle_conn)
print("Data retrieved from Oracle.")
except Exception as e:
print(f"Error retrieving data from Oracle: {e}")
oracle_conn.close()
raise
finally:
oracle_conn.close()
print("Oracle connection closed.")
# Mapping von Oracle-Datentypen zu PostgreSQL-Datentypen
def map_datatypes(oracle_type):
mapping = {
"varchar2": "VARCHAR",
"nvarchar2": "VARCHAR",
"char": "CHAR",
"nchar": "CHAR",
"clob": "TEXT",
"nclob": "TEXT",
"blob": "BYTEA",
"number": "NUMERIC",
"float": "FLOAT",
"binary_float": "REAL",
"binary_double": "DOUBLE PRECISION",
"date": "DATE",
"timestamp": "TIMESTAMP",
"timestamp with time zone": "TIMESTAMP WITH TIME ZONE",
"timestamp with local time zone": "TIMESTAMP",
"interval year to month": "INTERVAL YEAR TO MONTH",
"interval day to second": "INTERVAL DAY TO SECOND",
"raw": "BYTEA",
"long": "TEXT",
"long raw": "BYTEA",
"rowid": "VARCHAR",
"urowid": "VARCHAR",
"xmltype": "XML",
"sdo_geometry": "GEOMETRY"
}
oracle_type_base = oracle_type.split('(')[0].lower()
return mapping.get(oracle_type_base, "TEXT")
columns_df['postgres_type'] = columns_df['data_type'].apply(map_datatypes)
# Verbindung zur PostgreSQL-Datenbank herstellen
try:
pg_conn = psycopg2.connect(
host="w",
database="ge",
user="b",
password=""
)
pg_cursor = pg_conn.cursor()
print("PostgreSQL connection established.")
except OperationalError as e:
print(f"Error connecting to PostgreSQL: {e}")
raise
#bis hier hin funktioniert alles
try:
# PostGIS-Erweiterung aktivieren
pg_cursor.execute("CREATE EXTENSION IF NOT EXISTS postgis;")
pg_conn.commit()
print("PostGIS extension activated.")
# Erstellen der Tabelle in PostgreSQL
create_table_query = sql.SQL("""
CREATE TABLE IF NOT EXISTS {table} (
{fields}
)
""").format(
table=sql.Identifier("test_tabelle3"),
fields=sql.SQL(', ').join(
sql.SQL("{} {}").format(sql.Identifier(row['column_name']), sql.SQL(row['postgres_type']))
for idx, row in columns_df.iterrows()
)
)
pg_cursor.execute(create_table_query)
pg_conn.commit()
print("Table created in PostgreSQL.")
except Exception as e:
print(f"Error during table creation: {e}")
raise
try:
# Erstellen des Insert-Statements
insert_query = sql.SQL("""
INSERT INTO test_tabelle3 ({}) #immer ändern, wenn der name geändert wird
VALUES ({})
""").format(
sql.SQL(', ').join(map(sql.Identifier, data_df.columns)),
sql.SQL(', ').join(sql.Placeholder() * len(data_df.columns))
)
# Einfügen der Daten
for row in data_df.itertuples(index=False, name=None):
values = []
for val, col in zip(row, data_df.columns):
if col == 'GEOMETRY': # assuming 'geom' is the geometry column
if val is not None:
geom = wkt.loads(val) if isinstance(val, str) else wkb.loads(val.read())
values.append(geom.wkt)
else:
values.append(None)
else:
values.append(val)
pg_cursor.execute(insert_query, values)
pg_conn.commit()
print("Data inserted into PostgreSQL.")
except (ProgrammingError, IntegrityError) as e:
print(f"Error during table creation or data insertion: {e}")
pg_conn.rollback()
raise
finally:
pg_cursor.close()
pg_conn.close()
print("PostgreSQL connection closed.")
i get the following messages when i run the code:
E:\Batch_Skripts\Richtplan\test_main.py:47: UserWarning: pandas only supports SQLAlchemy connectable (engine/connection) or database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 objects are not tested. Please consider using SQLAlchemy. data_df = pd.read_sql(data_query, con=oracle_conn) Data retrieved from Oracle. Oracle connection closed. PostgreSQL connection established. PostGIS extension activated. Table created in PostgreSQL. Error during table creation or data insertion: can't adapt type 'DbObject' PostgreSQL connection closed. Traceback (most recent call last): File "E:\Batch_Skripts\Richtplan\test_main.py", line 161, in pg_cursor.execute(insert_query, values) psycopg2.ProgrammingError: can't adapt type 'DbObject'
Upvotes: 0
Views: 75