sergiopf

Sink in Flink blocks the task execution

I have a sink in Flink that extends RichSinkFunction. It slows down the whole Flink job: if I remove it, the run time drops by half, from 10 minutes to less than 5. This is its configuration:

// Side-output tag; the anonymous subclass ({}) lets Flink capture the generic type
OutputTag<List<SessionSinkModel>> inProgressSessionOutputTag =
    new OutputTag<>(ProcessorConstants.IN_PROGRESS_SESSIONS_SINK_NAME) {};

// Messages are keyed by session ID and aggregated with parallelism 4
SingleOutputStreamOperator<SessionAccumulator> aggregatedSessionStream =
    collectionMessageDataStream
        .keyBy(CollectionMessage::getSessionId)
        .process(sessionKeyedProcessFunction)
        .uid("SessionWindow")
        .name("Session Window")
        .setParallelism(4);

// In-progress sessions emitted with ctx.output(...) inside the process function
DataStream<List<SessionSinkModel>> inProgressSessionStream =
    aggregatedSessionStream.getSideOutput(inProgressSessionOutputTag);

inProgressSessionStream
    .broadcast()
    .addSink(new SessionAPISink(config))
    .uid("Sessions side output")
    .name("Sessions side output");

This sink sends large volumes of data via POST to an endpoint. The POST call is asynchronous, and as far as I know so is the sink invocation itself. I emit records to the side output with the standard output call on ctx, the KeyedBroadcastProcessFunction.ReadOnlyContext:

ctx.output(outputTag, message);

How can I make this sink stop blocking the task execution?

Answers (1)

kkrugler

I see two issues with the workflow:

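  1. You shouldn't be doing inProgressSessionStream.broadcast(). Broadcast partitioning sends every element to every parallel instance of the downstream operator, so each list is POSTed once per sink subtask instead of once.
  2. For efficient async I/O, use Flink's Async I/O support (AsyncDataStream with an AsyncFunction), and then follow that with a DiscardingSink. A sink's invoke() is called synchronously for each record, so any time spent waiting on the HTTP call back-pressures the upstream operators.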
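A minimal sketch of both suggestions, assuming Flink 1.x (the legacy addSink API and org.apache.flink.streaming.api.functions.sink.DiscardingSink, matching the addSink style in the question) and Java 11+ for java.net.http.HttpClient. AsyncPostFunction, its BodyEncoder interface, the JSON content type, and the timeout and capacity values are hypothetical choices for illustration, not the original SessionAPISink logic. Note that the stream is attached directly, without .broadcast().

```java
import java.io.Serializable;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

/** Hypothetical async replacement for a blocking HTTP sink (sketch, Flink 1.x). */
public class AsyncPostFunction<T> extends RichAsyncFunction<T, Integer> {

    /** Turns a record into a request body; Serializable so Flink can ship it. */
    public interface BodyEncoder<V> extends Serializable {
        String encode(V value);
    }

    private final String endpoint;
    private final BodyEncoder<T> encoder;
    // HttpClient is not serializable, so it is created lazily on the task manager.
    private transient HttpClient client;

    public AsyncPostFunction(String endpoint, BodyEncoder<T> encoder) {
        this.endpoint = endpoint;
        this.encoder = encoder;
    }

    @Override
    public void asyncInvoke(T input, ResultFuture<Integer> resultFuture) {
        if (client == null) {
            client = HttpClient.newHttpClient();
        }
        HttpRequest request = HttpRequest.newBuilder(URI.create(endpoint))
                .header("Content-Type", "application/json") // assumed content type
                .POST(HttpRequest.BodyPublishers.ofString(encoder.encode(input)))
                .build();
        // sendAsync returns immediately, so the task thread can move on to the next record.
        client.sendAsync(request, HttpResponse.BodyHandlers.discarding())
                .whenComplete((response, error) -> {
                    if (error != null) {
                        // Transport-level failure (connection refused, I/O error, ...)
                        resultFuture.completeExceptionally(error);
                    } else {
                        // Any HTTP status, including non-2xx, is passed on as the result
                        resultFuture.complete(Collections.singleton(response.statusCode()));
                    }
                });
    }

    /** Attaches the async POST step, then discards its results (no broadcast). */
    public static <T> void attach(DataStream<T> stream, String endpoint, BodyEncoder<T> encoder) {
        AsyncDataStream
                .unorderedWait(stream, new AsyncPostFunction<>(endpoint, encoder),
                        30, TimeUnit.SECONDS, // per-request timeout (assumed value)
                        100)                  // max in-flight requests per subtask (assumed value)
                .addSink(new DiscardingSink<>())
                .name("Sessions side output");
    }
}
```

For the question's job, this would be called as AsyncPostFunction.attach(inProgressSessionStream, endpointUrl, encoder), with a real JSON serializer as the encoder. Unordered wait is enough here because the results are discarded; the capacity bounds the in-flight requests per subtask, which is where back-pressure starts. The default AsyncFunction.timeout completes the request exceptionally, failing the job, so override timeout(...) if that is too strict.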
