Jonas Gröger

Apache Flink: Scope of ValueState in ConnectedStreams

I have a custom RichCoFlatMapFunction that uses a ValueState member. The docs say that

the key/value interface is scoped to the key of the current input element

See https://ci.apache.org/projects/flink/flink-docs-master/dev/state.html#using-the-keyvalue-state-interface

What if I key on connected streams like this:

val connected = streamA
    .connect(streamB)
    .keyBy(a=>a.foo, b=>b.bar)
    .flatMap(new MyRichCoFlatMapFunction)

What are the semantics then? Is the state keyed by the first key, the second key, or a combination of the two?

Upvotes: 2

Answers (1)

Fabian Hueske

The first argument of keyBy (a => a.foo) defines the key of the first stream (streamA). The second argument (b => b.bar) defines the key of the second stream (streamB). Both functions must return keys of the same type, i.e., a.foo and b.bar must have the same type.
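To make the routing rule concrete, here is a toy model built on plain Scala collections. It is not Flink code and ignores parallelism and streaming: each input is simply grouped by its own key selector, and both selectors share one key type K, so records whose keys are equal end up in the same group. TypeA, TypeB, partitionByKey and the sample data are hypothetical.

```scala
// Toy model in plain Scala collections; NOT Flink's API or runtime.
final case class TypeA(foo: String, amount: Int)   // hypothetical record type
final case class TypeB(bar: String, label: String) // hypothetical record type

object KeyedConnectSketch {
  // Groups each input by its own key selector. Both selectors return the
  // same type K, so records whose keys are equal land in the same group.
  def partitionByKey[A, B, K](inputA: Seq[A], inputB: Seq[B])(
      keyA: A => K,
      keyB: B => K
  ): Map[K, (Seq[A], Seq[B])] = {
    val byKeyA = inputA.groupBy(keyA)
    val byKeyB = inputB.groupBy(keyB)
    (byKeyA.keySet ++ byKeyB.keySet).map { k =>
      (k, (byKeyA.getOrElse(k, Seq.empty[A]), byKeyB.getOrElse(k, Seq.empty[B])))
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    val streamA = Seq(TypeA("x", 1), TypeA("y", 2), TypeA("x", 3))
    val streamB = Seq(TypeB("x", "first"), TypeB("z", "second"))
    // K is pinned to String, mirroring "a.foo and b.bar must have the same type".
    val groups = partitionByKey[TypeA, TypeB, String](streamA, streamB)(_.foo, _.bar)
    groups.toSeq.sortBy(_._1).foreach { case (k, (fromA, fromB)) =>
      println(s"$k -> A: ${fromA.map(_.amount)}, B: ${fromB.map(_.label)}")
    }
  }
}
```

Because K is given explicitly as String here, a selector for streamB that returned, say, an Int would not compile, which is one way to express the "same key type" requirement in a sketch.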

After keyBy, all records from streamA and streamB with identical keys are sent to the same parallel instance of the co-flatMap operator. The ValueState of the stateful RichCoFlatMapFunction is scoped to the key of the current element: if flatMap1(a: TypeA, out: Collector[TypeOut]) is called for a record from streamA, the state for key a.foo is accessed, and if flatMap2(b: TypeB, out: Collector[TypeOut]) is called for a record from streamB, the state for key b.bar is accessed. Hence, a record from streamA with a.foo == k and a record from streamB with b.bar == k read and update the same state; the state is not keyed by a combination of the two keys.
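The per-key scoping can be illustrated with a small simulation in plain Scala. This is a toy model, not Flink's runtime or API: ToyKeyedValueState, ToyCoFlatMap, ToyRuntime, TypeA and TypeB are hypothetical, Option stands in for a possibly empty ValueState, and the runtime setting the current key is modeled by an explicit setCurrentKey call before each flatMap1/flatMap2 invocation. In a real RichCoFlatMapFunction the state would instead be obtained in open() via getRuntimeContext.getState with a ValueStateDescriptor.

```scala
import scala.collection.mutable

// Toy model of keyed ValueState; NOT Flink's runtime or API.
final class ToyKeyedValueState[K, V] {
  private val slots = mutable.Map.empty[K, V]
  private var currentKey: Option[K] = None

  // Stands in for the runtime selecting the key of the current element.
  def setCurrentKey(key: K): Unit = currentKey = Some(key)

  // Rough analogue of ValueState.value(); None means nothing stored for this key.
  def value: Option[V] = currentKey.flatMap(slots.get)

  // Rough analogue of ValueState.update(v); writes only the current key's slot.
  def update(v: V): Unit = currentKey.foreach(k => slots(k) = v)
}

final case class TypeA(foo: String, amount: Int)   // hypothetical record type
final case class TypeB(bar: String, label: String) // hypothetical record type

// Hypothetical user logic: flatMap1 adds amounts to a per-key sum,
// flatMap2 emits the current sum for its key. Both touch the same state.
final class ToyCoFlatMap {
  val sum = new ToyKeyedValueState[String, Int]

  def flatMap1(a: TypeA, out: String => Unit): Unit =
    sum.update(sum.value.getOrElse(0) + a.amount)

  def flatMap2(b: TypeB, out: String => Unit): Unit =
    out(s"${b.label}: key=${b.bar} sum=${sum.value.getOrElse(0)}")
}

object ToyRuntime {
  // Before each call, sets the state's key from that input's key selector:
  // a.foo for streamA records, b.bar for streamB records.
  def run(events: Seq[Either[TypeA, TypeB]]): Seq[String] = {
    val fn = new ToyCoFlatMap
    val out = mutable.ArrayBuffer.empty[String]
    events.foreach {
      case Left(a) =>
        fn.sum.setCurrentKey(a.foo)
        fn.flatMap1(a, s => out += s)
      case Right(b) =>
        fn.sum.setCurrentKey(b.bar)
        fn.flatMap2(b, s => out += s)
    }
    out.toList
  }

  def main(args: Array[String]): Unit = {
    val events: Seq[Either[TypeA, TypeB]] = Seq(
      Left(TypeA("x", 1)), Left(TypeA("y", 5)), Left(TypeA("x", 3)),
      Right(TypeB("x", "q1")), Right(TypeB("z", "q2")), Right(TypeB("y", "q3"))
    )
    run(events).foreach(println)
  }
}
```

With the sample events in main, q1 sees sum=4 for key x (1 + 3 from the two streamA records with foo == "x"), q2 sees sum=0 because key z has no state yet, and q3 sees sum=5 for key y.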

Upvotes: 4