Reputation: 47
When working with Kafka Streams and creating a GlobalKTable, I know that by definition the table will be fully populated before the streaming of the other sources starts.
I'm looking for similar functionality in Apache Flink. Topic one holds configuration data which is almost static. I want Flink to fully consume this topic before it even starts reading from topic two. Topic one contains ~5 million records with a total size of around 600 MB.
Is there a way to achieve this or would I need to buffer the data from topic two until I have matching data from topic one?
Upvotes: 0
Views: 141
Reputation: 47
As described in another thread (Provide initial state to Apache Flink Application), the situation was solved using a separate init deployment which consumes the topic and writes the data to Flink state.
A savepoint is then created, and the proper application is started from this savepoint so that it already contains the data.
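For illustration, such an init job could look roughly like the sketch below. The broker address, topic name, the ConfigRecord type, ConfigRecordDeserializer, and the operator uid are all placeholders; the uid has to match the stateful operator in the real application so that the savepoint can be mapped onto it.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical init job: it only consumes the config topic and parks every record in keyed state.
public class ConfigInitJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<ConfigRecord> configSource = KafkaSource.<ConfigRecord>builder()
                .setBootstrapServers("kafka:9092")                        // placeholder broker
                .setTopics("config-topic")                                // placeholder topic
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new ConfigRecordDeserializer()) // placeholder schema
                .build();

        env.fromSource(configSource, WatermarkStrategy.noWatermarks(), "config-topic")
           .keyBy(ConfigRecord::getKey)
           .process(new KeyedProcessFunction<String, ConfigRecord, Void>() {
               private transient ValueState<ConfigRecord> latest;

               @Override
               public void open(Configuration parameters) {
                   latest = getRuntimeContext().getState(
                           new ValueStateDescriptor<>("config", ConfigRecord.class));
               }

               @Override
               public void processElement(ConfigRecord value, Context ctx, Collector<Void> out)
                       throws Exception {
                   // The state written here ends up in the savepoint taken before shutdown.
                   latest.update(value);
               }
           })
           // Must match the uid of the stateful operator in the real application.
           .uid("config-enrichment");

        env.execute("config-init");
    }
}
```

Once no new records arrive anymore, this job is stopped with a savepoint and the real application is started from it.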
Upvotes: 0
Reputation: 9255
If you use the Hybrid Source approach, then you won't get updates for the "almost static" configuration data. The same thing is true if you load that topic's data into state using the State Processor API.
Another option that I've used in the past is to support a --coldstart parameter. When this is set, you configure your workflow with a fake source (empty, never terminates) instead of the second Kafka topic. Then you run the workflow until you see no more new data being loaded from the first topic, stop with a savepoint, and restart from the savepoint, this time without the --coldstart parameter.
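For illustration, the fake source can be as simple as the following sketch. It uses the legacy SourceFunction interface (deprecated in recent Flink releases, but the idea carries over to the newer Source API); the class name is a placeholder.

```java
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Placeholder source used only for --coldstart runs: it emits nothing and never finishes,
// so the job keeps running while the configuration topic is being drained.
public class ColdstartEmptySource<T> implements SourceFunction<T> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<T> ctx) throws Exception {
        final Object waitLock = new Object();
        synchronized (waitLock) {
            while (running) {
                waitLock.wait(1_000L); // idle; no elements are ever emitted
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```

In the job setup you then pick either this placeholder or the real Kafka source for the second topic, depending on whether --coldstart was set.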
Upvotes: 2
Reputation: 76597
There are a few avenues that you could explore to tackle this:
Flink exposes a State Processor API which lets you run a batch process to seed the state used in a job: you read your entire topic, store it in state associated with an operator, and write the result out as a savepoint. You then run your actual job from that savepoint, so that when it restores, it already has all of that data in state and ready for use.
This is the approach that I've used most frequently.
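A rough sketch of such a seeding job follows, assuming a recent Flink release (1.17+) with the DataStream-based State Processor API; the exact factory methods differ slightly between versions, and ConfigRecord, ConfigRecordDeserializer, the uid, and the paths are made-up placeholders.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.OperatorIdentifier;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.SavepointWriter;
import org.apache.flink.state.api.StateBootstrapTransformation;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical seeding job: bounded read of the config topic, bootstrap keyed state, write a savepoint.
public class SeedConfigState {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Bounded source: stop once the end offsets captured at startup are reached.
        KafkaSource<ConfigRecord> configSource = KafkaSource.<ConfigRecord>builder()
                .setBootstrapServers("kafka:9092")                        // placeholder broker
                .setTopics("config-topic")                                // placeholder topic
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setBounded(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new ConfigRecordDeserializer()) // placeholder schema
                .build();

        DataStream<ConfigRecord> config =
                env.fromSource(configSource, WatermarkStrategy.noWatermarks(), "config-topic");

        StateBootstrapTransformation<ConfigRecord> bootstrap =
                OperatorTransformation.bootstrapWith(config)
                        .keyBy(ConfigRecord::getKey)
                        .transform(new KeyedStateBootstrapFunction<String, ConfigRecord>() {
                            private transient ValueState<ConfigRecord> latest;

                            @Override
                            public void open(Configuration parameters) {
                                latest = getRuntimeContext().getState(
                                        new ValueStateDescriptor<>("config", ConfigRecord.class));
                            }

                            @Override
                            public void processElement(ConfigRecord value, Context ctx) throws Exception {
                                latest.update(value);
                            }
                        });

        // The uid must match the stateful operator in the streaming job that restores this savepoint.
        SavepointWriter.newSavepoint(env, new HashMapStateBackend(), 128)
                .withOperator(OperatorIdentifier.forUid("config-enrichment"), bootstrap)
                .write("file:///tmp/seed-savepoint");

        env.execute("seed-config-state");
    }
}
```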
I believe this is the problem that Hybrid Sources were designed to solve, by allowing you to execute certain parts of your pipeline sequentially. In theory you could use this to load your existing data first and then pivot to reading from elsewhere (or not at all).
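As a hedged sketch of what that could look like with the Kafka connector: the first source is bounded, so it is drained completely before the switch-over. The broker address, topic names, and the use of plain strings are assumptions, and note that the result is a single combined stream, so you'd still need to tell config records and events apart downstream.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ConfigThenEventsJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Bounded read of the config topic: finishes once the current end offsets are reached.
        KafkaSource<String> configTopic = KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9092")                 // placeholder broker
                .setTopics("config-topic")                         // placeholder topic
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setBounded(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Unbounded read of the second topic, started only after the first source finishes.
        KafkaSource<String> eventsTopic = KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9092")
                .setTopics("events-topic")                         // placeholder topic
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        HybridSource<String> configThenEvents = HybridSource.builder(configTopic)
                .addSource(eventsTopic)
                .build();

        DataStream<String> stream =
                env.fromSource(configThenEvents, WatermarkStrategy.noWatermarks(), "config-then-events");

        stream.print(); // downstream logic would need to distinguish config records from events

        env.execute("config-then-events");
    }
}
```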
Depending on the size of your state (and ~5M records might not be trivial), you can sometimes issue an out-of-band call, which in this case would read through all of your records and pass them into a specific operator in your job graph. Again, probably not a great approach, but it is feasible, especially when dealing with smaller sources.
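One way to do that out-of-band read is to pull the whole topic with a plain Kafka consumer in the operator's open() method and keep it on the heap, as in the sketch below. Event, EnrichedEvent, the topic name, and the broker address are placeholders, and ~5M records (~600 MB) would have to fit in memory on every parallel instance.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

// Hypothetical enrichment operator that pre-loads the entire config topic before
// processing its first event. Names and types are placeholders.
public class PreloadedConfigEnricher extends RichFlatMapFunction<Event, EnrichedEvent> {

    private transient Map<String, String> configByKey;

    @Override
    public void open(Configuration parameters) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka:9092");   // placeholder broker
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        configByKey = new HashMap<>();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = consumer.partitionsFor("config-topic").stream()
                    .map(p -> new TopicPartition(p.topic(), p.partition()))
                    .collect(Collectors.toList());
            consumer.assign(partitions);
            consumer.seekToBeginning(partitions);
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);

            // Read until every partition has reached the end offset captured above.
            while (!reachedEnd(consumer, endOffsets)) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                    configByKey.put(record.key(), record.value());
                }
            }
        }
    }

    private static boolean reachedEnd(KafkaConsumer<String, String> consumer,
                                      Map<TopicPartition, Long> endOffsets) {
        return endOffsets.entrySet().stream()
                .allMatch(e -> consumer.position(e.getKey()) >= e.getValue());
    }

    @Override
    public void flatMap(Event event, Collector<EnrichedEvent> out) {
        String config = configByKey.get(event.getConfigKey());
        if (config != null) {
            out.collect(new EnrichedEvent(event, config));
        }
    }
}
```

The obvious trade-offs are that the config lives outside Flink state (so it is re-read on every restart) and that it never receives updates while the job is running.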
Upvotes: 1