Harshith Bolar
Harshith Bolar

Reputation: 826

What is the best way to have a cache of an external database in Flink?

The external database consists of a set of rules for each key, these rules should be applied on each stream element in the Flink job. Because it is very expensive to make a DB call for each element and retrieve the rules, I want to fetch the rules from the database at initialization and store it in a local cache.

When rules are updated in the external database, a status change event is published to the Flink job which should be used to fetch the rules and refresh this cache.

What is the best way to achieve what I've described? I looked into keyed state but initializing all keys and refreshing the keys on update doesn't seem possible.

Upvotes: 0

Views: 2277

Answers (3)

snntrable
snntrable

Reputation: 921

In essence David's answer summarizes it well. If you are looking for more detail: not long ago, I gave a webinar [1] on this topic including running code examples. [2]

[1] https://www.youtube.com/watch?v=cJS18iKLUIY

[2] https://github.com/knaufk/enrichments-with-flink

Upvotes: 1

David Anderson
David Anderson

Reputation: 43499

A few different mechanisms in Flink may be relevant to this use case, depending on your detailed requirements.

Broadcast State

Jaya Ananthram has already covered the idea of using broadcast state in his answer. This makes sense if the rules should be applied globally, for every key, and if you can find a way to collect and broadcast the updates.

Note that the Context in the processBroadcastElement() of a KeyedBroadcastProcessFunction method contains the method applyToKeyedState(StateDescriptor<S, VS> stateDescriptor, KeyedStateFunction<KS, S> function). This means you can register a KeyedStateFunction that will be applied to all states of all keys associated with the provided stateDescriptor.

State Processor API

If you want to bootstrap state in a Flink savepoint from a database dump, you can do that with this library. You'll find a simple example of using the State Processor API to bootstrap state in this gist.

Change Data Capture

The Table/SQL API supports Debezium, Canal, and Maxwell CDC streams, and Kafka upsert streams. This may be a solution. There's also flink-cdc-connectors.

Lookup Joins

Flink SQL can do temporal lookup joins against a JDBC database, with a configurable cache. Not sure this is relevant.

Upvotes: 1

Jaya Ananthram
Jaya Ananthram

Reputation: 3463

I think you can make use of BroadcastProcessFunction or KeyedBroadcastProcessFunction to achieve your use case. A detailed blog available here

In short: You can define the source such as Kafka or any other and then publish the rules to Kafka that you want the actual stream to consume. Connect the actual data stream and rules stream. Then the processBroadcastElement will stream the rules where you can update the state. Finally the updated state (rules) can be retrieved in the actual event streaming method processElement.

Points to consider: Broadcast state will be kept on the heap always, not in state store (RocksDB). So, it has to be small enough to fit in memory. Each slot will copy all of the broadcast state into its checkpoints, so all checkpoints and savepoints will have n (parallelism) copies of the broadcast state.

Upvotes: 1

Related Questions