Reputation: 23
I'm using Google Cloud Dataflow and have a ParDo function that requires access to all the elements in a PCollection. To accomplish this, I wanted to convert a PCollection<T> into a PCollection<Iterable<T>> containing a single Iterable of all the elements. I was wondering if there's a cleaner/simpler/faster solution than what I have come up with.
The first approach was to create a dummy key, perform a GroupByKey, and get the values afterwards.
PCollection<MyType> myData;
// AddDummyKey() outputs KV.of(1, context.element()) for everything
PCollection<KV<Integer, MyType>> myDataKeyed = myData.apply(ParDo.of(new AddDummyKey()));
// Group by dummy key
PCollection<KV<Integer, Iterable<MyType>>> myDataGrouped = myDataKeyed.apply(GroupByKey.create());
// Extract values
PCollection<Iterable<MyType>> myDataIterable = myDataGrouped.apply(Values.<Iterable<MyType>>create());
The second approach followed the recommendation here: How do I make View's asList() sortable in Google Dataflow SDK? but without the sorting. I created a View.asList(), created a dummy PCollection, and then applied a ParDo function on the dummy PCollection with the view as a side input and simply returned the view.
PCollection<MyType> myData;
// Create view of the PCollection as a list
PCollectionView<List<MyType>> myDataView = myData.apply(View.asList());
// Create dummy PCollection
PCollection<Integer> dummy = pipeline.apply(Create.<Integer>of(1));
// Apply dummy ParDo that returns the view
PCollection<List<MyType>> myDataList = dummy.apply(
    ParDo.withSideInputs(myDataView).of(new DoFn<Integer, List<MyType>>() {
      @Override
      public void processElement(ProcessContext c) {
        c.output(c.sideInput(myDataView));
      }
    }));
It seems like there would be a pre-defined combine function for this task, but I can't find one. Thanks for the help!
Upvotes: 2
Views: 1865
Reputation: 3745
As of now, a better approach would be to use Combine together with an AccumulatorFn, e.g.:
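A minimal sketch of what such a combine might look like, not the answerer's original code. It is written as plain Java so it runs without the SDK, and it assumes "AccumulatorFn" refers to a combine function with a list accumulator. The four method names mirror Beam's Combine.CombineFn contract (createAccumulator, addInput, mergeAccumulators, extractOutput); the class name ToListSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of a "collect everything into one list" combine.
// ToListSketch is a hypothetical name; only the method names follow CombineFn.
final class ToListSketch {
    // Each bundle of input starts from an empty accumulator.
    static <T> List<T> createAccumulator() {
        return new ArrayList<>();
    }

    // Fold one element into an accumulator.
    static <T> List<T> addInput(List<T> acc, T input) {
        acc.add(input);
        return acc;
    }

    // Merge partial accumulators that were built independently.
    static <T> List<T> mergeAccumulators(Iterable<List<T>> accs) {
        List<T> merged = createAccumulator();
        for (List<T> acc : accs) {
            merged.addAll(acc);
        }
        return merged;
    }

    // The final output is simply the merged list.
    static <T> List<T> extractOutput(List<T> acc) {
        return acc;
    }

    public static void main(String[] args) {
        // Two partial accumulators, as if built on two different workers.
        List<Integer> left = addInput(addInput(ToListSketch.<Integer>createAccumulator(), 1), 2);
        List<Integer> right = addInput(ToListSketch.<Integer>createAccumulator(), 3);
        List<Integer> all = extractOutput(mergeAccumulators(Arrays.asList(left, right)));
        System.out.println(all);
    }
}
```

In a pipeline, the same four methods would presumably live in a subclass of Combine.CombineFn and be applied with Combine.globally(...). Element order in the resulting list should not be relied on there, since accumulators can be merged in any order.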
Upvotes: 0
Reputation: 6023
If you know you need the whole thing, then both of your approaches are reasonable. Both have been used in the Dataflow SDK and later when it became the Apache Beam SDK.
The side-input approach is how DataflowAssert works, in fact. In Beam, where different backend runners may implement side inputs differently, you should prefer View.asIterable(), since it makes fewer assumptions and may allow more streaming of a very large side input.
The GroupByKey approach is how PAssert works. It accomplishes the same thing and requires a little more care for empty collections, but more Beam runners have good GroupByKey support than side input support (especially when they are new and still under development).
So View.asIterable() is basically intended to be just what you are asking for. There have also been some requests for a GroupGlobally transform that does the second version; that could happen at some point.
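The empty-collection caveat can be illustrated outside Beam. The plain-Java sketch below is only an analogy, with java.util.stream grouping standing in for GroupByKey; EmptyGroupSketch and groupByDummyKey are hypothetical names. Grouping an empty input under a dummy key yields no group at all, rather than one group holding an empty Iterable, so a downstream step that expects exactly one Iterable would never run.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Analogy only: stream grouping stands in for a GroupByKey on a dummy key.
final class EmptyGroupSketch {
    // Put every element under the same dummy key (1).
    static <T> Map<Integer, List<T>> groupByDummyKey(List<T> elements) {
        return elements.stream().collect(Collectors.groupingBy(e -> 1));
    }

    public static void main(String[] args) {
        // Non-empty input: exactly one group containing all elements.
        System.out.println(groupByDummyKey(Arrays.asList("a", "b")).size());
        // Empty input: the map has no entry, so there is nothing to process downstream.
        System.out.println(groupByDummyKey(Collections.<String>emptyList()).size());
    }
}
```

With the side-input approach, by contrast, the ParDo over the dummy PCollection still runs once and sees an empty Iterable.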
Upvotes: 1