fafl

Reputation: 7385

Batch elements in Apache Flink

I have a stream of IDs in Apache Flink. I would like to batch them into sets of 500 and, for each batch, call an external service that returns additional data for each ID. Then I want to forward each ID, with its additional data, further downstream. I'm batching for performance: one request with 500 IDs is much faster than 500 requests with one ID each.

I tried implementing this using windows, but I'm either getting tiny batches or no batches at all. In BATCH runtime execution mode I'm also losing the last remaining IDs.

Ideally I would like to:

I'm a bit lost with the DataSet API: which functions should I use, and how should I structure the program?

Upvotes: 0

Views: 19

Answers (1)

kkrugler

Reputation: 9255

With the (recommended) DataStream API, and the goal of having a scalable, reliable workflow, one approach is the following:

  1. In a map function, convert your incoming record to a Tuple2<key, record>. The key would be an Integer hash calculated from one or more stable fields in the incoming record. By "stable" I mean fields that wouldn't change if you re-ran the workflow on the same data, so not (say) a field holding the ingest time.
  2. Key the stream by the Tuple2.f0 (first field).
  3. Implement a KeyedProcessFunction (sketched after this list). This would save incoming records in ListState, and also register a timer set to MAX_WATERMARK. When you have 500 records, or the timer fires (which happens once all of the incoming data has been received), you'd output a record containing the batch of buffered records.
  4. Follow that with a RichAsyncFunction (second sketch below), where you call the remote service with the batch of records, and use the response to enrich (and then emit) the individual records.
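
Here's a minimal sketch of steps 1–3, assuming the incoming records are plain String IDs. The names `Batcher`, `batch()`, `BATCH_SIZE`, and `NUM_KEYS` are illustrative, not from Flink itself, and the in-memory `count` is a simplification (it isn't restored on recovery):

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class Batcher extends KeyedProcessFunction<Integer, Tuple2<Integer, String>, List<String>> {

    public static final int BATCH_SIZE = 500;
    public static final int NUM_KEYS = 128; // illustrative; pick to match your parallelism

    private transient ListState<String> buffered;
    private transient int count; // simplification: not fault-tolerant

    /** Steps 1 and 2: key each ID by a stable hash, then batch per key. */
    public static DataStream<List<String>> batch(DataStream<String> ids) {
        return ids
                .map(id -> Tuple2.of(Math.floorMod(id.hashCode(), NUM_KEYS), id))
                .returns(Types.TUPLE(Types.INT, Types.STRING))
                .keyBy(t -> t.f0)
                .process(new Batcher());
    }

    @Override
    public void open(Configuration parameters) {
        buffered = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffered-ids", String.class));
    }

    @Override
    public void processElement(Tuple2<Integer, String> in, Context ctx,
                               Collector<List<String>> out) throws Exception {
        buffered.add(in.f1);
        // A timer at Long.MAX_VALUE fires when MAX_WATERMARK arrives, i.e. once
        // all bounded input has been consumed. That flushes the final partial
        // batch -- the records otherwise lost in BATCH execution mode.
        ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
        if (++count >= BATCH_SIZE) {
            flush(out);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<List<String>> out) throws Exception {
        flush(out);
    }

    private void flush(Collector<List<String>> out) throws Exception {
        List<String> batch = new ArrayList<>();
        for (String id : buffered.get()) {
            batch.add(id);
        }
        if (!batch.isEmpty()) {
            out.collect(batch);
        }
        buffered.clear();
        count = 0;
    }
}
```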

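And a sketch of step 4. `EnrichmentClient` here is a hypothetical stand-in for whatever asynchronous API your external service exposes; the `enrich()` wiring, timeout, and capacity values are just examples:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class EnrichBatch extends RichAsyncFunction<List<String>, Tuple2<String, String>> {

    /** Hypothetical async client for the external service. */
    public interface EnrichmentClient {
        CompletableFuture<Map<String, String>> lookup(List<String> ids);
    }

    private transient EnrichmentClient client;

    @Override
    public void open(Configuration parameters) {
        // Placeholder: wire up your real service client here.
        client = ids -> CompletableFuture.completedFuture(Collections.emptyMap());
    }

    @Override
    public void asyncInvoke(List<String> batch, ResultFuture<Tuple2<String, String>> resultFuture) {
        // One remote call for the whole batch of (up to) 500 IDs.
        client.lookup(batch).thenAccept(byId -> {
            List<Tuple2<String, String>> enriched = new ArrayList<>();
            for (String id : batch) {
                enriched.add(Tuple2.of(id, byId.get(id)));
            }
            // Completing with a collection "un-batches" the results, so each
            // enriched ID continues downstream as its own record.
            resultFuture.complete(enriched);
        });
    }

    /** Example wiring: at most 10 in-flight batch requests, 30s timeout each. */
    public static DataStream<Tuple2<String, String>> enrich(DataStream<List<String>> batches) {
        return AsyncDataStream.unorderedWait(batches, new EnrichBatch(), 30, TimeUnit.SECONDS, 10);
    }
}
```

Using `unorderedWait` lets enriched records be emitted as soon as each batch's response arrives; if downstream ordering matters, `orderedWait` is the alternative at the cost of some latency.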
Upvotes: 0
