Assaf Moldavsky

Reputation: 1721

Spark DataFrameReader from RedShift tempDir dump

Is there a way to create a DataFrame from a tempDir dump from RedShift?

My use case: when a job fails, I want to retry, but continue from the temporary data dump that was written to S3 rather than re-fetch the whole dataset from RedShift again, which is HUGE!

The load code looks like this:

val df1 = spark.read
  .format("com.databricks.spark.redshift")
  .option("url", jdbcUrl)
  .option("dbtable", spmeTable)
  .option("tempdir", tempDir)
  .option("user", jdbcUsername)
  .option("password", jdbcPassword)
  .option("forward_spark_s3_credentials", true)
  .load()

The job fails later on, and I want to recreate df1 without fetching anything from RedShift again.

Is there a way to do this?

I found a method called createDataFrame on SparkSession, but I am not sure if that is a possible solution: https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/SparkSession.html

UPDATE #1

The temp dir has the same directory structure as the UNLOAD examples here: https://docs.aws.amazon.com/redshift/latest/dg/r_UNLOAD_command_examples.html

I opened one of the temp files from S3 and it is pipe-delimited:

edd66540-fa17-599b-9b22-7df29a5f9229|kNOCugU4wuKAUw7m2UXS7MfX|2018-11-27 19:48:44|POST|f|@NULL@|@NULL@|@NULL@|@NULL@|https://www.example.com/r/conversations/0grt6540-

UPDATE #2

According to this tutorial: https://github.com/databricks/spark-redshift/tree/master/tutorial

Once the files are written to S3, a custom InputFormat (com.databricks.spark.redshift.RedshiftInputFormat) is used to consume the files in parallel. This class is similar to Hadoop's standard TextInputFormat class, where the key is the byte offset of the start of each line in the file. The value class, however, is of type Array[String] (unlike TextInputFormat, whose value type is Text). The values are created by splitting the lines using the default delimiter (|). The RedshiftInputFormat processes the S3 files line-by-line to produce an RDD. The schema obtained earlier is then applied on this RDD to convert the strings to the proper data types and to generate a DataFrame.

Any idea how to do all of that while skipping the UNLOAD?
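For reference, here is a rough sketch of what I think that would look like, skipping the UNLOAD entirely: read the existing tempdir as text, split each line on the delimiter, and apply a hand-written schema via createDataFrame. The column names below are placeholders I made up, and the naive split does not handle UNLOAD's ESCAPE sequences, so treat this as a sketch rather than a confirmed solution.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hand-written schema -- these column names are placeholders and would have to
// match the table that was unloaded (everything read as strings first).
val columns = Seq("id", "token", "created_at", "http_method", "is_post",
                  "col6", "col7", "col8", "col9", "url")
val schema = StructType(columns.map(name => StructField(name, StringType, nullable = true)))

// Read the pipe-delimited files from the existing tempdir, split each line on
// "|" and map the "@NULL@" marker (see the sample line above) to null.
val rows = spark.sparkContext
  .textFile(tempDir)
  .map(_.split("\\|", -1))
  .map(fields => Row.fromSeq(fields.map(f => if (f == "@NULL@") null else f)))

// Apply the schema to the RDD[Row], as described in the tutorial quote above.
val df1Retry = spark.createDataFrame(rows, schema)

// Columns that need real types can then be cast, e.g.
// df1Retry.withColumn("created_at", df1Retry("created_at").cast("timestamp"))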

Upvotes: 1

Views: 2550

Answers (1)

DaRkMaN

Reputation: 1054

The connector dumps the data in Avro format by default (it can be dumped in CSV or CSV GZIP format as well).
Also note that the connector does not automatically clean up the temporary location (see the note in the spark-redshift docs).
We can read the data back by pointing directly at the tempdir:

val df = spark.read.format("avro").load(tempdir)
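Note that the short name "avro" only resolves on Spark 2.4+ with the external spark-avro module on the classpath; on Spark 2.3 and earlier the Databricks spark-avro package and its full format name are needed instead. A rough sketch of the variants (mySchema below is a hypothetical hand-written schema for the CSV case, since those dumps carry no embedded schema):

// Spark 2.4+ (needs the external module, e.g. --packages org.apache.spark:spark-avro_2.11:2.4.0)
val dfAvro = spark.read.format("avro").load(tempdir)

// Spark 2.3 and earlier, with the Databricks spark-avro package:
// val dfAvro = spark.read.format("com.databricks.spark.avro").load(tempdir)

// If the job was run with tempformat "CSV" or "CSV GZIP", supply a schema by hand:
// val dfCsv = spark.read.schema(mySchema).option("sep", ",").csv(tempdir)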

Upvotes: 1
