Reputation: 177
I tried to use the broadcast state pattern for my Flink app, but after some research I did the following:
case class MyData(field1: String, field2: String, ts: Long, `type`: String) // type can be DATA or CONFIG
val stream1: DataStream[MyData] = ... // kinesis queue 1. main Data stream
val stream2: DataStream[MyData] = ... // kinesis queue 2. configuration stream
val union = stream1.union(stream2)
  .keyBy(x => s"${x.field1}_${x.field2}")
  .process(new MyProcessFun)
In MyProcessFun I read the data and, depending on what arrives from stream2, I apply some logic to it and emit some elements. Basically I use stream2 like a broadcast state pattern. I do not actually use broadcast because there is no easy way to access the state I have from processBroadcastElement, and my config stream is used as an indicator for cleaning the state I keep in MyProcessFun.
The streams are keyed with .keyBy, so I do not expect problems with parallelism > 1. Is this true?
My question is: in what cases is the broadcast pattern still necessary? In many cases the same functionality can be achieved with .union() or .connect() (without broadcast) plus a {Co}ProcessFunction.
Upvotes: 1
Views: 779
Reputation: 3864
Two things to say here.
First of all, it seems you could use the standard KeyedCoProcessFunction to achieve what you are now doing with union. It won't differ much, but you can have separate classes for the two streams, which gives better type safety and better domain segregation in general.
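For illustration, the KeyedCoProcessFunction variant might look roughly like the sketch below. It assumes the Flink 1.x Scala DataStream API; Data, Config, Keys, MyCoProcessFun and the lastTs state are hypothetical names, and the in-memory sources only stand in for the two Kinesis streams.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical types: one class per stream instead of a shared MyData with a type flag.
case class Data(field1: String, field2: String, ts: Long)
case class Config(field1: String, field2: String, ts: Long)

object Keys {
  // Same composite key as in the question.
  def of(field1: String, field2: String): String = s"${field1}_${field2}"
}

class MyCoProcessFun extends KeyedCoProcessFunction[String, Data, Config, Data] {
  // Keyed state (assumed for the example): last data timestamp seen for the current key.
  @transient private lazy val lastTs: ValueState[java.lang.Long] =
    getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("lastTs", classOf[java.lang.Long]))

  override def processElement1(
      value: Data,
      ctx: KeyedCoProcessFunction[String, Data, Config, Data]#Context,
      out: Collector[Data]): Unit = {
    lastTs.update(value.ts)
    out.collect(value)
  }

  override def processElement2(
      value: Config,
      ctx: KeyedCoProcessFunction[String, Data, Config, Data]#Context,
      out: Collector[Data]): Unit = {
    // A config element clears the state, but only for the key it carries.
    lastTs.clear()
  }
}

object KeyedCoProcessJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Stand-ins for the two Kinesis sources.
    val data: DataStream[Data] = env.fromElements(Data("a", "b", 1L))
    val config: DataStream[Config] = env.fromElements(Config("a", "b", 2L))

    data.connect(config)
      .keyBy((d: Data) => Keys.of(d.field1, d.field2), (c: Config) => Keys.of(c.field1, c.field2))
      .process(new MyCoProcessFun)
      .print()

    env.execute("keyed-co-process-sketch")
  }
}
```

Note that a Config element only touches the state of the key it was routed to, exactly as with the union version.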
As for broadcast, the main use case is when the control stream doesn't have a key to keyBy on, or simply can't or shouldn't be partitioned.
One example: you may have events generated by an external system and want to apply rules that filter out events not fulfilling the rules' requirements. The rules should be dynamic, so that a rule defined by a user is immediately used to filter incoming events. For simplicity, let's assume the rules are generic and apply to all event types (for example, an event that occurred after 5pm on a given day is filtered out, or an event that lasted more than 5 minutes is considered invalid). You can't partition such rules, so the solution would be broadcast.
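A minimal sketch of such a rule filter with broadcast state, assuming the Flink 1.x Scala DataStream API. Event, Rule, RuleFilter and the rule fields are hypothetical names invented for the example, and the in-memory sources stand in for real ones.

```scala
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical types for illustration.
case class Event(id: String, hourOfDay: Int, durationMinutes: Long)
case class Rule(name: String, maxHourOfDay: Int, maxDurationMinutes: Long) {
  // An event passes when it happens before the cutoff hour and is short enough.
  def accepts(e: Event): Boolean =
    e.hourOfDay < maxHourOfDay && e.durationMinutes <= maxDurationMinutes
}

object RuleFilter {
  // Broadcast state is a map; rules are stored under their name.
  val rulesDescriptor =
    new MapStateDescriptor[String, Rule]("rules", classOf[String], classOf[Rule])
}

class RuleFilter extends BroadcastProcessFunction[Event, Rule, Event] {
  override def processElement(
      event: Event,
      ctx: BroadcastProcessFunction[Event, Rule, Event]#ReadOnlyContext,
      out: Collector[Event]): Unit = {
    // Every parallel instance sees the same rules, whatever partition the event is in.
    val rules = ctx.getBroadcastState(RuleFilter.rulesDescriptor).immutableEntries().iterator()
    var ok = true
    while (ok && rules.hasNext) ok = rules.next().getValue.accepts(event)
    if (ok) out.collect(event)
  }

  override def processBroadcastElement(
      rule: Rule,
      ctx: BroadcastProcessFunction[Event, Rule, Event]#Context,
      out: Collector[Event]): Unit =
    // Runs on every parallel instance, so each one stores the new rule.
    ctx.getBroadcastState(RuleFilter.rulesDescriptor).put(rule.name, rule)
}

object RuleFilterJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val events: DataStream[Event] =
      env.fromElements(Event("e1", 10, 3L), Event("e2", 18, 3L))
    val rules: DataStream[Rule] = env.fromElements(Rule("office-hours", 17, 5L))

    events
      .connect(rules.broadcast(RuleFilter.rulesDescriptor))
      .process(new RuleFilter)
      .print()

    env.execute("broadcast-rule-filter-sketch")
  }
}
```

Flink does not order the two inputs relative to each other, so in this sketch events that arrive before any rule simply pass through.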
Or suppose you wanted a system that calculates the total earnings of delivery drivers in real time. You might have a set of additional bonuses (for example, a 5% bonus if the driver makes 10 deliveries in an hour). You wouldn't want to create a separate set of bonus rules for each driver just so you can keyBy it, would you? :)
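The driver case combines keyed state with broadcast state, which could be sketched with KeyedBroadcastProcessFunction as below. This assumes the Flink 1.x Scala DataStream API; Delivery, BonusRule, Earnings and DriverEarnings are hypothetical names, and the "per hour" part is left out for brevity (the sketch counts all deliveries of a driver, not deliveries within a time window).

```scala
import org.apache.flink.api.common.state.{MapStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import scala.collection.mutable.ListBuffer

// Hypothetical types for illustration.
case class Delivery(driverId: String, fee: Double)
case class BonusRule(name: String, minDeliveries: Int, bonusPercent: Double)
case class Earnings(driverId: String, deliveries: Int, total: Double)

object DriverEarnings {
  val bonusDescriptor =
    new MapStateDescriptor[String, BonusRule]("bonusRules", classOf[String], classOf[BonusRule])

  // Fee plus the bonus from every rule whose delivery threshold has been reached.
  def feeWithBonus(fee: Double, deliveries: Int, rules: Iterable[BonusRule]): Double =
    fee * (1.0 + rules.filter(deliveries >= _.minDeliveries).map(_.bonusPercent).sum / 100.0)
}

class DriverEarnings
    extends KeyedBroadcastProcessFunction[String, Delivery, BonusRule, Earnings] {

  // Keyed state: one running total per driver.
  @transient private lazy val earnings: ValueState[Earnings] =
    getRuntimeContext.getState(new ValueStateDescriptor[Earnings]("earnings", classOf[Earnings]))

  override def processElement(
      d: Delivery,
      ctx: KeyedBroadcastProcessFunction[String, Delivery, BonusRule, Earnings]#ReadOnlyContext,
      out: Collector[Earnings]): Unit = {
    val prev = Option(earnings.value()).getOrElse(Earnings(d.driverId, 0, 0.0))
    val count = prev.deliveries + 1

    // The same bonus rules are visible for every driver key.
    val rules = ListBuffer.empty[BonusRule]
    val it = ctx.getBroadcastState(DriverEarnings.bonusDescriptor).immutableEntries().iterator()
    while (it.hasNext) rules += it.next().getValue

    val next = Earnings(d.driverId, count,
      prev.total + DriverEarnings.feeWithBonus(d.fee, count, rules))
    earnings.update(next)
    out.collect(next)
  }

  override def processBroadcastElement(
      rule: BonusRule,
      ctx: KeyedBroadcastProcessFunction[String, Delivery, BonusRule, Earnings]#Context,
      out: Collector[Earnings]): Unit =
    ctx.getBroadcastState(DriverEarnings.bonusDescriptor).put(rule.name, rule)
}

object DriverEarningsJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val deliveries: DataStream[Delivery] = env.fromElements(Delivery("driver-1", 100.0))
    val bonusRules: DataStream[BonusRule] = env.fromElements(BonusRule("ten-deliveries", 10, 5.0))

    deliveries
      .keyBy((d: Delivery) => d.driverId)
      .connect(bonusRules.broadcast(DriverEarnings.bonusDescriptor))
      .process(new DriverEarnings)
      .print()

    env.execute("driver-earnings-sketch")
  }
}
```

The deliveries are keyed by driver, while the bonus rules carry no driver key at all and are replicated to every parallel instance.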
Upvotes: 2