Drudge
Drudge

Reputation: 528

How to define the start position of a dataset in Apache Flink?

I try to implement a kind of a window function in Apache Flink. For example, I want to take the elements 1 - 5 and do something with them, afterwards I want to take the elements 6 - 10 and so on.

Currently I have a dataset whose data is derived by a CSV file:

DataSet<Tuple2<Double, Double>> csvInput = env
        .readCsvFile(csvpath)
        .includeFields(usedFields)
        .types(Double.class, Double.class);

Now I want to have a subset with the first 5 elements of this dataset. I might be able to do this with the first-function:

DataSet<Tuple2<Double, Double>> subset1 = csvInput.first(5);

But how to get the next 5 elements? Is there a function like a startAt function, that I can use? For example something like this:

DataSet<Tuple2<Double, Double>> subset2 = csvInput.first(5).startAt(6);

I haven't found anything in the Apache Flink Java API. What is the best way to archive this?

Upvotes: 4

Views: 638

Answers (1)

Stephan Ewen
Stephan Ewen

Reputation: 2371

Matthias Sax has given good pointers to the streaming API for windowing. If the application follows the model of streaming analytics, the streaming API is definitely the right way to go.

Here are some more resources on stream windowing: https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#window-operators

Windows in the Batch API

It is possible to manually apply some form of windowing in the Batch API as well. When applying windows, the following should be considered:

  • Most operations are parallel. When windowing n elements together, this usually happens per parallel partition independently.

  • There is no implicit order of elements. Even when reading from a file in parallel, it may be that later sections of the file are read by a faster parallel reader thread, and records from these later segments arrives earlier. Windowing n elements in arrival order thus gives you simply some n elements.

Window by Order in the File (non parallel)

To window by order in a file, you can set the input to be non-parallel (use setParallelism(1) on the source) and then use a mapPartition() to slide the window over the elements.

Ordered Window by some value (e.g., a timestamp)

You can window ungrouped (no key) by sorting a partition (sortPartition().mapPartition()) or window over groups using groupBy(...).sortGroup(...).reduceGroup(...). The functions bring the elements in order with respect to the value you want to window on, and slide over the data to window.

Some parallel windows (no good semantics)

You can always read in parallel and slide a window over the data stream using mapPartition(). However, as described above, the parallel execution and undefined order of elements will give you some windowed result, rather than a predictable windowed result.

Upvotes: 3

Related Questions