Shawn

Reputation: 95

Spark Structured Streaming Avro-to-Avro with a custom Sink

Can someone point me to a good example or sample of writing Avro to S3 (or any file system)? I am using a custom Sink, and I would like to pass a properties Map through the constructor of the SinkProvider so it can then be passed on to the Sink.

Updated Code:

val query = df.mapPartitions { itr =>
  itr.map { row =>
    val rowInBytes = row.getAs[Array[Byte]]("value")
    MyUtils.deserializeAvro[GenericRecord](rowInBytes).toString
  }
}.writeStream
  .format("com.test.MyStreamingSinkProvider")
  .outputMode(OutputMode.Append())
  .queryName("testQ" )
  .trigger(ProcessingTime("10 seconds"))
  .option("checkpointLocation", "my_checkpoint_dir")
  .start()

query.awaitTermination()

Sink Provider:

class MyStreamingSinkProvider extends StreamSinkProvider {

  override def createSink(sqlContext: SQLContext, parameters: Map[String, String], partitionColumns: Seq[String], outputMode: OutputMode): Sink = {
    new MyStreamingSink
  }
}

Sink:

class MyStreamingSink extends Sink with Serializable {

  final val log: Logger = LoggerFactory.getLogger(classOf[MyStreamingSink])

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    //For saving as text doc
    data.rdd.saveAsTextFile("path")

    log.warn(s"Total records processed: ${data.count()}")
    log.warn("Data saved.")
  }
}

Upvotes: 5

Views: 694

Answers (1)

Yuriy Bondaruk

Reputation: 4750

You should be able to pass parameters to your custom sink via writeStream.option(key, value):

val query = df.writeStream
  .format("com.test.MyStreamingSinkProvider")
  .outputMode(OutputMode.Append())
  .queryName("testQ")
  .trigger(ProcessingTime("10 seconds"))
  .option("key_1", "value_1")
  .option("key_2", "value_2")
  .start()

In this case, the parameters map passed to MyStreamingSinkProvider.createSink(...) will contain key_1 and key_2.
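For completeness, here is a minimal sketch (not part of the original answer) of how the provider could forward that parameters map into the sink's constructor, which is what the question asks about. The option names key_1/key_2 and the params field are illustrative only:

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.StreamSinkProvider
import org.apache.spark.sql.streaming.OutputMode

class MyStreamingSinkProvider extends StreamSinkProvider {

  override def createSink(sqlContext: SQLContext,
                          parameters: Map[String, String],
                          partitionColumns: Seq[String],
                          outputMode: OutputMode): Sink = {
    // `parameters` contains every writeStream.option(...) set on the query,
    // so the whole map can simply be handed to the sink's constructor.
    new MyStreamingSink(parameters)
  }
}

class MyStreamingSink(params: Map[String, String]) extends Sink with Serializable {

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    // Illustrative use of a passed-in option, e.g. reading an output path
    // from one of the keys set via writeStream.option(...).
    val outputPath = params.getOrElse("key_1", "default_path")
    // ... write `data` to outputPath ...
  }
}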

Upvotes: 3
