Necrophades
Necrophades

Reputation: 605

Apache Flink: Ordered timestamps with parallelism

I have a datastream in which the order of the events is important. The time characteristic is set to EventTime as the incoming records have a timestamp within them.

In order to guarantee the ordering, I set the parallelism for the program to 1. Could that become a problem, performance wise, when my program gets more complex?

If I understand correctly, I need to assign watermarks to my events, if I want to keep the stream ordered by timestamp. This is quite simple. But I'm reading that even that doesn't guarantee order? Later on, I want to do stateful computations over that stream. So, for that I use a FlatMap function, which needs the stream to be keyed. But if I key the stream, the order is lost again. AFAIK this is because of different stream partitions, which are "caused" by parallelism.

I have two questions:

Upvotes: 0

Views: 683

Answers (1)

David Anderson
David Anderson

Reputation: 43514

Several points to consider:

Setting the parallelism to 1 for the entire job will prevent scaling your application, which will affect performance. Whether this actually matters depends on your application requirements, but it would certainly be limitation, and could be a problem.

If the aggregates you've mentioned are meant to be computed globally across all the event records then operating in parallel will require doing some pre-aggregation in parallel. But in this case you will then have to reduce the parallelism to 1 in the later stages of your job graph in order to produce the ultimate (global) results.

If on the other hand these aggregates are to be computed independently for each value of some key, then it makes sense to consider keying the stream and to use that partitioning as the basis for operating in parallel.

All of the operations you mention require some state, whether computing max, min, averages, or uptime and downtime. For example, you can't compute the maximum without remembering the maximum encountered so far.

If I understand correctly how Flink's NiFi source connector works, then if the source is operating in parallel, keying the stream will result in out-of-order events.

However, none of the operations you've mentioned require that the data be delivered in-order. Computing uptime (and downtime) on an out-of-order stream will require some buffering -- these operations will need to wait for out-of-order data to arrive before they can produce results -- but that's certainly doable. That's exactly what watermarks are for; they define how long to wait for out-of-order data. You can use an event-time timer in a ProcessFunction to arrange for an onTimer callback to be called when all earlier events have been processed.

You could always sort the keyed stream. Here's an example.

The uptime/downtime calculation should be easy to do with Flink's CEP library (which sorts its input, btw).

UPDATE:

It is true that after applying a ProcessFunction to a keyed stream the stream is no longer keyed. But in this case you could safely use reinterpretAsKeyedStream to inform Flink that the stream is still keyed.

As for CEP, this library uses state on your behalf, making it easier to develop applications that need to react to patterns.

Upvotes: 1

Related Questions