Daisy

Reputation: 61

flink - how to use state as a cache

I want to read history from state. If the state is null, I read the history from HBase, update the state, and use onTimer to give the state a TTL. The problem is how to read from HBase in batches, because reading a single record at a time from HBase is not efficient.
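To make the per-key approach in the question concrete, here is a minimal plain-Java model of it, under stated assumptions: the `HashMap`s stand in for Flink keyed state and registered timers, and `hbaseLookup` is a hypothetical single-row read. This is not Flink or HBase API code. The `lookupCount()` counter shows the cost the question is worried about: every cache miss costs one round trip to HBase.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of the question's approach. The maps stand in for Flink keyed
// state and timers; hbaseLookup is a hypothetical single-row read. Not Flink API code.
public class PerKeyCacheSketch {
    private final Map<String, String> cache = new HashMap<>();   // like a ValueState<String> per key
    private final Map<String, Long> expiresAt = new HashMap<>(); // like one registered timer per key
    private final Function<String, String> hbaseLookup;         // hypothetical point query
    private final long ttlMillis;
    private int lookups = 0;                                      // round trips to the "database"

    public PerKeyCacheSketch(Function<String, String> hbaseLookup, long ttlMillis) {
        this.hbaseLookup = hbaseLookup;
        this.ttlMillis = ttlMillis;
    }

    // Like processElement: use cached history if present, otherwise query and cache it.
    public String enrich(String key, long now) {
        String history = cache.get(key);
        if (history == null) {
            lookups++; // every miss costs one point query
            history = hbaseLookup.apply(key);
            cache.put(key, history);
            expiresAt.put(key, now + ttlMillis); // like registering a timer at now + ttl
        }
        return history;
    }

    // Like onTimer: drop every entry whose expiry time has been reached.
    public void fireTimers(long now) {
        expiresAt.entrySet().removeIf(e -> {
            boolean expired = e.getValue() <= now;
            if (expired) {
                cache.remove(e.getKey());
            }
            return expired;
        });
    }

    public int lookupCount() {
        return lookups;
    }
}
```

In a real job this logic would sit in a `KeyedProcessFunction`, where each key has its own state and `onTimer` fires for one key at a time, rather than sweeping all keys as `fireTimers` does here.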

Upvotes: 0

Views: 1842

Answers (1)

David Anderson

Reputation: 43499

In general, if you want to cache/mirror state from an external database in Flink, the most performant approach is to stream the database mutations into Flink -- in other words, turn Flink into a replication endpoint for the database's change data capture (CDC) stream, if the database supports that.
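A minimal plain-Java model of the CDC approach, assuming a hypothetical `Change` event where a non-null value means a put and a `null` value means a delete. The `HashMap` stands in for Flink keyed state; in a job, the change stream and the stream being enriched would be keyed by the same row key. What it illustrates is that enrichment becomes a local read, with no query to the database at all.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of mirroring a table from its CDC stream. The map stands in
// for Flink keyed state; Change is a hypothetical event type.
public class CdcMirrorSketch {
    // Hypothetical change event: a non-null value is a put, a null value is a delete.
    public static final class Change {
        final String rowKey;
        final String value;

        public Change(String rowKey, String value) {
            this.rowKey = rowKey;
            this.value = value;
        }
    }

    private final Map<String, String> mirror = new HashMap<>();

    // Apply one mutation from the change stream, in arrival order.
    public void apply(Change change) {
        if (change.value == null) {
            mirror.remove(change.rowKey);
        } else {
            mirror.put(change.rowKey, change.value);
        }
    }

    // Enrichment is a local read: no round trip to the database.
    public String lookup(String rowKey) {
        return mirror.get(rowKey);
    }
}
```

One caveat the model makes visible: a record that arrives before any change event for its key finds nothing in the mirror, so in practice the mirror usually has to be bootstrapped with a snapshot of the table.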

I have no experience with HBase, but https://github.com/mravi/hbase-connect-kafka is an example of something that might work (by putting Kafka in between HBase and Flink).

If you would rather query HBase from Flink, and want to avoid making point queries for one user at a time, then you could build something like this:

              -> queryManyUsers -> keyBy(uId) -> 
streamToEnrich                                 CoProcessFunction
              -> keyBy(uId) ------------------->

Here you would split your stream, sending one copy through something like a window, a process function, or async I/O that queries HBase in batches, and sending the results into a CoProcessFunction that holds the cache and does the enrichment.
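The batching step on the upper path could look roughly like the following plain-Java model. It assumes a hypothetical `multiGet` function that fetches many rows in one call; with the HBase Java client this could plausibly be backed by `Table.get(List<Get>)`, which accepts a list of `Get`s, but that wiring is not shown here. Keys are deduplicated within a batch, and a key with no row is still emitted (with a `null` value) so that anything waiting on it downstream can be released.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Plain-Java model of batching lookups on the upper path. multiGet is a
// hypothetical "fetch many rows in one call" function, not a real API.
public class BatchingLookupSketch {
    private final int batchSize;
    private final Function<List<String>, Map<String, String>> multiGet;
    private final Set<String> pending = new LinkedHashSet<>(); // deduplicated keys, in arrival order

    public BatchingLookupSketch(int batchSize, Function<List<String>, Map<String, String>> multiGet) {
        this.batchSize = batchSize;
        this.multiGet = multiGet;
    }

    // Called once per record; returns lookup results only when a batch is flushed.
    public List<Map.Entry<String, String>> add(String key) {
        pending.add(key);
        if (pending.size() < batchSize) {
            return new ArrayList<>();
        }
        return flush();
    }

    // Issue one batched query for all pending keys. A missing row is emitted
    // with a null value so downstream records waiting on that key are released.
    public List<Map.Entry<String, String>> flush() {
        List<Map.Entry<String, String>> results = new ArrayList<>();
        if (pending.isEmpty()) {
            return results;
        }
        List<String> keys = new ArrayList<>(pending);
        pending.clear();
        Map<String, String> rows = multiGet.apply(keys);
        for (String key : keys) {
            results.add(new AbstractMap.SimpleImmutableEntry<>(key, rows.get(key)));
        }
        return results;
    }
}
```

`flush()` is meant to also be driven by a timer (or by the end of a window) so that a partially filled batch is not held indefinitely; the batch size trades latency against the number of round trips to HBase.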

When a record arrives at this CoProcessFunction directly, along the bottom path, the cached data is used if it is present. Otherwise the record is buffered until the data for its key arrives from the upper path.
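A minimal plain-Java model of the enrichment logic described above, assuming string records and string lookup results. The two maps stand in for Flink keyed state (for example a `ValueState` for the cached data and a `ListState` for buffered records); `onRecord` corresponds to the bottom input and `onLookupResult` to the upper input of the CoProcessFunction. A `null` lookup result is cached as "no row exists", so later records for that key are not buffered forever.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the enrich-or-buffer step. The maps stand in for Flink
// keyed state; method names are hypothetical, not the CoProcessFunction API.
public class EnrichWithBufferSketch {
    private final Map<String, String> cache = new HashMap<>();          // null value = "no row"
    private final Map<String, List<String>> buffered = new HashMap<>(); // records awaiting data

    // Bottom path: enrich immediately on a cache hit, otherwise buffer the record.
    public List<String> onRecord(String key, String record) {
        List<String> out = new ArrayList<>();
        if (cache.containsKey(key)) {
            out.add(record + "|" + cache.get(key));
        } else {
            buffered.computeIfAbsent(key, k -> new ArrayList<>()).add(record);
        }
        return out;
    }

    // Upper path: store the looked-up data and release any buffered records.
    public List<String> onLookupResult(String key, String data) {
        cache.put(key, data);
        List<String> out = new ArrayList<>();
        List<String> waiting = buffered.remove(key);
        if (waiting != null) {
            for (String record : waiting) {
                out.add(record + "|" + data);
            }
        }
        return out;
    }
}
```

Buffered records are released in arrival order, so per-key ordering is preserved; a real job would also want a timer to expire cache entries, as in the question.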

Upvotes: 1
