Reputation: 1
I am trying to concurrently process inserts/updates into a Redshift database using a Python script on AWS Glue. I am using the pg8000 library for all my database operations. The concurrent insert/update fails with an error (Error Name: 1023, Error State: XX000). While researching the error I found that it is related to Serializable Isolation.
Can anyone look at the code and make sure there will not be clashes while the inserts/updates happen?
I tried using a random sleep time within the calling class (roughly as in the sketch after the script below). It worked for a couple of cases, but as the number of workers increased it failed for an insert/update case.
import sys
import time
import concurrent.futures
import pg8000
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME','REDSHIFT_HOST','REDSHIFT_PORT','REDSHIFT_DB','REDSHIFT_USER_NAME','REDSHIFT_USER_PASSWORD'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
job_run_id = args['JOB_RUN_ID']
maximum_workers = 5
def executeSql(sqlStmt):
    #Opens a fresh autocommit connection, runs a single statement, then closes it
    conn = pg8000.connect(database=args['REDSHIFT_DB'],user=args['REDSHIFT_USER_NAME'],password=args['REDSHIFT_USER_PASSWORD'],host=args['REDSHIFT_HOST'],port=int(args['REDSHIFT_PORT']))
    conn.autocommit = True
    cur = conn.cursor()
    cur.execute(sqlStmt)
    cur.close()
    conn.close()

def executeSqlProcedure(procedureName, procedureArgs = ""):
    try:
        logProcStrFormat = "CALL table_insert_proc('{}','{}','{}','{}',{},{})"
        #Insert into the log table - create the record
        executeSql(logProcStrFormat.format(job_run_id,procedureName,'pending','','getdate()','null')) #Code fails here
        #Executing the procedure
        procStrFormat = "CALL {}({})"
        executeSql(procStrFormat.format(procedureName,procedureArgs))
        print("Printing from {} process at ".format(procedureName),time.ctime())
        #Update the record in log table to complete
        executeSql(logProcStrFormat.format(job_run_id,procedureName,'complete','','null','getdate()')) #Code fails here
    except Exception as e:
        errorMsg = str(e.message["M"])
        executeSql(logProcStrFormat.format(job_run_id,procedureName,'failure',errorMsg,'null','getdate()'))
        raise
        sys.exit(1)

def runDims():
    #Run the dimension procedures in parallel
    dimProcedures = ["test_proc1","test_proc2","test_proc3","test_proc4","test_proc5"]
    with concurrent.futures.ThreadPoolExecutor(max_workers=maximum_workers) as executor:
        result = list(executor.map(executeSqlProcedure, dimProcedures))

def runFacts():
    #Run the fact procedures in parallel
    factProcedures = ["test_proc6","test_proc7","test_proc8","test_proc9"]
    with concurrent.futures.ThreadPoolExecutor(max_workers=maximum_workers) as executor:
        result = list(executor.map(executeSqlProcedure, factProcedures))
runDims()
runFacts()
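The random-sleep workaround I tried looked roughly like the sketch below (the delay range is arbitrary; it just staggers the workers before each procedure call):
import random

def executeSqlProcedureStaggered(procedureName, procedureArgs = ""):
    #Arbitrary random delay so the threads do not hit the log table at the same moment
    time.sleep(random.uniform(1, 30))
    executeSqlProcedure(procedureName, procedureArgs)
The executor.map calls then used executeSqlProcedureStaggered instead of executeSqlProcedure.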
I expect the inserts/updates into the log table to occur without locking or erroring out.
Upvotes: 0
Views: 2095
Reputation: 269370
Amazon Redshift does not work well with lots of small INSERT statements.
From Use a Multi-Row Insert - Amazon Redshift:
If a COPY command is not an option and you require SQL inserts, use a multi-row insert whenever possible. Data compression is inefficient when you add data only one row or a few rows at a time.
Multi-row inserts improve performance by batching up a series of inserts. The following example inserts three rows into a four-column table using a single INSERT statement. This is still a small insert, shown simply to illustrate the syntax of a multi-row insert.
insert into category_stage values
(default, default, default, default),
(20, default, 'Country', default),
(21, 'Concerts', 'Rock', default);
Alternatively, output the data to Amazon S3, then perform a bulk load using the COPY command. This will be much more efficient because it can perform the load in parallel across all nodes.
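For example, the COPY could be issued over a pg8000 connection like the one in your script (a sketch only; the table name, S3 path, IAM role and connection details below are placeholders):
import pg8000

#Placeholder connection details - substitute your own cluster endpoint and credentials
conn = pg8000.connect(database='dev', user='awsuser', password='my-password',
                      host='my-cluster.abc123xyz.us-east-1.redshift.amazonaws.com', port=5439)
conn.autocommit = True

#Placeholder table, S3 path and IAM role - COPY lets Redshift load the file in parallel across its slices
copy_stmt = """
    COPY log_table
    FROM 's3://my-bucket/staging/log_rows.csv'
    IAM_ROLE 'arn:aws:iam::123456789012:role/MyRedshiftRole'
    FORMAT AS CSV
"""

cur = conn.cursor()
cur.execute(copy_stmt)
cur.close()
conn.close()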
Upvotes: 1