Green Lomu

Reputation: 75

Apache Flink keyBy function with field expression

Sample messages:

>{"sensor":"temp1", "value":12.0, "timestamp":19200230}
>{"sensor":"temp1", "value":12.0, "timestamp":19200230}
>{"sensor":"temp1", "value":12.0, "timestamp":19200230}
>{"sensor":"temp2", "value":5, "timestamp":19200230}
>{"sensor":"temp2", "value":5, "timestamp":19200230}

I am trying to build a stream aggregation with the keyBy method.

DataStream<Message> messageSumStream = messageStream.keyBy("sensor").timeWindowAll(Time.minutes(5)).sum("value");

I expected:

{"sensor": "temp1", "value": 36.000000, "timestamp":19200230 }
{"sensor": "temp2", "value": 10.000000, "timestamp":19200230 }

But got:

{"sensor": "temp1", "value": 46.000000, "timestamp":19200230 }

What am I missing here?

Upvotes: 0

Views: 120

Answers (1)

Ezequiel

Reputation: 3592

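You are calling timeWindowAll, which comes from the DataStream class, instead of timeWindow, which is defined on KeyedStream. timeWindowAll builds a single non-keyed window over every element, so the keyBy is ignored and the values of all sensors are summed together (3 × 12 + 2 × 5 = 46).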

Try this:

DataStream<Message> messageSumStream = messageStream.keyBy("sensor").timeWindow(Time.minutes(5)).sum("value");
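
To see where the 46 comes from, here is a minimal plain-Java sketch (no Flink involved) that mimics the two behaviours on the sample messages from the question. The class `KeyedVsGlobalSum`, the `Message` stand-in, and the helper names are made up for illustration, and the five messages are assumed to fall into one 5-minute window.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class KeyedVsGlobalSum {

    // Hypothetical stand-in for the Message POJO from the question.
    static final class Message {
        final String sensor;
        final double value;
        final long timestamp;

        Message(String sensor, double value, long timestamp) {
            this.sensor = sensor;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // The five sample messages, assumed to land in the same window.
    static List<Message> sampleWindow() {
        return Arrays.asList(
            new Message("temp1", 12.0, 19200230L),
            new Message("temp1", 12.0, 19200230L),
            new Message("temp1", 12.0, 19200230L),
            new Message("temp2", 5.0, 19200230L),
            new Message("temp2", 5.0, 19200230L));
    }

    // Non-keyed window (the timeWindowAll case): one sum over all elements.
    static double sumAll(List<Message> window) {
        return window.stream().mapToDouble(m -> m.value).sum();
    }

    // Keyed window (the keyBy + timeWindow case): one sum per sensor.
    static Map<String, Double> sumByKey(List<Message> window) {
        return window.stream().collect(
            Collectors.groupingBy(
                m -> m.sensor,
                TreeMap::new,
                Collectors.summingDouble(m -> m.value)));
    }

    public static void main(String[] args) {
        List<Message> window = sampleWindow();
        System.out.println(sumAll(window));   // prints 46.0
        System.out.println(sumByKey(window)); // prints {temp1=36.0, temp2=10.0}
    }
}
```

The first result matches what you got, the second matches what you expected. This only illustrates the grouping semantics; it does not model Flink's windowing or time handling.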

Upvotes: 2
