Reputation: 286
I am new to Flink and have gone through site(s)/examples/blogs to get started. I am struggling with the correct use of operators. Basically I have 2 questions
Question 1: Does Flink support declarative exception handling, I need to handle parse/validate/... errors?
Question 2: Can I conditionally attach different Sink(s)?
I am using Flink 1.3.2. Here is the relevant portion of my job
.....
.....
DataStream<String> eventTextStream = env.addSource(messageSource)
KeyedStream<EventPojo, Tuple> eventPojoStream = eventTextStream
// parse, transform or enrich
.flatMap(new MyParseTransformEnrichFunction())
.assignTimestampsAndWatermarks(new EventAscendingTimestampExtractor())
.keyBy("eventId");
// split stream based on eventType as different reduce and windowing functions need to be applied
SplitStream<EventPojo> splitStream = eventPojoStream
.split(new EventStreamSplitFunction());
// need to apply reduce function
DataStream<EventPojo> event1TypeStream = splitStream.select("event1Type");
// need to apply reduce function
DataStream<EventPojo> event2TypeStream = splitStream.select("event2Type");
// need to apply time based windowing function
DataStream<EventPojo> event3TypeStream = splitStream.select("event3Type");
....
....
env.execute("Event Processing");
Am I using the correct operators here?
Update 1:
Tried using the ProcessFunction as suggested by @alpinegizmo but that didn't work as it depends upon a keyed stream which I don't have until I parse/validate input. I get "InvalidProgramException: Field expression must be equal to '*' or '_' for non-composite types. ".
It's such a common use case where your first parse/validate input and won't have keyed stream yet, so how do you solve it?
Thanks for your patience and help.
Upvotes: 1
Views: 3665
Reputation: 43514
There's one key building block that you've overlooked. Take a look at side outputs.
This mechanism provides a typesafe way to produce any number of additional output streams. This can be a clean way to report errors, among other uses. In Flink 1.3 side outputs can only be used with ProcessFunction, but 1.4 will add side outputs to ProcessWindowFunction.
Upvotes: 2