Reputation: 5524
Is it possible to write a DataFrame backed by a Kafka streaming source into AWS Redshift? We have used spark-redshift in the past to write into Redshift, but I presume it will not work with DataFrame#writeStream. Writing with a JDBC connector via ForeachWriter may also not be a good idea, given the way Redshift works.
One possible approach I have come across, from a Yelp blog, is to write files into S3 and then invoke Redshift COPY with a manifest file containing the S3 object paths. In the case of Structured Streaming, how can I control the files I write into S3, and also have a separate trigger that creates a manifest file after writing, say, 5 files into S3?
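To make it concrete, the rough shape I have in mind is below; bucket, table, and role names are just placeholders, and the manifest/COPY step would run outside of Spark:
import org.apache.spark.sql.streaming.Trigger

// 1) Land micro-batch output as files on S3 via the built-in file sink
//    (kafkaDf is the streaming DataFrame read from Kafka)
val toS3 = kafkaDf
  .selectExpr("CAST(value AS STRING) AS payload")
  .writeStream
  .format("csv")
  .option("path", "s3a://example-bucket/stream-output/")
  .option("checkpointLocation", "s3a://example-bucket/checkpoints/to-s3/")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .start()

// 2) Separately, after every N files, build a manifest and run something like:
//    COPY public.kafka_events
//    FROM 's3://example-bucket/manifests/batch-0001.manifest'
//    IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-copy'
//    CSV
//    MANIFEST;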
Any other possible solution is also appreciated. Thanks in advance.
Upvotes: 1
Views: 1826
Reputation: 4750
There is a way to use spark-redshift in Structured Streaming, but you have to implement a few additional classes in your own fork. First of all, you need a RedshiftSink that implements the org.apache.spark.sql.execution.streaming.Sink interface:
package com.databricks.spark.redshift

import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.execution.streaming.Sink
import org.slf4j.LoggerFactory

import Parameters.MergedParameters

/** Streaming sink that delegates each micro-batch to the existing batch RedshiftWriter. */
private[redshift] class RedshiftSink(
    sqlContext: SQLContext,
    parameters: MergedParameters,
    redshiftWriter: RedshiftWriter) extends Sink {

  private val log = LoggerFactory.getLogger(getClass)

  @volatile private var latestBatchId = -1L

  override def toString(): String = "RedshiftSink"

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    if (batchId <= latestBatchId) {
      log.info(s"Skipping already committed batch $batchId")
    } else {
      val mode = if (parameters.overwrite) SaveMode.Overwrite else SaveMode.Append
      redshiftWriter.saveToRedshift(sqlContext, data, mode, parameters)
      latestBatchId = batchId
    }
  }
}
Then com.databricks.spark.redshift.DefaultSource should be extended with an implementation of the org.apache.spark.sql.sources.StreamSinkProvider interface:
/**
 * Creates a Sink instance
 */
override def createSink(
    sqlContext: SQLContext,
    parameters: Map[String, String],
    partitionColumns: Seq[String],
    outputMode: OutputMode): Sink = {
  new RedshiftSink(
    sqlContext,
    Parameters.mergeParameters(parameters),
    new RedshiftWriter(jdbcWrapper, s3ClientFactory))
}
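For completeness, the class declaration in the fork then just mixes in the extra trait; roughly like this, where the ellipses stand in for the existing method bodies and the constructor mirrors spark-redshift's current DefaultSource (treat the exact signature as an assumption):
import com.amazonaws.auth.AWSCredentialsProvider
import com.amazonaws.services.s3.AmazonS3Client
import org.apache.spark.sql.sources._

// Structural sketch only: the class just gains StreamSinkProvider.
class DefaultSource(
    jdbcWrapper: JDBCWrapper,
    s3ClientFactory: AWSCredentialsProvider => AmazonS3Client)
  extends RelationProvider
  with SchemaRelationProvider
  with CreatableRelationProvider
  with StreamSinkProvider {

  // ...existing createRelation overloads stay untouched...
  // ...plus the createSink override shown above...
}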
Now you should be able to use it in structured streaming:
dataset.writeStream
  .trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
  .format("com.databricks.spark.redshift")
  .outputMode(OutputMode.Append())
  .queryName("redshift-stream")
  .start()
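Note that the sink still needs the usual spark-redshift options (url, dbtable, tempdir, credentials) plus a checkpoint location; a fuller invocation would look roughly like this, with placeholder values:
import java.util.concurrent.TimeUnit
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

dataset.writeStream
  .trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
  .format("com.databricks.spark.redshift")
  .outputMode(OutputMode.Append())
  .option("url", "jdbc:redshift://example-cluster:5439/dev?user=example&password=example")
  .option("dbtable", "public.kafka_events")
  .option("tempdir", "s3a://example-bucket/redshift-temp/")
  .option("forward_spark_s3_credentials", "true")
  .option("checkpointLocation", "s3a://example-bucket/checkpoints/redshift-stream/")
  .queryName("redshift-stream")
  .start()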
To fix an issue with reporting metrics to StreamExecution, RedshiftWriter.unloadData() has to be changed to use data.queryExecution.toRdd.mapPartitions instead of data.rdd.mapPartitions, since data.rdd creates a new plan that is not visible to StreamExecution (which uses the existing plan to collect metrics). It also requires changing the conversion functions to this:
// imports needed by the converters (normally at the top of RedshiftWriter.scala)
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._

// Per-column converters that read directly from Catalyst's InternalRow
val conversionFunctions: Array[(InternalRow, Int) => Any] = data.schema.fields.map { field =>
  field.dataType match {
    case DateType =>
      val dateFormat = Conversions.createRedshiftDateFormat()
      (row: InternalRow, ordinal: Int) => {
        if (row.isNullAt(ordinal)) null
        else dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal)))
      }
    case TimestampType =>
      val timestampFormat = Conversions.createRedshiftTimestampFormat()
      (row: InternalRow, ordinal: Int) => {
        if (row.isNullAt(ordinal)) null
        else timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))
      }
    case StringType =>
      (row: InternalRow, ordinal: Int) => {
        if (row.isNullAt(ordinal)) null else row.getString(ordinal)
      }
    case dt: DataType =>
      (row: InternalRow, ordinal: Int) => {
        if (row.isNullAt(ordinal)) null else row.get(ordinal, dt)
      }
  }
}
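For context, applying those converters in unloadData() then looks roughly like the sketch below (simplified; the real method goes on to serialize the converted rows as Avro/CSV into the tempdir):
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

// Simplified: pull values out of each InternalRow using the per-column converters
val convertedRows: RDD[Row] = data.queryExecution.toRdd.mapPartitions { iter =>
  iter.map { internalRow =>
    val values = new Array[Any](conversionFunctions.length)
    var i = 0
    while (i < values.length) {
      values(i) = conversionFunctions(i)(internalRow, i)
      i += 1
    }
    Row.fromSeq(values)
  }
}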
Upvotes: 2
Reputation: 14035
Spark is able to load normal DataFrames to Redshift very effectively, but I haven't worked with streams in Spark yet.
If you can continuously write the stream output to a standard DataFrame, then at a specified interval you could load that DataFrame to Redshift and empty it.
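I haven't tried it myself, but if you are on Spark 2.4+, writeStream.foreachBatch appears to give you exactly that: a plain DataFrame per interval that the existing batch spark-redshift connector can write, without any fork. A rough sketch, with placeholder connection details:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

// Write each micro-batch with the existing batch connector; all values are placeholders
val writeBatchToRedshift: (DataFrame, Long) => Unit = (batchDf, batchId) =>
  batchDf.write
    .format("com.databricks.spark.redshift")
    .option("url", "jdbc:redshift://example-cluster:5439/dev?user=example&password=example")
    .option("dbtable", "public.kafka_events")
    .option("tempdir", "s3a://example-bucket/redshift-temp/")
    .option("forward_spark_s3_credentials", "true")
    .mode("append")
    .save()

streamingDf.writeStream
  .trigger(Trigger.ProcessingTime("5 minutes"))
  .option("checkpointLocation", "s3a://example-bucket/checkpoints/redshift-batches/")
  .foreachBatch(writeBatchToRedshift)
  .start()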
The other alternative would be sending the stream to Kinesis and using Kinesis Firehose to load it to Redshift. Seems excessive to add another streaming layer to the stack though.
Upvotes: 0