I have a custom `RichCoFlatMapFunction` that uses a `ValueState` member. The docs say that "the key/value interface is scoped to the key of the current input element".

What if I key on connected streams like this:
```scala
import org.apache.flink.streaming.api.scala._

val connected = streamA
  .connect(streamB)
  .keyBy(a => a.foo, b => b.bar)
  .flatMap(new MyRichCoFlatMapFunction)
```
What are the semantics then? Is the state keyed to the first key, the second key, or a combination of the two?
The first argument of `keyBy` (`a => a.foo`) defines the key of the first stream (`streamA`). The second argument (`b => b.bar`) defines the key of the second stream (`streamB`). Both arguments must return a key of the same type, i.e., the types of `a.foo` and `b.bar` must be the same.
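To make the routing concrete, here is a dependency-free toy model in plain Scala. It is not Flink code: the record types `TypeA(foo, payload)` and `TypeB(bar, payload)` and the helpers `instanceFor` and `route` are hypothetical, and Flink's real partitioner is more involved (it hashes keys into key groups first). The point it shows is that both key selectors feed one partitioning function, so an `a` and a `b` meet on the same instance exactly when their keys are equal. Keys of different types (say a `String` and an `Int`) can never be equal, which is why both selectors need to return the same type.

```scala
// Toy model only: NOT Flink's implementation.
// Hypothetical record types standing in for the elements of streamA and streamB.
final case class TypeA(foo: String, payload: Int)
final case class TypeB(bar: String, payload: Int)

object KeyedRouting {
  // Simplified partitioner: maps a key to one of `parallelism` operator instances.
  // (Flink hashes keys into key groups first; this keeps only the idea.)
  def instanceFor[K](key: K, parallelism: Int): Int =
    Math.floorMod(key.hashCode, parallelism)

  // Mirrors keyBy(a => a.foo, b => b.bar): both selectors produce the same key type K,
  // and records from both inputs go through the same partitioner.
  def route[K](
      recordsA: Seq[TypeA],
      recordsB: Seq[TypeB],
      keyA: TypeA => K,
      keyB: TypeB => K,
      parallelism: Int
  ): Map[Int, Seq[Either[TypeA, TypeB]]] = {
    val fromA = recordsA.map(a => instanceFor(keyA(a), parallelism) -> (Left(a): Either[TypeA, TypeB]))
    val fromB = recordsB.map(b => instanceFor(keyB(b), parallelism) -> (Right(b): Either[TypeA, TypeB]))
    // Group by target instance: each instance receives records of BOTH inputs for its keys.
    (fromA ++ fromB).groupBy(_._1).map { case (instance, tagged) => instance -> tagged.map(_._2) }
  }
}
```

For example, `KeyedRouting.route[String](Seq(TypeA("x", 1)), Seq(TypeB("x", 3)), _.foo, _.bar, 4)` places both records on the instance `KeyedRouting.instanceFor("x", 4)`.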
The keyed connected streams will then send all records from `streamA` and `streamB` with identical keys to the same parallel instance of the operator. The stateful `RichCoFlatMapFunction` scopes its `ValueState` to the key of the current element: if `flatMap1(a: TypeA, out: Collector[TypeOut])` is called for a value from `streamA`, the state is accessed for the key `a.foo`, and if `flatMap2(b: TypeB, out: Collector[TypeOut])` is called for a value from `streamB`, the state is accessed for the key `b.bar`. So the state is not tied to either stream alone: whenever `a.foo == b.bar`, `flatMap1` and `flatMap2` read and write the same state entry.
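The per-key scoping can also be sketched as a small plain-Scala model. Again this is not Flink code: `ToyValueState`, `SumPerKey`, `ToyKeyedCoFlatMap`, `TypeA` and `TypeB` are hypothetical names, and a `Buffer` stands in for Flink's `Collector`. The driver plays the runtime's role of selecting the current key before each `flatMap1`/`flatMap2` call, and the function keeps a running sum per key across both inputs.

```scala
// Toy model only: NOT Flink's runtime.
import scala.collection.mutable

// Hypothetical record types standing in for the elements of streamA and streamB.
final case class TypeA(foo: String, payload: Int)
final case class TypeB(bar: String, payload: Int)

// Stand-in for a keyed ValueState: one slot per key; reads and writes
// go to the slot of whichever key is currently selected.
final class ToyValueState[K, V] {
  private val slots = mutable.Map.empty[K, V]
  private var currentKey: Option[K] = None
  def setCurrentKey(k: K): Unit = { currentKey = Some(k) }
  def value(): Option[V] = currentKey.flatMap(slots.get)
  def update(v: V): Unit = currentKey.foreach(k => slots(k) = v)
}

// Hypothetical co-flat-map: sums payloads per key across BOTH inputs.
final class SumPerKey {
  val sum = new ToyValueState[String, Int]

  def flatMap1(a: TypeA, out: mutable.Buffer[String]): Unit = {
    val s = sum.value().getOrElse(0) + a.payload // state slot for key a.foo
    sum.update(s)
    out += s"A(${a.foo}) -> $s"
  }

  def flatMap2(b: TypeB, out: mutable.Buffer[String]): Unit = {
    val s = sum.value().getOrElse(0) + b.payload // state slot for key b.bar
    sum.update(s)
    out += s"B(${b.bar}) -> $s"
  }
}

// Driver playing the role of the keyed runtime: it selects the key of the
// current element, then dispatches to flatMap1 or flatMap2.
object ToyKeyedCoFlatMap {
  def run(input: Seq[Either[TypeA, TypeB]]): Seq[String] = {
    val fn = new SumPerKey
    val out = mutable.Buffer.empty[String]
    input.foreach {
      case Left(a)  => fn.sum.setCurrentKey(a.foo); fn.flatMap1(a, out)
      case Right(b) => fn.sum.setCurrentKey(b.bar); fn.flatMap2(b, out)
    }
    out.toSeq
  }
}
```

Feeding `Left(TypeA("x", 1))` followed by `Right(TypeB("x", 10))` yields `"A(x) -> 1"` and then `"B(x) -> 11"`: the second input sees the value written by the first because the keys are equal. In real Flink code the function would obtain the state with `getRuntimeContext.getState(new ValueStateDescriptor(...))` in `open` and only call `value()` and `update()`; selecting the current key is done by the runtime, which is what `setCurrentKey` approximates here.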