Reputation: 7385
I have a stream of IDs in Apache Flink. I would like to batch them into sets of 500 and for each batch call an external service that will give me additional data for each ID. Then I want to forward each ID with the additional data further downstream. I'm using batching here for performance reasons because 1 request with 500 IDs is much faster than 500 requests with 1 ID.
I tried implementing this using windows, but I'm either getting tiny batches or no batches at all. In runtime execution mode BATCH I'm also losing the last remaining IDs.
Ideally I would like to:
I'm a bit lost with the DataSet API, which functions should I use and how can I structure the program?
Upvotes: 0
Views: 19
Reputation: 9255
With the (recommended) DataStream API, and the goal of having a scalable, reliable workflow, one approach is the following:
Tuple2<key, record>
. The key
would be an Integer hash calculated from one or more stable fields in the incoming record. By "stable" I mean they wouldn't change if you re-ran the workflow on the same data, thus it wouldn't be (say) a field where you put the ingest time.Tuple2.f0
(first field).KeyedProcessFunction
. This would save incoming records in ListState
(and also register a timer set to MAX_WATERMARK
). When you had 500 records, or the timer fired (which would happen when all of the incoming data had been received), then you'd output a record containing the batch of incoming records.RichAsyncFunction
, where you call the remote service with the batch of records, and use the response to enrich (and then emit) the records.Upvotes: 0