Reputation: 303
I would like to update Redshift periodically with the spark-redshift connector (Scala). Each update starts with a delete operation, as I am effectively performing an upsert to Redshift.
Is there a way to execute this with the library? Can it be done in a transaction?
Any advice will be much appreciated.
Thanks, Eran.
Upvotes: 2
Views: 2900
Reputation: 49
Please refer to the example below:
import org.apache.spark.sql.functions.{min, max}

// Boundaries of the incoming batch; the overlapping date range is deleted first
val min_date = mydf.select(min("actual_ship_date")).rdd.map(line => line(0)).take(1)
val max_date = mydf.select(max("actual_ship_date")).rdd.map(line => line(0)).take(1)
val query = "delete from semi_sdt.kgd_tsb_shippment where " +
  "actual_ship_date>='" + min_date(0).toString + "' and " +
  "actual_ship_date<='" + max_date(0).toString + "'"

// Write data to Redshift; the "preactions" DELETE runs before the load
mydf.coalesce(1).write.
  format("com.databricks.spark.redshift").
  option("url", redShiftUrl).
  option("dbtable", "semi_sdt.kgd_tsb_shippment").
  option("tempdir", s3dir).
  option("forward_spark_s3_credentials", true).
  option("preactions", query).
  mode("append").
  save()
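Here the delete is scoped to the date range present in the incoming DataFrame, and the SQL passed through "preactions" is executed on Redshift before the staged data is loaded, so the overlapping rows are removed before the new ones are appended.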
Upvotes: 2
Reputation: 303
It seems there is an option called "preactions" (and also "postactions") which does the trick.
See the docs: https://github.com/databricks/spark-redshift
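As a minimal sketch of the delete-then-append pattern with "preactions", assuming a hypothetical target table my_schema.events with a string key column event_id (redshiftUrl and s3TempDir are placeholders you would supply yourself):

import org.apache.spark.sql.DataFrame

// Key-based delete-then-append "upsert" sketch; table and column names are illustrative.
def upsert(df: DataFrame, redshiftUrl: String, s3TempDir: String): Unit = {
  // Collect the incoming keys on the driver (reasonable only for small/medium batches,
  // and assumes df is non-empty).
  val keys = df.select("event_id").distinct().collect().map(_.get(0).toString)
  val deleteSql =
    "delete from my_schema.events where event_id in (" +
      keys.map(k => s"'$k'").mkString(",") + ")"

  df.write
    .format("com.databricks.spark.redshift")
    .option("url", redshiftUrl)
    .option("dbtable", "my_schema.events")
    .option("tempdir", s3TempDir)
    .option("forward_spark_s3_credentials", "true")
    .option("preactions", deleteSql) // runs on Redshift before the data is loaded
    .mode("append")
    .save()
}

For larger batches, a common alternative is to append into a staging table and use "postactions" to delete the matching keys from the target and insert from the staging table.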
Upvotes: 1