Reputation: 248
I am trying the scenario below in Flink.
I am stuck at one point: the Sink function should work transactionally. If any exception occurs while persisting data to Postgres or while uploading data to Azure Blob Storage, the process should throw an exception and roll back the data from the database, and it should also remove the data from Azure Blob Storage. In case of an exception, the payload received by the Sink function should be put on a Kafka topic, but I don't see how to handle that. I know that a ProcessFunction supports side outputs, through which data can be sent to a different topic, but a Sink doesn't support side outputs.
Is there a way I can publish the payload received in the Sink to a Kafka topic in case of any exception?
Upvotes: 3
Views: 2081
Reputation: 2157
I am not sure which programming language you are using, but in Scala you can do something like the following inside a process function, and then call the sink methods based on the output returned by the process function.
Try {
  // ...
} match {
  case Success(x) => {
    // ...
    Right(x)
  }
  case Failure(err) => {
    // ...
    Left(err)
  }
}
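To make the skeleton above concrete, here is a minimal, self-contained sketch of wrapping a failing operation in Try and converting it to an Either. The names TryEitherSketch, persist and attempt are hypothetical, and persist is only a stand-in for the real Postgres insert and Azure Blob upload. Unlike the skeleton, the Left side here also carries the original payload, on the assumption that you want to forward it to Kafka later.

```scala
import scala.util.{Failure, Success, Try}

object TryEitherSketch {
  // Hypothetical stand-in for the real work (database insert + blob upload).
  // It throws on an empty payload purely to simulate a failure.
  def persist(payload: String): String =
    if (payload.isEmpty) throw new IllegalArgumentException("empty payload")
    else s"stored:$payload"

  // Right holds the result of a successful write.
  // Left holds the original payload together with the error,
  // so the failed record can be routed elsewhere.
  def attempt(payload: String): Either[(String, Throwable), String] =
    Try(persist(payload)) match {
      case Success(result) => Right(result)
      case Failure(err)    => Left((payload, err))
    }
}
```

Calling TryEitherSketch.attempt with a non-empty payload yields a Right, while an empty payload yields a Left that still contains the input, which is what the process function needs in order to emit it to a side output.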
Your process element method will look something like this:
override def process(key: Int, context: Context, elements: Iterable[String], out: Collector[(String, String)]): Unit = {
  for (i <- elements) {
    println("Inside Process.....")
    parseJson(i) match {
      case Right(data) => {
        // Emit the good record to its side output as well as to the main output.
        context.output(goodOutputTag, data)
        out.collect(data) // collect() emits the record downstream, e.g. to the writer
      }
      case Left(err) => {
        // Side outputs split a stream: emit the failed record here and build a new
        // DataStream from it with .getSideOutput(outputTag).
        // dataTuple is not defined in this snippet; it stands for the failed payload to emit.
        context.output(badOutputTag, dataTuple)
      }
    }
  }
}
Now use the output tags from the Success and Failure cases to create a data stream from each in your invoker object, and call the respective sink methods.
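As a rough sketch of that wiring, the job below does the persistence inside a ProcessFunction, sends failed payloads to a side output, and attaches a Kafka sink to that side output. It assumes the Flink Scala DataStream API together with the flink-connector-kafka KafkaSink (roughly Flink 1.15 to 1.17). The object name, the persistAtomically placeholder, the tag name, the broker address and the topic name are all hypothetical, and the rollback of Postgres and Blob Storage is left to that placeholder.

```scala
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import scala.util.{Failure, Success, Try}

object SideOutputToKafkaSketch {
  // Side-output tag for payloads whose persistence failed (name is an assumption).
  val badOutputTag: OutputTag[String] = OutputTag[String]("failed-payloads")

  // Placeholder for the Postgres insert plus Azure Blob upload. The real version
  // is assumed to undo its own partial work before rethrowing the exception.
  def persistAtomically(payload: String): Unit = ()

  class PersistFunction extends ProcessFunction[String, String] {
    override def processElement(
        value: String,
        ctx: ProcessFunction[String, String]#Context,
        out: Collector[String]): Unit =
      Try(persistAtomically(value)) match {
        case Success(_) => out.collect(value)              // main output: persisted records
        case Failure(_) => ctx.output(badOutputTag, value) // side output: failed payloads
      }
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val input: DataStream[String] = env.fromElements("a", "b") // stand-in source

    val processed = input.process(new PersistFunction)

    // Kafka sink for the failed payloads; broker and topic are assumptions.
    val deadLetterSink = KafkaSink.builder[String]()
      .setBootstrapServers("localhost:9092")
      .setRecordSerializer(
        KafkaRecordSerializationSchema.builder[String]()
          .setTopic("failed-payloads")
          .setValueSerializationSchema(new SimpleStringSchema())
          .build())
      .build()

    processed.getSideOutput(badOutputTag).sinkTo(deadLetterSink)
    env.execute("side-output-sketch")
  }
}
```

The design choice here is to move the write out of the Sink and into the ProcessFunction, because only process functions can emit to side outputs. The main output can still go to a further sink, or be discarded if nothing else is needed downstream.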
Upvotes: 3