frodo

Reputation: 1571

Flink time buffered sink

I'm trying to write a Flink application that reads events from Kafka, enriches them from MySQL, and writes the result to HBase. The MySQL enrichment happens in a RichFlatMapFunction, and I'm now trying to figure out how best to write to HBase. I want to batch the writes, so I'm currently thinking of using a WindowAll followed by an identity apply (only to get back to a DataStream), and then writing an HBaseSink that takes a list of records and does batch Puts to HBase.

Is this the right way to do it? It feels weird to use WindowAll and apply just to get time-based buffering.

Upvotes: 0

Views: 1034

Answers (2)

kkrugler

Reputation: 9280

From what I see in Jira under FLINK-2055, I think your best option currently is to use Flink Streaming's support for Async I/O and handle the buffering inside your own custom function. The tricky bits appear to be (a) properly handling checkpoints/retries (atomic puts) and (b) avoiding overloading the HBase region servers. But if you aren't worried about exactly-once support and can tune the settings to your HBase setup, this should be pretty straightforward.
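To make "handle the buffering inside the custom function" a bit more concrete, here is a minimal sketch of just the buffering core in plain Java. It is not Flink's Async I/O API and not an HBase client: `BufferedBatchWriter`, its constructor parameters, and the `batchWriter` callback are hypothetical names for illustration. The assumption is that the callback would wrap something like `Table.put(List<Put>)`, and that the surrounding Flink function would call `add` per record.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

/**
 * Hypothetical helper: buffers records and hands them to a batch writer
 * once the buffer is full or its oldest record is too old.
 */
class BufferedBatchWriter<T> {
    private final int maxBatchSize;
    private final long maxBufferMillis;
    private final Consumer<List<T>> batchWriter; // assumed to wrap a batch put
    private final LongSupplier clock;            // injectable so it can be tested
    private final List<T> buffer = new ArrayList<>();
    private long oldestRecordMillis;

    BufferedBatchWriter(int maxBatchSize, long maxBufferMillis,
                        Consumer<List<T>> batchWriter, LongSupplier clock) {
        this.maxBatchSize = maxBatchSize;
        this.maxBufferMillis = maxBufferMillis;
        this.batchWriter = batchWriter;
        this.clock = clock;
    }

    /** Adds one record and flushes if the size or age limit is reached. */
    void add(T record) {
        if (buffer.isEmpty()) {
            oldestRecordMillis = clock.getAsLong();
        }
        buffer.add(record);
        boolean full = buffer.size() >= maxBatchSize;
        boolean tooOld = clock.getAsLong() - oldestRecordMillis >= maxBufferMillis;
        if (full || tooOld) {
            flush();
        }
    }

    /** Writes whatever is buffered as one batch; does nothing if empty. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        batchWriter.accept(new ArrayList<>(buffer)); // pass a copy, then reset
        buffer.clear();
    }
}
```

Note that the age check in this sketch only runs when a new record arrives, so a quiet stream would leave records sitting in the buffer; a real implementation would probably also need a timer. For at-least-once behaviour you would presumably also call `flush()` when a checkpoint is taken and when the function closes, which is part of the "tricky bits" mentioned above.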

Upvotes: 1

gifa

Reputation: 86

The heaviest operation when dealing with HBase is actually opening a connection to it (as also explained in the documentation: https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Connection.html).

What I would suggest is to implement another RichFlatMapFunction, so that you open the connection only once in each Task and then perform a Put to HBase every time an event needs to be persisted. The differences from your current approach would be that:

  • you don't need a WindowAll
  • you don't open a new connection to HBase on every write in the HBaseSink
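The open-once, write-per-event lifecycle described above can be sketched in plain Java as follows. This is only an illustration of the pattern, not Flink or HBase code: `RecordStore` and `PerTaskWriter` are hypothetical names, with `RecordStore` standing in for an HBase connection and table. In a real job, the assumption is that `open()` and `close()` would map onto the rich function's own `open` and `close` methods, and the factory would call something like `ConnectionFactory.createConnection(...)`.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for an HBase connection/table pair. */
interface RecordStore extends AutoCloseable {
    void put(String rowKey, String value);

    @Override
    void close();
}

/** Mirrors the open / per-record / close lifecycle of a rich function. */
class PerTaskWriter {
    private final Supplier<RecordStore> storeFactory;
    private RecordStore store;

    PerTaskWriter(Supplier<RecordStore> storeFactory) {
        this.storeFactory = storeFactory;
    }

    /** Called once per task: the expensive connection setup happens here. */
    void open() {
        store = storeFactory.get();
    }

    /** Called once per event: reuses the already-open store. */
    void write(String rowKey, String value) {
        store.put(rowKey, value);
    }

    /** Called once at shutdown: releases the store if it was opened. */
    void close() {
        if (store != null) {
            store.close();
            store = null;
        }
    }
}
```

With this shape, writing N events costs one connection setup plus N puts, rather than N setups.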

As for whether the Puts are executed as a batch or one at a time as events stream in, this doesn't actually change the complexity of the operations (see the first answer to the question "Is HBase batch put put(List<Put>) faster than put(Put)? What is the capacity of a Put object?"), so you will still be doing N operations to persist N events.

Upvotes: 0
