Reputation: 600
I am trying to create a class that spins up a test db from sample JSON data using testing.postgresql. I want to call in 5 JSON files and create 5 tables that I can use for unit-testing operators and queries in airflow.
code so far is
import json
import decimal, datetime
import testing.postgresql
import psycopg2
class PostgresMock:
with testing.postgresql.Postgresql() as postgresql:
#engine = create_engine(postgresql.url())
db = psycopg2.connect(**postgresql.dsn())
#load Json file with dummy data
def load_data(self):
with open(self) as f:
data=json.load(f)
return data
#build create statement from sql file
def create_sql(self):
# Find all keys
keys = []
for row in data:
for key in row.keys():
if key not in keys:
keys.append(key)
create_statement=("""CREATE TABLE MY_TABLE({0});""".format(",\n ".join(map(lambda key: "{0} VARCHAR".format(key), keys))))
return create_statement
#build insert statement
def sql_insert(data):
for row in data:
print ("""INSERT INTO MY_TABLE VALUES({0});""".format(
",".join(map(lambda key: "'{0}'".format(row[key]) if key in row else "NULL", keys))))
#create handler for test db and insert
def handler(postgresql):
conn = psycopg2.connect(**postgresql.dsn())
cursor = conn.cursor()
cursor.execute(create_sql(file))
for statement in sql_insert(data):
try:
cursor.execute(f'{statement}')
conn.commit()
except psycopg2.Error as errorMsg:
print(errorMsg)
conn.rollback()
cursor.close()
conn.commit()
conn.close()
# Use `handler()` on initialize database
PostgresqlMock = testing.postgresql.PostgresqlFactory(cache_initialized_db=True,
on_initialized=handler)
Getting the error
name 'create_sql' is not defined
Upvotes: 0
Views: 223
Reputation: 76
You're getting the error name 'create_sql' is not defined
because you're referencing the function inside the object without using the self argument.
If you want to reference an object function within another object function you can do so, by adding self
to the function call as well as the function in which you are calling it.
Like so:
def handler(self, postgresql): # changed a line here
conn = psycopg2.connect(**postgresql.dsn())
cursor = conn.cursor()
cursor.execute(self.create_sql(file)) # changed a line here
for statement in sql_insert(data):
try:
cursor.execute(f'{statement}')
conn.commit()
except psycopg2.Error as errorMsg:
print(errorMsg)
conn.rollback()
cursor.close()
conn.commit()
conn.close()
Upvotes: 1