Reputation: 35
I want to convert a small PCollection into a List and construct a HashMap from it. I managed to combine the elements of the PCollection globally into an Iterable, but the Iterable is itself inside a PCollection. How do I extract the Iterable (which now contains all my data) from the PCollection?
Here is my Record class:
/**
 * Plain data holder pairing a label with its score.
 *
 * <p>Both fields are package-private and are read directly by {@code getHMap},
 * which uses {@code label} as the map key and {@code score} as the map value.
 */
public static class Record {
// Key under which this record's score is stored.
String label;
// Boxed score; NOTE(review): may be null unless the reader always sets it — confirm.
Float score;
}
Here is my getHMap method.
/**
 * Builds a label-to-score lookup from the records read from {@code scoreFile}.
 *
 * <p>The records are read into a {@code PCollection<Record>} and combined globally
 * into a single {@code Iterable<Record>} using {@code ToList}. Copying that iterable
 * into the local list is not implemented yet, so as written the local list stays
 * empty and the returned map is always empty.
 *
 * @param pipeline the pipeline handed to {@code read}
 * @param scoreFile the score source handed to {@code read}
 * @return a map from each record's label to its score (currently always empty)
 */
public static HashMap<String, Float> getHMap(Pipeline pipeline, String scoreFile) {
    PCollection<Record> records = read(pipeline, scoreFile);
    PCollection<Iterable<Record>> combined =
        records.apply("GetInterable", Combine.globally(new ToList<>()));

    // TODO: move the Iterable<Record> held by `combined` into recordsList.
    // NOTE(review): a PCollection is presumably only materialized when the pipeline
    // runs, so its contents may not be obtainable here; a side input (e.g. a map
    // view) consumed inside the pipeline may be the intended approach — confirm.
    List<Record> recordsList = new ArrayList<>();

    HashMap<String, Float> scoresByLabel = new HashMap<String, Float>();
    recordsList.forEach(entry -> scoresByLabel.put(entry.label, entry.score));
    return scoresByLabel;
}
For additional reference, here is my ToList combine function:
/**
 * Combine function that gathers every input element into one list and exposes the
 * result as an {@code Iterable}.
 *
 * @param <T> the element type being collected
 */
public class ToList<T> extends Combine.CombineFn<T, List<T>, Iterable<T>> {

    /** Starts each accumulation with a fresh, empty list. */
    @Override
    public List<T> createAccumulator() {
        return new ArrayList<>();
    }

    /** Appends {@code input} to {@code accumulator} in place and returns that same list. */
    @Override
    public List<T> addInput(List<T> accumulator, T input) {
        accumulator.add(input);
        return accumulator;
    }

    /**
     * Folds all accumulators into the first one and returns it; when there are no
     * accumulators at all, a new empty accumulator is returned instead.
     */
    @Override
    public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
        Iterator<List<T>> remaining = accumulators.iterator();
        if (remaining.hasNext()) {
            // The first accumulator doubles as the merge target; the rest are only read.
            List<T> merged = remaining.next();
            remaining.forEachRemaining(partial -> merged.addAll(partial));
            return merged;
        }
        return createAccumulator();
    }

    /** Returns the accumulated list itself (no copy) as the final output. */
    @Override
    public Iterable<T> extractOutput(List<T> accumulator) {
        return accumulator;
    }
}
Upvotes: 1
Views: 361