I have a sink in Flink that extends RichSinkFunction. It slows down the whole Flink job: if I remove it, the run time is cut in half, from 10 minutes to less than 5. This is its configuration:
```java
OutputTag<List<SessionSinkModel>> inProgressSessionOutputTag =
        new OutputTag<>(ProcessorConstants.IN_PROGRESS_SESSIONS_SINK_NAME) {};

SingleOutputStreamOperator<SessionAccumulator> aggregatedSessionStream = collectionMessageDataStream
        .keyBy(CollectionMessage::getSessionId)
        .process(sessionKeyedProcessFunction)
        .uid("SessionWindow")
        .name("Session Window")
        .setParallelism(4);

DataStream<List<SessionSinkModel>> inProgressSessionStream = aggregatedSessionStream
        .getSideOutput(inProgressSessionOutputTag);

inProgressSessionStream
        .broadcast()
        .addSink(new SessionAPISink(config))
        .uid("Sessions side output")
        .name("Sessions side output");
```
The sink sends large volumes of data via POST requests to an endpoint. As far as I know, this POST call is asynchronous (like the sink call itself). To emit the records, I use the standard side-output call on the KeyedBroadcastProcessFunction.ReadOnlyContext ctx:
```java
ctx.output(outputTag, message);
```
How can I keep this sink from blocking task execution?
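A `RichSinkFunction`'s `invoke()` runs on the task thread once per record, so if it waits for each HTTP response, the upstream operators wait with it through backpressure. One common pattern is to fire each request without waiting, cap how many are in flight, and wait for the outstanding ones only at checkpoint time and in `close()`. Below is a minimal, JDK-only sketch of that bookkeeping, assuming `submit()` and `flush()` are called from a single thread (as a sink's `invoke()` and `snapshotState()` are). `BoundedAsyncSender` and its methods are hypothetical names, not a Flink API, and the Flink wiring is described rather than shown.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

/** Hypothetical helper: sends asynchronously but caps requests in flight. */
final class BoundedAsyncSender<T> implements AutoCloseable {
    private final Semaphore permits;                       // one permit per allowed in-flight request
    private final Function<T, CompletableFuture<?>> send;  // e.g. wraps HttpClient.sendAsync
    private final Set<CompletableFuture<Void>> inFlight = ConcurrentHashMap.newKeySet();
    private final AtomicReference<Throwable> firstError = new AtomicReference<>();

    BoundedAsyncSender(int maxInFlight, Function<T, CompletableFuture<?>> send) {
        this.permits = new Semaphore(maxInFlight);
        this.send = send;
    }

    /** Returns immediately unless maxInFlight requests are already pending. */
    void submit(T record) {
        rethrowIfFailed(); // fail fast if an earlier request failed
        try {
            permits.acquire();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep task cancellation working
            throw new IllegalStateException("interrupted while waiting for a free slot", e);
        }
        CompletableFuture<?> request;
        try {
            request = send.apply(record);
        } catch (RuntimeException e) {
            permits.release();
            throw e;
        }
        // 'done' completes only after the error (if any) is recorded and the permit is released.
        CompletableFuture<Void> done = request.handle((ignored, err) -> {
            if (err != null) {
                firstError.compareAndSet(null, err);
            }
            permits.release();
            return null;
        });
        inFlight.add(done);
        done.whenComplete((v, e) -> inFlight.remove(done));
    }

    /** Blocks until every pending request has finished; surfaces the first failure. */
    void flush() {
        CompletableFuture.allOf(inFlight.toArray(new CompletableFuture<?>[0])).join();
        rethrowIfFailed();
    }

    @Override
    public void close() {
        flush();
    }

    private void rethrowIfFailed() {
        Throwable t = firstError.get();
        if (t != null) {
            throw new IllegalStateException("an async request failed", t);
        }
    }
}
```

Inside a sink like `SessionAPISink`, `invoke()` could call `sender.submit(value)`, with `send` built on `java.net.http.HttpClient#sendAsync(request, HttpResponse.BodyHandlers.discarding())` (Java 11+), while `snapshotState()` (from `CheckpointedFunction`) and `close()` call `flush()` so a checkpoint only completes once every earlier request has finished. `sendAsync` completes normally for non-2xx responses, so status codes would still need checking. If the response itself matters, Flink's Async I/O (`AsyncDataStream.unorderedWait`) is the built-in alternative.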
Reputation: 9245
I see two issues with the workflow...
```java
inProgressSessionStream.broadcast()
```
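For context, `broadcast()` replicates every element to all parallel subtasks of the downstream operator, so if `SessionAPISink` runs with parallelism p, each `List<SessionSinkModel>` is POSTed p times. The toy model below (plain Java, hypothetical names, no Flink dependency) counts how many records each sink subtask receives under broadcast versus round-robin partitioning, which is what `rebalance()` does. For simplicity it starts round-robin at subtask 0, whereas Flink picks a random starting channel.

```java
/** Toy model (hypothetical names): records received per downstream subtask. */
final class PartitioningModel {
    /** broadcast(): every record is delivered to every subtask. */
    static int[] broadcast(int records, int parallelism) {
        int[] received = new int[parallelism];
        for (int r = 0; r < records; r++) {
            for (int s = 0; s < parallelism; s++) {
                received[s]++;
            }
        }
        return received;
    }

    /** rebalance(): round-robin, each record goes to exactly one subtask. */
    static int[] rebalance(int records, int parallelism) {
        int[] received = new int[parallelism];
        for (int r = 0; r < records; r++) {
            received[r % parallelism]++;
        }
        return received;
    }

    /** Total deliveries, i.e. the number of POSTs the sinks would make. */
    static int total(int[] counts) {
        int sum = 0;
        for (int c : counts) {
            sum += c;
        }
        return sum;
    }
}
```

With 10 records and 4 subtasks, `broadcast` yields 40 deliveries (10 per subtask) while `rebalance` yields 10 in total, split 3/3/2/2. Removing `.broadcast()` lets Flink fall back to its default partitioning (forward when the upstream and sink parallelism match, otherwise rebalance).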