Vignesh I

Reputation: 2221

Group records by key and collect into a ListBuffer in Flink Streaming

I have a Flink DataStream of type DataStream[(String, somecaseclass)]. I want to group by the first field of the tuple, which is a String, and collect the values into a ListBuffer[somecaseclass]. Below is what I have tried:

import scala.collection.mutable.ListBuffer

val emptylistbuffer = new ListBuffer[somecaseclass]
inputstream
  .keyBy(0)
  .fold(emptylistbuffer) { case (outputbuffer, b) => outputbuffer += b._2 }

But this produces an output for every input record: with 10 input rows, the first output contains only the first record, and only the tenth output contains all ten. I only want that final, tenth result. I have looked at almost all the transformations on Flink DataStream, but none of them fits the use case.

Input:

(filename1.dat,somecaseclass("abc","1",2))
(filename1.dat,somecaseclass("dse","2",3))
(filename1.dat,somecaseclass("daa","1",4))

Expected output:

(filename1.dat,ListBuffer(somecaseclass("abc","1",2),somecaseclass("dse","2",3),somecaseclass("daa","1",4)))

Upvotes: 3

Views: 2909

Answers (1)

Fabian Hueske

Reputation: 18987

The DataStream API considers a DataStream to be unbounded. This means a DataStream might provide an infinite number of records. Hence, it is not possible to "just" emit an aggregation result (in your case the complete ListBuffer) after all records have been received, because there might be more records that need to be aggregated (added to the ListBuffer). In principle, an aggregate on a DataStream can never produce a final result, because more records might follow. Since this is not very practical, Flink's DataStream API produces a new result for each incoming record.

A common approach to computing aggregates on unbounded streams is windows. Windows define bounded sections of a stream on which aggregates can be computed and final results can be emitted. Flink provides built-in windows based on time or record counts. For example, your record-collecting function on a tumbling window of 1 hour would collect all records that arrived within that hour and emit one result when the hour is over.
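A minimal sketch of this idea, assuming a Flink release from the era of this question in which keyBy(Int), timeWindow, and the windowed fold operator are still available (newer releases replace fold with aggregate or process), and assuming somecaseclass is your own type, could look like this:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import scala.collection.mutable.ListBuffer

// Collect all records per key that arrive within a 1-hour tumbling window
// and emit one (key, ListBuffer) pair when the window closes.
inputstream
  .keyBy(0)
  .timeWindow(Time.hours(1))
  .fold(("", new ListBuffer[somecaseclass])) {
    case ((_, buffer), (key, value)) => (key, buffer += value)
  }

A count window (e.g. countWindow(10)) works the same way if you know in advance how many records to expect per key.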

Please check the Flink documentation for the different window types and how to use them.

Upvotes: 2
