Vinod.T.V

Reputation: 33

Passing two streams to do operations with the main stream in a Flink job

In my Flink job I currently have two streams: a main data stream, fed from a Kafka topic and updated every minute, and a broadcast stream, which the processElement method of a KeyedBroadcastProcessFunction uses for calculations on the main stream's data.

Now I have a new requirement to add one more stream which is totally different in structure from the other two streams.

1) How can I pass in the third stream, which must be available in Flink state, so that the KeyedBroadcastProcessFunction can use it in calculations along with the main data and the broadcast state data?

2) Can we have two broadcast streams for the main data?

3) Joining will not work because the streams carry completely different data. The broadcast stream and the third stream do not change often; they are something like master data used in the calculations along with the main data stream. I couldn't find any solution yet, so please help, and please share any links I can refer to.

Upvotes: 0

Views: 807

Answers (1)

David Anderson

Reputation: 43499

Flink does not offer any sort of process function with three inputs.

You could union the two broadcast streams together (before broadcasting them). I appreciate that they are very different types, but you can always find some way to make them co-exist. You can use Either for this if there isn't a more natural way to unify the two types. To union two disparate types into a single stream, you can do something like this:

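import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Either;

// env is the job's StreamExecutionEnvironment
DataStream<String> strings = env.fromElements("one", "two", "three");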
DataStream<Integer> ints = env.fromElements(1, 2, 3);

DataStream<Either<String, Integer>> stringsOnTheLeft = strings
        .map(new MapFunction<String, Either<String, Integer>>() {
            @Override
            public Either<String, Integer> map(String s) throws Exception {
                return Either.Left(s);
            }
        });

DataStream<Either<String, Integer>> intsOnTheRight = ints
        .map(new MapFunction<Integer, Either<String, Integer>>() {
            @Override
            public Either<String, Integer> map(Integer i) throws Exception {
                return Either.Right(i);
            }
        });

DataStream<Either<String, Integer>> stringsAndInts = stringsOnTheLeft.union(intsOnTheRight);

Or if you can apply the broadcast streams to the main stream in separate stages, then you could have a sequence of two KeyedBroadcastProcessFunctions, with the output of one feeding into the other:

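// broadcast1 and broadcast2 are BroadcastStreams, each created with stream.broadcast(descriptor);
// process1 and process2 each extend KeyedBroadcastProcessFunction
events
    .keyBy(x -> x.foo)
    .connect(broadcast1)
    .process(new process1())
    // the output of process1 must still carry foo so that it can be keyed again
    .keyBy(x -> x.foo)
    .connect(broadcast2)
    .process(new process2());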

Update:

If we merge the streams like this and broadcast them, will an update arriving on either stream update the existing broadcast state, or will it create a new entry in the broadcast state?

That's entirely under your control. Broadcast state is always map state; I imagine you'd choose some sort of straightforward key to work with, so you'd have something like MapState<String, Either<T1, T2>>. Map state works like any hash map: if you reuse a key, the existing entry is replaced; if you introduce a new key, a new entry is created.
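The replace-or-create behaviour can be modelled in plain Java without any Flink dependency. The sketch below is only an illustration under stated assumptions: the nested Either class is a hand-written stand-in for Flink's Either, the HashMap stands in for the broadcast state, and the class name BroadcastStateModel, the method onBroadcastElement, and the "name:" / "limit:" key prefixes are all hypothetical names chosen for the example. It also shows one possible way to give both record types String keys without the two colliding.

```java
import java.util.HashMap;
import java.util.Map;

public class BroadcastStateModel {

    /** Minimal tagged union; a stand-in for Flink's Either, not the real class. */
    public static final class Either<L, R> {
        private final L left;
        private final R right;
        private final boolean isLeft;

        private Either(L left, R right, boolean isLeft) {
            this.left = left;
            this.right = right;
            this.isLeft = isLeft;
        }

        public static <L, R> Either<L, R> left(L value) {
            return new Either<>(value, null, true);
        }

        public static <L, R> Either<L, R> right(R value) {
            return new Either<>(null, value, false);
        }

        public boolean isLeft() { return isLeft; }
        public L left() { return left; }
        public R right() { return right; }
    }

    // A plain HashMap standing in for the broadcast (map) state.
    private final Map<String, Either<String, Integer>> state = new HashMap<>();

    /** Models what would happen for each element of the merged broadcast stream. */
    public void onBroadcastElement(String id, Either<String, Integer> value) {
        // Hypothetical key scheme: prefix by side so both types share the String key type.
        String prefix = value.isLeft() ? "name:" : "limit:";
        state.put(prefix + id, value); // same key replaces, new key adds
    }

    public int size() { return state.size(); }

    public Either<String, Integer> get(String key) { return state.get(key); }

    public static void main(String[] args) {
        BroadcastStateModel model = new BroadcastStateModel();
        model.onBroadcastElement("a", Either.left("alpha")); // new key "name:a"
        model.onBroadcastElement("a", Either.right(10));     // new key "limit:a"
        model.onBroadcastElement("a", Either.right(20));     // replaces "limit:a"
        System.out.println(model.size());                    // prints 2
        System.out.println(model.get("limit:a").right());    // prints 20
    }
}
```

In a real job the same put call would go against the state obtained inside processBroadcastElement, and processElement would read the entries back by key.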

... how can [I] provide a key common to these two [broadcast] streams?

The keys don't have to be the same; they just have to be of the same type.

Upvotes: 1
