Somasundaram Sekar

Reputation: 5524

[Structured Streaming]: Structured Streaming into Redshift sink

Is it possible to write a DataFrame backed by a Kafka streaming source into AWS Redshift? We have used spark-redshift in the past to write into Redshift, but I presume it will not work with DataFrame#writeStream. Writing through a JDBC connector with a ForeachWriter also may not be a good idea, given the way Redshift works.

One possible approach that I have come across, from a Yelp blog post, is to write files into S3 and then invoke Redshift COPY with a manifest file containing the S3 object paths. In the case of Structured Streaming, how can I control the files that I write to S3, and also have a separate trigger that creates a manifest file after, say, 5 files have been written to S3?
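To make the intent concrete, here is a rough sketch of the flow I have in mind (the bucket, paths, table, and IAM role are placeholders, and the COPY would be issued by a separate process once enough files have accumulated):

import org.apache.spark.sql.streaming.Trigger

// kafkaDf is the streaming DataFrame read from the Kafka source.
// Structured Streaming writes files to S3; a separate job later builds a
// manifest over a group of those files and issues a Redshift COPY.
val query = kafkaDf.writeStream
  .format("parquet")
  .option("path", "s3a://<bucket>/events/")
  .option("checkpointLocation", "s3a://<bucket>/checkpoints/events/")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .start()

// The separate trigger would then run something along the lines of:
//   COPY <schema>.events
//   FROM 's3://<bucket>/manifests/batch-0001.manifest'
//   IAM_ROLE 'arn:aws:iam::<account>:role/<redshift-copy-role>'
//   MANIFEST
//   FORMAT AS PARQUET;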

Any other possible solution is also appreciated. Thanks in advance.

Upvotes: 1

Views: 1826

Answers (2)

Yuriy Bondaruk

Reputation: 4750

There is a way to use spark-redshift in Structured Streaming, but you have to implement a few additional classes in your own fork. First of all, you need a RedshiftSink that implements the org.apache.spark.sql.execution.streaming.Sink interface:

package com.databricks.spark.redshift

import org.slf4j.LoggerFactory

import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.execution.streaming.Sink

import com.databricks.spark.redshift.Parameters.MergedParameters

/**
 * Streaming sink that hands each micro-batch to the existing batch RedshiftWriter.
 */
private[redshift] class RedshiftSink(
    sqlContext: SQLContext,
    parameters: MergedParameters,
    redshiftWriter: RedshiftWriter) extends Sink {

  private val log = LoggerFactory.getLogger(getClass)

  @volatile private var latestBatchId = -1L

  override def toString(): String = "RedshiftSink"

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    if (batchId <= latestBatchId) {
      // The batch was re-delivered after a restart and has already been written; skip it.
      log.info(s"Skipping already committed batch $batchId")
    } else {
      val mode = if (parameters.overwrite) SaveMode.Overwrite else SaveMode.Append
      redshiftWriter.saveToRedshift(sqlContext, data, mode, parameters)
      latestBatchId = batchId
    }
  }
}

Then com.databricks.spark.redshift.DefaultSource should be extended with an implementation of org.apache.spark.sql.sources.StreamSinkProvider:

  /**
   * Creates a Sink instance.
   */
  override def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): Sink = {
    new RedshiftSink(
      sqlContext,
      Parameters.mergeParameters(parameters),
      new RedshiftWriter(jdbcWrapper, s3ClientFactory))
  }

Now you should be able to use it in Structured Streaming:

dataset.writeStream
  .trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
  .format("com.databricks.spark.redshift")
  .outputMode(OutputMode.Append())
  .queryName("redshift-stream")
  .start()
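As with the batch writer, the sink still needs the usual spark-redshift connection options, plus a checkpoint location for the streaming query. A fuller sketch, assuming the fork keeps the batch writer's option names (the JDBC URL, table, and S3 paths below are placeholders):

import java.util.concurrent.TimeUnit

import org.apache.spark.sql.streaming.{OutputMode, Trigger}

// Placeholders: substitute your own JDBC URL, table name, S3 tempdir, and checkpoint path.
val query = dataset.writeStream
  .trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://<host>:5439/<db>?user=<user>&password=<password>")
  .option("dbtable", "public.events")
  .option("tempdir", "s3a://<bucket>/spark-redshift-temp/")
  .option("forward_spark_s3_credentials", "true")
  .option("checkpointLocation", "s3a://<bucket>/checkpoints/redshift-stream/")
  .outputMode(OutputMode.Append())
  .queryName("redshift-stream")
  .start()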

Update

To fix an issue with reporting metrics to StreamExecution, RedshiftWriter.unloadData() has to be changed to use data.queryExecution.toRdd.mapPartitions instead of data.rdd.mapPartitions, since data.rdd creates a new plan that is not visible to StreamExecution (which uses the existing plan to collect metrics). It also requires changing the conversion functions to operate on InternalRow, as shown below:

val conversionFunctions: Array[(InternalRow, Int) => Any] = data.schema.fields.map { field =>
  field.dataType match {
    case DateType =>
      val dateFormat = Conversions.createRedshiftDateFormat()
      (row: InternalRow, ordinal: Int) => {
        if (row.isNullAt(ordinal)) null else dateFormat.format(
          DateTimeUtils.toJavaDate(row.getInt(ordinal)))
      }
    case TimestampType =>
      val timestampFormat = Conversions.createRedshiftTimestampFormat()
      (row: InternalRow, ordinal: Int) => {
        if (row.isNullAt(ordinal)) null else timestampFormat.format(
          DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))
      }
    case StringType =>
      (row: InternalRow, ordinal: Int) => {
        if (row.isNullAt(ordinal)) null else row.getString(ordinal)
      }
    case dt: DataType =>
      (row: InternalRow, ordinal: Int) => {
        if (row.isNullAt(ordinal)) null else row.get(ordinal, dt)
      }
  }
}
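For context, the shape of that change in RedshiftWriter.unloadData would look roughly like the sketch below (not the exact fork code); the key point is iterating over the sink's existing plan via queryExecution.toRdd so that StreamExecution can still collect metrics from it:

// Sketch only: the surrounding unloadData code differs in detail in the fork.
// data.queryExecution.toRdd yields InternalRow and reuses the plan StreamExecution tracks,
// whereas data.rdd would build a new plan that StreamExecution never sees.
val convertedRows = data.queryExecution.toRdd.mapPartitions { iter =>
  iter.map { row =>
    conversionFunctions.zipWithIndex.map { case (convert, ordinal) =>
      convert(row, ordinal)
    }
  }
}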

Upvotes: 2

Joe Harris

Reputation: 14035

Spark is able to load normal DataFrames into Redshift very effectively, but I haven't worked with streams in Spark yet.

If you can continuously write the stream output to a standard DataFrame, then at a specified interval you could load that DataFrame to Redshift and empty it.
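For example, a rough sketch of that idea using Spark's foreachBatch (available from Spark 2.4; the JDBC URL, table, and S3 paths below are placeholders) would hand each micro-batch to the regular batch spark-redshift writer:

import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.streaming.Trigger

// Sketch: write each micro-batch with the ordinary batch spark-redshift writer.
dataset.writeStream
  .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
    batchDf.write
      .format("com.databricks.spark.redshift")
      .option("url", "jdbc:redshift://<host>:5439/<db>?user=<user>&password=<password>")
      .option("dbtable", "public.events")
      .option("tempdir", "s3a://<bucket>/spark-redshift-temp/")
      .option("forward_spark_s3_credentials", "true")
      .mode(SaveMode.Append)
      .save()
  }
  .trigger(Trigger.ProcessingTime("5 minutes"))
  .option("checkpointLocation", "s3a://<bucket>/checkpoints/redshift-batches/")
  .start()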

The other alternative would be sending the stream to Kinesis and using Kinesis Firehose to load it into Redshift. It seems excessive to add another streaming layer to the stack, though.

Upvotes: 0
