Aurvoir

Reputation: 286

Apache Flink Error Handling and Conditional Processing

I am new to Flink and have gone through the site, examples, and blogs to get started. I am struggling with the correct use of operators. Basically, I have two questions:

Question 1: Does Flink support declarative exception handling? I need to handle parse, validation, and similar errors.

Question 2: Can I conditionally attach different Sink(s)?

I am using Flink 1.3.2. Here is the relevant portion of my job

    .....
    .....
    DataStream<String> eventTextStream = env.addSource(messageSource);

    KeyedStream<EventPojo, Tuple> eventPojoStream = eventTextStream
            // parse, transform or enrich
            .flatMap(new MyParseTransformEnrichFunction())
            .assignTimestampsAndWatermarks(new EventAscendingTimestampExtractor())
            .keyBy("eventId");

    // split stream based on eventType as different reduce and windowing functions need to be applied
    SplitStream<EventPojo> splitStream = eventPojoStream
            .split(new EventStreamSplitFunction());

    // need to apply reduce function
    DataStream<EventPojo> event1TypeStream = splitStream.select("event1Type");

    // need to apply reduce function
    DataStream<EventPojo> event2TypeStream = splitStream.select("event2Type");

    // need to apply time based windowing function
    DataStream<EventPojo> event3TypeStream = splitStream.select("event3Type");

    ....
    ....

    env.execute("Event Processing");      

Am I using the correct operators here?

Update 1:

I tried using ProcessFunction as suggested by @alpinegizmo, but that didn't work because it depends on a keyed stream, which I don't have until I parse/validate the input. I get "InvalidProgramException: Field expression must be equal to '*' or '_' for non-composite types.".

It's such a common use case: you first parse/validate the input and don't have a keyed stream yet. How do you solve it?

Thanks for your patience and help.

Upvotes: 1

Views: 3665

Answers (1)

David Anderson

Reputation: 43514

There's one key building block that you've overlooked. Take a look at side outputs.

This mechanism provides a type-safe way to produce any number of additional output streams. This can be a clean way to report errors, among other uses. In Flink 1.3, side outputs can only be used with ProcessFunction, but 1.4 will add side outputs to ProcessWindowFunction.
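
To make the idea concrete, here is a minimal plain-Java model of the routing that a side output gives you. It has no Flink dependency, so it runs on its own. The class `SideOutputSketch`, the `Routed` holder, and the integer parsing are hypothetical stand-ins, not Flink API and not the parsing from the question. In a real job the same try/catch would sit inside a ProcessFunction, with the happy path going to the `Collector` and the failure going to `ctx.output(...)` with an `OutputTag`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the side-output idea; intentionally no Flink dependency.
public class SideOutputSketch {

    /** Holds the two outputs of a single pass over the input. */
    static final class Routed {
        // Plays the role of the main output stream.
        final List<Integer> parsed = new ArrayList<>();
        // Plays the role of the error side output.
        final List<String> rejected = new ArrayList<>();
    }

    /** Parses each raw record; failures are diverted instead of thrown. */
    static Routed route(List<String> rawRecords) {
        Routed routed = new Routed();
        for (String raw : rawRecords) {
            try {
                // In a ProcessFunction this would be out.collect(value).
                routed.parsed.add(Integer.parseInt(raw.trim()));
            } catch (NumberFormatException e) {
                // In a ProcessFunction this would be ctx.output(errorTag, raw).
                routed.rejected.add(raw);
            }
        }
        return routed;
    }

    public static void main(String[] args) {
        Routed routed = route(Arrays.asList("1", " 2 ", "oops", "4"));
        // Prints: parsed=[1, 2, 4] rejected=[oops]
        System.out.println("parsed=" + routed.parsed + " rejected=" + routed.rejected);
    }
}
```

In Flink terms, the `rejected` list corresponds to the stream you would get back from `getSideOutput(errorTag)` on the operator's result. Because it is an ordinary stream, it can be given its own sink, which is one way to approach the "different sinks" part of the question.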

Upvotes: 2
