AlpsToronto

Convert a Beam PCollection to HashMap after Combine.globally()

I want to convert a small PCollection into a List and build a HashMap from it. I managed to combine the elements of the PCollection globally into an Iterable, but that Iterable is itself wrapped in a PCollection. How do I extract the Iterable (which now contains all my data) from the PCollection?

Here is my Record class:

public static class Record {
    String label;
    Float score;
}

Here is my getHMap method.

public static HashMap<String, Float> getHMap(Pipeline pipeline, String scoreFile) {
    HashMap<String, Float> hmap = new HashMap<String, Float>();
    List<Record> recordsList = new ArrayList<>();
    PCollection<Record> coll = read(pipeline, scoreFile);
    PCollection<Iterable<Record>> recordsListPColl =
        coll.apply("GetIterable", Combine.globally(new ToList<>()));

    // To-Do: extract the Iterable<Record> from the PCollection and move it to recordsList
    for (Record rec : recordsList) {
        hmap.put(rec.label, rec.score);
    }
    return hmap;
}
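Whatever mechanism ends up delivering the Iterable<Record> (Beam only materializes a PCollection's contents while the pipeline runs, for example inside a DoFn), the map-building loop at the end of getHMap is plain Java. A minimal sketch of that step as a standalone helper, assuming the same label/score fields as the question; the class name RecordMapSketch, the method toHashMap, and the Record constructor are hypothetical additions for illustration:

```java
import java.util.HashMap;

class RecordMapSketch {
    // Mirrors the question's Record class; the constructor is added only for this sketch.
    static class Record {
        String label;
        Float score;

        Record(String label, Float score) {
            this.label = label;
            this.score = score;
        }
    }

    // Builds label -> score from any Iterable<Record>, such as the single element
    // produced by Combine.globally(new ToList<>()) once a transform receives it.
    // A later record with the same label overwrites the earlier one (HashMap.put semantics).
    static HashMap<String, Float> toHashMap(Iterable<Record> records) {
        HashMap<String, Float> hmap = new HashMap<>();
        for (Record rec : records) {
            hmap.put(rec.label, rec.score);
        }
        return hmap;
    }
}
```

If labels can repeat in the score file, the "last one wins" behavior above is worth deciding on explicitly, since the combined Iterable carries no guaranteed order.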

For additional reference, here is my ToList combine function:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.beam.sdk.transforms.Combine;

// Collects every input element into a single list.
public class ToList<T> extends Combine.CombineFn<T, List<T>, Iterable<T>> {
    @Override
    public List<T> createAccumulator() {
        return new ArrayList<>();
    }

    @Override
    public List<T> addInput(List<T> accumulator, T input) {
        accumulator.add(input);
        return accumulator;
    }

    // Reuses the first partial list and appends all the others to it.
    @Override
    public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
        Iterator<List<T>> iter = accumulators.iterator();
        if (!iter.hasNext()) {
            return createAccumulator();
        }
        List<T> res = iter.next();
        while (iter.hasNext()) {
            res.addAll(iter.next());
        }
        return res;
    }

    @Override
    public Iterable<T> extractOutput(List<T> accumulator) {
        return accumulator;
    }
}
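To see what Combine.globally does with these four methods, here is a Beam-free mirror of ToList with a hypothetical driver that splits the input into "bundles", builds one accumulator per bundle with addInput, merges them, and calls extractOutput. The class name ListCombinerSketch and the simulate method are illustrative assumptions; a real runner decides bundling itself, and this only imitates the general create, add, merge, extract sequence:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Beam-free mirror of ToList<T>: same four methods, no Beam dependency.
class ListCombinerSketch<T> {
    List<T> createAccumulator() {
        return new ArrayList<>();
    }

    List<T> addInput(List<T> accumulator, T input) {
        accumulator.add(input);
        return accumulator;
    }

    // Same logic as ToList: reuse the first partial list, append the rest.
    List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
        Iterator<List<T>> iter = accumulators.iterator();
        if (!iter.hasNext()) {
            return createAccumulator();
        }
        List<T> res = iter.next();
        while (iter.hasNext()) {
            res.addAll(iter.next());
        }
        return res;
    }

    Iterable<T> extractOutput(List<T> accumulator) {
        return accumulator;
    }

    // Hypothetical driver: deals inputs round-robin into `bundles` accumulators
    // (bundles must be > 0), merges them, and extracts the single combined output.
    Iterable<T> simulate(List<T> inputs, int bundles) {
        List<List<T>> partials = new ArrayList<>();
        for (int b = 0; b < bundles; b++) {
            partials.add(createAccumulator());
        }
        for (int i = 0; i < inputs.size(); i++) {
            int b = i % bundles;
            partials.set(b, addInput(partials.get(b), inputs.get(i)));
        }
        return extractOutput(mergeAccumulators(partials));
    }
}
```

With two simulated bundles, the inputs 1 to 5 come out as [1, 3, 5, 2, 4]: every element survives the merge, but the order is not preserved. Beam PCollections make no ordering promise either, which is harmless here because the result is only used to fill a HashMap keyed by label.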
