Reputation: 826
The external database holds a set of rules for each key, and these rules should be applied to each stream element in the Flink job. Because it is very expensive to make a DB call for every element to retrieve the rules, I want to fetch the rules from the database at initialization and store them in a local cache.
When rules are updated in the external database, a status change event is published to the Flink job. That event should trigger fetching the rules again and refreshing this cache.
What is the best way to achieve what I've described? I looked into keyed state but initializing all keys and refreshing the keys on update doesn't seem possible.
Upvotes: 0
Views: 2277
Reputation: 921
In essence David's answer summarizes it well. If you are looking for more detail: not long ago, I gave a webinar [1] on this topic including running code examples. [2]
[1] https://www.youtube.com/watch?v=cJS18iKLUIY
[2] https://github.com/knaufk/enrichments-with-flink
Upvotes: 1
Reputation: 43499
A few different mechanisms in Flink may be relevant to this use case, depending on your detailed requirements.
Broadcast State
Jaya Ananthram has already covered the idea of using broadcast state in his answer. This makes sense if the rules should be applied globally, for every key, and if you can find a way to collect and broadcast the updates.
Note that the Context in the processBroadcastElement() method of a KeyedBroadcastProcessFunction contains the method applyToKeyedState(StateDescriptor<S, VS> stateDescriptor, KeyedStateFunction<KS, S> function). This means you can register a KeyedStateFunction that will be applied to all states of all keys associated with the provided stateDescriptor.
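To make the effect of applyToKeyedState concrete, here is a dependency-free Java model of it. This is not Flink code: a map stands in for the keyed state held by one parallel instance, and the class KeyedStateModel, the Holder type, and the method names stateFor and applyToAllKeys are hypothetical names chosen for illustration. In a real job the equivalent step would be a call to ctx.applyToKeyedState(descriptor, function) inside processBroadcastElement().

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

public class KeyedStateModel {
    /** Minimal stand-in for a per-key value state (hypothetical, not Flink's class). */
    public static final class Holder<T> {
        private T value;
        public T value() { return value; }
        public void update(T newValue) { value = newValue; }
        public void clear() { value = null; }
    }

    // Stand-in for the keyed state of one parallel instance: key -> cached rules.
    private final Map<String, Holder<String>> cachedRules = new LinkedHashMap<>();

    /** Models per-key access: returns the state scoped to the given key. */
    public Holder<String> stateFor(String key) {
        return cachedRules.computeIfAbsent(key, k -> new Holder<>());
    }

    /** Models applyToKeyedState: runs fn once for every key that has this state. */
    public void applyToAllKeys(BiConsumer<String, Holder<String>> fn) {
        cachedRules.forEach(fn);
    }

    public static void main(String[] args) {
        KeyedStateModel model = new KeyedStateModel();
        model.stateFor("user-1").update("rules-v1");
        model.stateFor("user-2").update("rules-v1");

        // A "rules changed" broadcast arrives: invalidate the cache for every key.
        model.applyToAllKeys((key, state) -> state.clear());

        System.out.println(model.stateFor("user-1").value()); // prints "null"
    }
}
```

The design point is that a single broadcast element can touch the state of every key, which is what makes "refresh all keys on update" feasible from the broadcast side.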
State Processor API
If you want to bootstrap state in a Flink savepoint from a database dump, you can do that with this library. You'll find a simple example of using the State Processor API to bootstrap state in this gist.
Change Data Capture
The Table/SQL API supports Debezium, Canal, and Maxwell CDC streams, and Kafka upsert streams. This may be a solution. There's also flink-cdc-connectors.
Lookup Joins
Flink SQL can do temporal lookup joins against a JDBC database, with a configurable cache. Not sure this is relevant.
Upvotes: 1
Reputation: 3463
I think you can use BroadcastProcessFunction or KeyedBroadcastProcessFunction for your use case. A detailed blog post is available here.
In short: define a source such as Kafka (or any other) and publish to it the rules that you want the actual stream to consume. Connect the actual data stream with the rules stream. processBroadcastElement then receives the rules, and that is where you update the state. Finally, the updated state (the rules) can be read in processElement, the method that handles the actual events.
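The flow described above can be modelled in plain Java without any Flink dependency. This is only a sketch of the two-callback logic: the class RuleBroadcastModel, the String-based rule and event types, and the output list are assumptions made for illustration, and a map stands in for the broadcast state. In a real job the two methods would be the overrides of the same names in a BroadcastProcessFunction or KeyedBroadcastProcessFunction.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RuleBroadcastModel {
    // Stand-in for the broadcast state: rule key -> rule definition.
    private final Map<String, String> rules = new HashMap<>();
    // Stand-in for the downstream output of the operator.
    private final List<String> output = new ArrayList<>();

    /** Models processBroadcastElement(): a rule update arrives on the rules stream. */
    public void processBroadcastElement(String ruleKey, String rule) {
        rules.put(ruleKey, rule);
    }

    /** Models processElement(): looks up (read-only) the rule for this event's key. */
    public void processElement(String eventKey, String payload) {
        String rule = rules.get(eventKey);
        if (rule == null) {
            // Events can arrive before their rule does; decide how to handle that.
            output.add(payload + " -> no rule yet");
        } else {
            output.add(payload + " -> " + rule);
        }
    }

    public List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        RuleBroadcastModel op = new RuleBroadcastModel();
        op.processElement("k1", "event-1");          // no rule has been broadcast yet
        op.processBroadcastElement("k1", "rule-A");  // rule update arrives
        op.processElement("k1", "event-2");          // now evaluated against rule-A
        System.out.println(op.output()); // prints "[event-1 -> no rule yet, event-2 -> rule-A]"
    }
}
```

Note the first event: nothing guarantees that rules arrive before data, so the job needs an explicit policy (drop, buffer, or pass through) for elements seen before their rule.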
Points to consider: Broadcast state is always kept on the heap, not in the state store (RocksDB), so it has to be small enough to fit in memory. Each slot copies all of the broadcast state into its checkpoints, so all checkpoints and savepoints will contain n copies of the broadcast state, where n is the parallelism.
Upvotes: 1