user13906258

Reputation: 248

How to collect side output from rich sink function in Apache Flink?

I am trying below scenario in Flink

  1. Flink consume data from kafka topic and validate against avro schema
  2. Converting the data into JSON payload in process function after some enrichments on the data
  3. After enrichment of data of it should be written to Postgres database and upload data to Azure blob storage through Flink RichSinkFunction

I am stuck at one point: the sink should behave transactionally, meaning that if an exception occurs while persisting data to Postgres, or while uploading data to Azure Blob Storage, the operation should fail as a whole: the row should be rolled back from the database and the uploaded object deleted from Azure Blob Storage. When such an exception occurs, the payload received by the sink function should be published to a Kafka topic, but I am not sure how to handle that. I know that a process function supports side outputs, through which we can send data to a different topic, but a sink does not support side outputs.

Is there a way I can publish the payload received in the sink to a Kafka topic in case of an exception?

Upvotes: 3

Views: 2081

Answers (1)

whatsinthename

Reputation: 2157

I am not sure which programming language you are using, but in Scala you can do something like the following inside a process function, and then call your sink methods based on the output the process function returns.

    Try {
      // your parsing / enrichment logic here
    } match {
      case Success(x) =>
        // ...
        Right(x)
      case Failure(err) =>
        // ...
        Left(err)
    }
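For instance, a `parseJson` helper built on that pattern could look like the sketch below. The validation logic here is only illustrative (a crude "does it look like a JSON object" check); substitute your real Avro/JSON validation and enrichment:

```scala
import scala.util.{Try, Success, Failure}

// Hypothetical validator: succeeds when the record looks like a JSON object,
// fails otherwise. Replace the body with your actual validation/enrichment.
def parseJson(record: String): Either[Throwable, (String, String)] =
  Try {
    val trimmed = record.trim
    require(trimmed.startsWith("{") && trimmed.endsWith("}"),
      s"not a JSON object: $record")
    ("enriched-key", trimmed) // stand-in for the enriched payload
  } match {
    case Success(x)   => Right(x)
    case Failure(err) => Left(err)
  }
```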

Your process method will then look something like this:

    override def process(key: Int, context: Context, elements: Iterable[String],
                         out: Collector[(String, String)]): Unit = {
      for (i <- elements) {
        println("Inside Process.....")
        parseJson(i) match {
          case Right(data) =>
            // Happy path: emit to the main output (picked up by the writer/sink)
            // and optionally to a "good" side output as well.
            context.output(goodOutputTag, data)
            out.collect(data)
          case Left(err) =>
            // Failure path: route the raw record (and the reason) to the "bad"
            // side output. A new DataStream can later be created from it with
            // .getSideOutput(badOutputTag).
            context.output(badOutputTag, (i, err.getMessage))
        }
      }
    }

Now, use these output tags from the Success and Failure cases and create a data stream out of it in your invoker object and call your respective sink methods.
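In Flink itself that wiring is roughly `stream.getSideOutput(badOutputTag).addSink(kafkaSink)`. As a plain-Scala toy model of the same split (no Flink dependencies; the names are illustrative only), the routing amounts to partitioning records by the parse result:

```scala
// Toy model of the side-output split: records that parse go to the main
// ("good") stream, records that fail go to the Kafka-bound ("bad") stream.
def route(records: List[String],
          parse: String => Either[Throwable, (String, String)])
    : (List[(String, String)], List[String]) = {
  // partitionMap sends Left values to the first list, Right to the second;
  // for failures we keep the original raw record so it can be republished.
  val (bad, good) = records.partitionMap(r => parse(r).left.map(_ => r))
  (good, bad)
}
```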

Upvotes: 3
