Anantha Bolar

Reputation: 1

Issues with concurrent inserts on Redshift table

I am trying to concurrently process inserts/updates into a Redshift database using a Python script on AWS Glue, with the pg8000 library for all database operations. The concurrent inserts/updates fail with an error (Error Name: 1023, Error State: XX000). While researching the error I found that it is related to Serializable Isolation.

Can anyone look at the code and tell me how to ensure there are no clashes while the inserts/updates happen?

I tried using a random sleep time within the calling class. It worked for a couple of cases, but as the number of workers increased it failed for an insert/update case.

    import sys
    import time
    import concurrent.futures
    import pg8000
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job

    args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME','REDSHIFT_HOST','REDSHIFT_PORT','REDSHIFT_DB','REDSHIFT_USER_NAME','REDSHIFT_USER_PASSWORD'])

    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    job_run_id = args['JOB_RUN_ID']
    maximum_workers = 5

    def executeSql(sqlStmt):
      conn = pg8000.connect(database=args['REDSHIFT_DB'],user=args['REDSHIFT_USER_NAME'],password=args['REDSHIFT_USER_PASSWORD'],host=args['REDSHIFT_HOST'],port=int(args['REDSHIFT_PORT']))
      conn.autocommit = True
      cur = conn.cursor()
      cur.execute(sqlStmt)
      cur.close()
      conn.close()


    def executeSqlProcedure(procedureName, procedureArgs = ""):
        try:
            logProcStrFormat  = "CALL table_insert_proc('{}','{}','{}','{}',{},{})"
            #Insert into the log table - create the record
            executeSql (logProcStrFormat.format(job_run_id,procedureName,'pending','','getdate()','null')) #Code fails here
            #Executing the procedure
            procStrFormat = "CALL {}({})"
            executeSql(procStrFormat.format(procedureName,procedureArgs))
            print("Printing from {} process at ".format(procedureName),time.ctime())
            #Update the record in log table to complete
            executeSql (logProcStrFormat.format(job_run_id,procedureName,'complete','','null','getdate()')) #Code fails here
        except Exception as e:
            errorMsg = str(e)  # use str(e); exception objects have no .message attribute on Python 3
            executeSql (logProcStrFormat.format(job_run_id,procedureName,'failure',errorMsg,'null','getdate()'))
            raise  # re-raise so the Glue job run is marked as failed


    def runDims():
      dimProcedures = ["test_proc1","test_proc2","test_proc3","test_proc4","test_proc5"]

      with concurrent.futures.ThreadPoolExecutor(max_workers=maximum_workers) as executor:
        result = list(executor.map(executeSqlProcedure, dimProcedures))


    def runFacts():
      factProcedures = ["test_proc6","test_proc7","test_proc8","test_proc9"]

      with concurrent.futures.ThreadPoolExecutor(max_workers=maximum_workers) as executor:
        result = list(executor.map(executeSqlProcedure, factProcedures))    


    runDims()
    runFacts()

I expect the inserts/updates into the log table to happen without locking or erroring out.

Upvotes: 0

Views: 2095

Answers (1)

John Rotenstein

Reputation: 269370

Amazon Redshift does not work well with lots of small INSERT statements.

From Use a Multi-Row Insert - Amazon Redshift:

If a COPY command is not an option and you require SQL inserts, use a multi-row insert whenever possible. Data compression is inefficient when you add data only one row or a few rows at a time.

Multi-row inserts improve performance by batching up a series of inserts. The following example inserts three rows into a four-column table using a single INSERT statement. This is still a small insert, shown simply to illustrate the syntax of a multi-row insert.

    insert into category_stage values
    (default, default, default, default),
    (20, default, 'Country', default),
    (21, 'Concerts', 'Rock', default);
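
Applied to the Glue script above: if the log records can be written with a plain INSERT rather than the table_insert_proc stored procedure, a minimal sketch of batching them into one multi-row statement with pg8000 could look like the following. The job_log table and its column names are assumptions, not names taken from the question.

    import pg8000

    def insert_log_rows(conn_params, rows):
        """Write several log records with one multi-row INSERT instead of one statement each."""
        conn = pg8000.connect(**conn_params)  # same connection settings as the question's executeSql
        conn.autocommit = True
        cur = conn.cursor()
        # Build a single VALUES list; %s placeholders let pg8000 handle quoting of the values
        placeholders = ", ".join(["(%s, %s, %s, getdate())"] * len(rows))
        params = [value for row in rows for value in row]
        cur.execute(
            "INSERT INTO job_log (job_run_id, procedure_name, status, logged_at) VALUES " + placeholders,
            params,
        )
        cur.close()
        conn.close()

Collecting the (job_run_id, procedure_name, status) tuples from the workers and flushing them in one call keeps the number of separate write transactions against the log table down, which also reduces how many concurrent transactions touch the same table.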

Alternatively, output the data to Amazon S3, then perform a bulk load using the COPY command. This will be much more efficient because it can perform the load in parallel across all nodes.
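
A minimal sketch of that pattern, reusing the question's connection parameters, might look like this; the S3 path and IAM role ARN below are placeholders, not values from the question:

    import pg8000

    # Hypothetical staging location and a role attached to the Redshift cluster - replace with your own
    S3_PATH = "s3://my-staging-bucket/facts/"
    IAM_ROLE = "arn:aws:iam::123456789012:role/MyRedshiftCopyRole"

    def copy_from_s3(conn_params, table_name):
        """Bulk-load the files staged under S3_PATH into a Redshift table with a single COPY."""
        conn = pg8000.connect(**conn_params)
        conn.autocommit = True
        cur = conn.cursor()
        # One COPY reads every file under the prefix and loads it in parallel across the cluster's slices
        cur.execute(
            "COPY {} FROM '{}' IAM_ROLE '{}' FORMAT AS CSV".format(table_name, S3_PATH, IAM_ROLE)
        )
        cur.close()
        conn.close()

The files themselves can be written from the Glue job first, for example with Spark's df.write.csv(S3_PATH), so a single COPY replaces many individual INSERT statements.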

Upvotes: 1
