EranM

Reputation: 303

spark-redshift-connector: combine saving to redshift with a delete query

I would like to update Redshift periodically with the spark-redshift connector (Scala). Each update starts with a delete operation (I am essentially performing an upsert to Redshift).

Is there a way to execute this with the library? Can it be done in a transaction?

Any advice will be much appreciated.

Thanks, Eran.

Upvotes: 2

Views: 2900

Answers (2)

Cauchy Wu

Reputation: 49

Please refer to the example below:

import org.apache.spark.sql.functions.{max, min}

// Find the date range covered by the new data
val min_date = mydf.select(min("actual_ship_date")).rdd.map(line => line(0)).take(1)
val max_date = mydf.select(max("actual_ship_date")).rdd.map(line => line(0)).take(1)

// DELETE statement that removes existing rows in that range before the load
val query = "delete from semi_sdt.kgd_tsb_shippment where " +
  "actual_ship_date>='" + min_date(0).toString + "' and " +
  "actual_ship_date<='" + max_date(0).toString + "'"

// Write data to Redshift; "preactions" executes the DELETE before the append
mydf.coalesce(1).write.
  format("com.databricks.spark.redshift").
  option("url", redShiftUrl).
  option("dbtable", "semi_sdt.kgd_tsb_shippment").
  option("tempdir", s3dir).
  option("forward_spark_s3_credentials", "true").
  option("preactions", query).
  mode("append").
  save()
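
With this pattern the DELETE passed via "preactions" removes the rows whose actual_ship_date falls in the range covered by the new data before the append runs, so re-running the job for the same window does not leave duplicate rows behind.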

Upvotes: 2

EranM

Reputation: 303

It seems there is an option called "preactions" (and "postactions") that does the trick.

See the docs: https://github.com/databricks/spark-redshift
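
For reference, a minimal sketch of a write that uses both options (the DataFrame, JDBC URL, table name, S3 temp dir, and the SQL in the pre/post actions are placeholders, not taken from the question):

// Hypothetical names throughout; "preactions" SQL runs before the load,
// "postactions" SQL runs after it.
df.write.
  format("com.databricks.spark.redshift").
  option("url", redshiftJdbcUrl).
  option("dbtable", "my_schema.my_table").
  option("tempdir", "s3n://my-bucket/tmp/").
  option("forward_spark_s3_credentials", "true").
  option("preactions", "delete from my_schema.my_table where load_date = current_date").
  option("postactions", "grant select on my_schema.my_table to group reporting").
  mode("append").
  save()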

Upvotes: 1
