Reputation: 1925
I'm trying to convert the first value into the second:
PCollection<KV<Integer, List<T>>>
PCollection<KV<String, List<List<T>>>>
The input values are already grouped by key.
The output values should be batches of Ts whose total length is up to a certain size.
The output key would be some new key, for example "<firstInputKeyInBatch>-<totalBatchSize>".
The main goal is to save variable-length sequences to files of roughly the same size, while not splitting a sequence between different files.
Note that the built-in GroupIntoBatches operates on ungrouped PCollections and effectively splits the values under each key, while I need to batch them. Its documentation says "Batches will contain only elements of a single key", but how can I make batches composed of multiple keys?
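For reference, this is roughly how GroupIntoBatches would be applied to my input (a sketch only; the helper method name and the batch size 10 are made up). It batches values within each key, so every output batch still belongs to a single Integer key:
import java.util.List;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Sketch: per-key batching with the built-in transform. Each batch produced
// here contains values of exactly one Integer key, which is not what I need.
static <T> PCollection<KV<Integer, Iterable<List<T>>>> perKeyBatches(
    PCollection<KV<Integer, List<T>>> input) {
  return input.apply(GroupIntoBatches.<Integer, List<T>>ofSize(10));
}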
I tried other answers [1], [2], but the problem I'm facing is that the @FinishBundle method is called for each original Integer key, so my batches end up being effectively the same as the input; e.g. all output keys end with "-1".
// Fields and methods of a DoFn<KV<Integer, List<T>>, KV<String, List<List<T>>>>:
private List<List<T>> batch = new ArrayList<>();
private Integer lastKey = null;
private int numKeys = 0;

@ProcessElement
public void processElement(KV<Integer, List<T>> input, ...) {
  batch.add(input.getValue());
  lastKey = input.getKey();
  numKeys++;
  if (batch.size() > 10) {
    // ... flush the batch and reset it ...
  }
}

private String generateBatchId() {
  return lastKey + "-" + numKeys;
}

@FinishBundle
public void finishBundle(FinishBundleContext context) {
  // Emit whatever was buffered in this bundle under a new composite key.
  Instant timestamp = GlobalWindow.INSTANCE.maxTimestamp();
  context.output(KV.of(generateBatchId(), batch), timestamp, GlobalWindow.INSTANCE);
  batch = new ArrayList<>();
}
[1] Can datastore input in google dataflow pipeline be processed in a batch of N entries at a time?
[2] Partition data coming from CSV so I can process larger patches rather then individual lines
I also tried using stateful processing, like in the GroupIntoBatches implementation, but the state variables are also scoped to an input key. Also, its implementation doesn't handle batch processing (the buffered batch never gets output).
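For completeness, this is roughly the stateful variant I tried (a minimal sketch; the class name BatchFn and the state id "buffer" are just for illustration). Beam state is partitioned per key and window, so the buffer only ever sees one Integer key:
import java.util.List;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

static class BatchFn<T> extends DoFn<KV<Integer, List<T>>, KV<String, List<List<T>>>> {

  // State is scoped to the current key (and window), so this bag only collects
  // the sequences of a single Integer key -- the same limitation as bundles.
  @StateId("buffer")
  private final StateSpec<BagState<List<T>>> bufferSpec = StateSpecs.bag();

  @ProcessElement
  public void process(
      @Element KV<Integer, List<T>> element,
      @StateId("buffer") BagState<List<T>> buffer) {
    buffer.add(element.getValue());
    // There is no way here to see sequences of other keys, so a size-based
    // flush across keys cannot be expressed this way.
  }
}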
Upvotes: 2
Views: 1028
Reputation: 1925
Answering my own question for the record.
This problem is theoretically equivalent to the cumulative sum (prefix sum) problem, which I believe is not possible in big data frameworks, at least not in one step.
Basically, if I could associate with each key not just its own value but also the cumulative sum of values in the collection, then I could trivially solve the grouping task by dividing the cumulative sum by the desired batch size and grouping by that.
The types would look like:
PCollection<KV<Integer, List<T>>>
// map: value to value.size()
PCollection<KV<Integer, Integer>>
// take cumsum of values -- not possible
PCollection<KV<Integer, Long>>
// map: divide value by desired batch size using integer division
PCollection<KV<Integer, Long>>
// now value is the group id where the key should go
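To make the idea concrete, here is a tiny worked example of the last two steps (plain Java outside the pipeline, with made-up sequence sizes and a made-up batchSize of 8):
// Sequence sizes 3, 5, 2, 4 -> cumulative sums 3, 8, 10, 14.
long[] cumSums = {3, 8, 10, 14};
long batchSize = 8;
for (long cumSum : cumSums) {
  long groupId = cumSum / batchSize; // integer division -> 0, 1, 1, 1
  System.out.println(groupId);
}
Sequences that land on the same group id would then be grouped into one output batch, so each sequence stays whole and every batch holds roughly batchSize worth of elements. The missing piece is computing the cumulative sums themselves in a distributed way.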
Upvotes: 0