Reputation: 751
I have two streams in Flink: stream1 has 70,000 records per second, and stream2 may or may not have data.
// Ingest the High Frequency Analog Stream
SingleOutputStreamOperator<FlatHighFrequencyAnalog> stream1 =
    environment
        .addSource(createHFAConsumer())
        .name("hfa source");
SingleOutputStreamOperator<EVWindow> stream2 =
    environment
        .addSource(createHFDConsumer())
        .name("hfd source");
DataStream<Message> pStream =
    stream1
        .coGroup(stream2)
        .where(obj -> obj.getid())
        .equalTo(ev -> ev.getid())
        .window(TumblingEventTimeWindows.of(Time.minutes(Constants.VALIDTY_WINDOW_MIN)))
        .evictor(TimeEvictor.of(Time.minutes(Constants.VALIDTY_WINDOW_MIN)))
        .apply(new CalculateCoGroupFunction());
This works perfectly fine when both streams have data, but when stream2 has no data the job fails with very high backpressure. The CPU utilization also spikes by 200%.
How do I handle an outer join in such a scenario?
Upvotes: 3
Views: 610
Reputation: 31
"Whenever multiple streams are connected, the resulting watermark is the minimum of the incoming watermarks", which means that Flink "was buffering data from stream1 while waiting for stream2 and would eventually result in High Backpressure and finally a OOM."
This applies to the coGroup() method of the DataStream<T> class, which returns CoGroupedStreams<T, T2>.
To avoid this behavior, we can use the union(DataStream<T>... streams) method, which returns a plain DataStream<T>. If timestamps and watermarks are assigned after the union, a single watermark generator sees the events of both inputs, so the watermark advances as in an ordinary stream.
The only problem which we need to solve is to have a common schema (class) for both streams. We can use some aggregation class with two fields:
public class Aggregator {
    // Exactly one of the two fields is set, depending on which stream the record came from.
    private FlatHighFrequencyAnalog flatHighFrequencyAnalog;
    private EVWindow evWindow;

    public Aggregator(FlatHighFrequencyAnalog flatHighFrequencyAnalog) {
        this.flatHighFrequencyAnalog = flatHighFrequencyAnalog;
    }

    public Aggregator(EVWindow evWindow) {
        this.evWindow = evWindow;
    }

    public FlatHighFrequencyAnalog getFlatHighFrequencyAnalog() {
        return flatHighFrequencyAnalog;
    }

    public EVWindow getEVWindow() {
        return evWindow;
    }
}
A more generic way is to use the Either<L, R> class from org.apache.flink.types.
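// FlatHighFrequencyAnalog is the Right type of the Either, EVWindow the Left type.
// Lambdas lose their generic return type to erasure, hence the returns(...) hint.
SingleOutputStreamOperator<Either<EVWindow, FlatHighFrequencyAnalog>> stream1 =
    environment
        .addSource(createHFAConsumer())
        .map(hfa -> Either.<EVWindow, FlatHighFrequencyAnalog>Right(hfa))
        .returns(new TypeHint<Either<EVWindow, FlatHighFrequencyAnalog>>() {});
SingleOutputStreamOperator<Either<EVWindow, FlatHighFrequencyAnalog>> stream2 =
    environment
        .addSource(createHFDConsumer())
        .map(hfd -> Either.<EVWindow, FlatHighFrequencyAnalog>Left(hfd))
        .returns(new TypeHint<Either<EVWindow, FlatHighFrequencyAnalog>>() {});
DataStream<Message> pStream =
    stream1
        .union(stream2)
        // Watermarks are assigned after the union, so one generator sees both inputs.
        .assignTimestampsAndWatermarks(
            WatermarkStrategy
                .<Either<EVWindow, FlatHighFrequencyAnalog>>forBoundedOutOfOrderness(
                    Duration.ofSeconds(MAX_OUT_OF_ORDERNESS))
                .withTimestampAssigner((input, timestamp) ->
                    input.isLeft() ? input.left().getTimeStamp() : input.right().getTimeStamp()))
        .keyBy(value -> value.isLeft() ? value.left().getId() : value.right().getId())
        .window(TumblingEventTimeWindows.of(Time.minutes(MINUTES)))
        // Placeholder: pass an instance of your own ProcessWindowFunction subclass here.
        .process(new ProcessWindowFunction());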
// Inside the window function: split the window's elements by side.
// Streams here is Guava's com.google.common.collect.Streams.
List<EVWindow> evWindows =
    Streams.stream(elements)
        .filter(Either::isLeft)
        .map(Either::left)
        .collect(Collectors.toList());
List<FlatHighFrequencyAnalog> highFrequencyAnalogs =
    Streams.stream(elements)
        .filter(Either::isRight)
        .map(Either::right)
        .collect(Collectors.toList());
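The two list-building snippets above use Guava's Streams.stream. As a rough sketch of the same split with only the JDK, StreamSupport can turn the window's Iterable into a stream. Tagged is a hypothetical stand-in for Flink's Either (so the block runs without Flink), and the String and Integer payloads stand in for EVWindow and FlatHighFrequencyAnalog.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

final class SplitWindowSketch {

    /** Hypothetical stand-in for org.apache.flink.types.Either. */
    static final class Tagged<L, R> {
        final L left;   // non-null for a "left" element
        final R right;  // non-null for a "right" element

        private Tagged(L left, R right) {
            this.left = left;
            this.right = right;
        }

        static <L, R> Tagged<L, R> left(L value) {
            return new Tagged<>(value, null);
        }

        static <L, R> Tagged<L, R> right(R value) {
            return new Tagged<>(null, value);
        }

        boolean isLeft() {
            return left != null;
        }
    }

    /** Collects the left-side payloads of a window's elements. */
    static <L, R> List<L> lefts(Iterable<Tagged<L, R>> elements) {
        return StreamSupport.stream(elements.spliterator(), false)
                .filter(Tagged::isLeft)
                .map(t -> t.left)
                .collect(Collectors.toList());
    }

    /** Collects the right-side payloads of a window's elements. */
    static <L, R> List<R> rights(Iterable<Tagged<L, R>> elements) {
        return StreamSupport.stream(elements.spliterator(), false)
                .filter(t -> !t.isLeft())
                .map(t -> t.right)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // A window that received only right-side records: the outer-join case.
        List<Tagged<String, Integer>> window = Arrays.asList(
                Tagged.<String, Integer>right(1),
                Tagged.<String, Integer>right(2));
        System.out.println(lefts(window).isEmpty()); // true
        System.out.println(rights(window));          // [1, 2]
    }
}
```

An empty left list with a non-empty right list is exactly the situation the question asks about: the window still fires and the function can decide how to handle the missing side.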
Upvotes: 1
Reputation: 751
Thanks to David Anderson for the pointers.
RCA:
The main issue arose when I tried to create a tumbling window around my stream.
As per the Flink documentation:
In a nutshell, a window is created as soon as the first element that should belong to this window arrives, and the window is completely removed when the time (event or processing time) passes its end timestamp plus the user-specified allowed lateness
Since there was no incoming data for stream2, the window never materialized. As David pointed out:
Whenever multiple streams are connected, the resulting watermark is the minimum of the incoming watermarks
which means Flink was buffering data from stream1 while waiting for stream2, which would eventually result in high backpressure and finally an OOM.
The solution:
I created an external script that sends dummy heartbeat messages to the Kafka topic behind stream2 at the desired interval, and added logic in my application to ignore these messages during computation.
This forced both stream2 and stream1 to advance their watermarks, so the window was closed and removed.
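To make the heartbeat idea concrete, here is a minimal plain-Java sketch (not Flink code, and not the author's actual implementation). The Event shape, the heartbeat flag, and the BoundedWatermark class are all assumptions made for illustration: every message advances a bounded-out-of-orderness style watermark, but only non-heartbeat messages are passed on for computation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of the heartbeat workaround in plain Java (not Flink code). */
final class HeartbeatSketch {

    /** Hypothetical message shape: an event timestamp plus a flag for dummy heartbeats. */
    static final class Event {
        final long timestampMillis;
        final boolean heartbeat;

        Event(long timestampMillis, boolean heartbeat) {
            this.timestampMillis = timestampMillis;
            this.heartbeat = heartbeat;
        }
    }

    /** Watermark that trails the largest timestamp seen so far by a fixed bound. */
    static final class BoundedWatermark {
        private final long boundMillis;
        private long maxTimestamp = Long.MIN_VALUE;

        BoundedWatermark(long boundMillis) {
            this.boundMillis = boundMillis;
        }

        void observe(Event event) {
            maxTimestamp = Math.max(maxTimestamp, event.timestampMillis);
        }

        long current() {
            // Until the first message arrives there is no watermark to report.
            return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - boundMillis;
        }
    }

    /** Lets every message advance the watermark, but keeps only real events for computation. */
    static List<Event> realEvents(List<Event> input, BoundedWatermark watermark) {
        List<Event> real = new ArrayList<>();
        for (Event event : input) {
            watermark.observe(event);   // heartbeats still move event time forward
            if (!event.heartbeat) {
                real.add(event);        // heartbeats are excluded from the computation
            }
        }
        return real;
    }

    public static void main(String[] args) {
        BoundedWatermark watermark = new BoundedWatermark(5_000L);
        List<Event> input = Arrays.asList(new Event(60_000L, true), new Event(120_000L, true));
        System.out.println(realEvents(input, watermark).size()); // 0
        System.out.println(watermark.current());                 // 115000
    }
}
```

Even though no real event arrived, the watermark has moved to 115000, which is what lets downstream windows close.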
Upvotes: 1
Reputation: 43707
I believe the problem is that the lack of watermarks from the idle stream is holding back the overall watermark. Whenever multiple streams are connected, the resulting watermark is the minimum of the incoming watermarks. This can then lead to problems like the one you are experiencing.
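The minimum rule can be illustrated with a small plain-Java model (a sketch, not Flink's implementation). The class name and the idle flags are assumptions for illustration; an input that has not yet produced a watermark is modelled as Long.MIN_VALUE.

```java
/** Toy model of how an operator with several inputs combines watermarks (not Flink code). */
final class CombinedWatermarkSketch {

    /** Models an input that has not emitted any watermark yet. */
    static final long NO_WATERMARK_YET = Long.MIN_VALUE;

    /**
     * Returns the minimum watermark over all inputs that are not marked idle.
     * If every input is idle, this sketch simply reports NO_WATERMARK_YET.
     */
    static long combine(long[] watermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                min = Math.min(min, watermarks[i]);
                anyActive = true;
            }
        }
        return anyActive ? min : NO_WATERMARK_YET;
    }

    public static void main(String[] args) {
        // stream1 has advanced to 120000; stream2 has produced nothing.
        long[] stuck = {120_000L, NO_WATERMARK_YET};
        System.out.println(combine(stuck, new boolean[] {false, false})); // -9223372036854775808

        // Giving stream2 the maximum watermark hands control to stream1.
        long[] maxed = {120_000L, Long.MAX_VALUE};
        System.out.println(combine(maxed, new boolean[] {false, false})); // 120000

        // Marking stream2 as idle excludes it from the minimum.
        System.out.println(combine(stuck, new boolean[] {false, true}));  // 120000
    }
}
```

In Flink 1.11 and later, WatermarkStrategy#withIdleness(Duration) is the built-in way to mark a source as idle; whether it fits a given job depends on the Flink version and the source in use.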
You have a couple of options:
1. Set the watermark for stream2 to be Watermark.MAX_WATERMARK, thereby giving stream1 complete control of watermarking.
2. Detect when stream2 is idle, and artificially advance the watermark despite the lack of events. Here is an example.
Upvotes: 1