Reputation: 29
After applying `.apply(GroupByKey.create())` I am getting values like `PCollection<KV<Integer, Iterable>>`. Can you suggest how to apply further transforms for each key?
For example: `KV<1, Iterable>`, `KV<2, Iterable>`. The keys are dynamic values, and I need to iterate over each key present in the PCollection.
Upvotes: 1
Views: 770
Reputation: 1428
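You can use a `ParDo` with a `DoFn` to iterate over each grouped `Iterable`. Each element that `GroupByKey` emits is one key together with all of its values (per window), so `processElement` is called once per key. Inside it you can loop over the values and output whatever the downstream transforms need.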
I drafted a quick example to show how this can be done.
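```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

// Pipeline with default options (e.g. for local testing)
Pipeline pipeline = Pipeline.create();

// Create sample rows
PCollection<TableRow> rows =
    pipeline
        .apply(
            Create.of(
                new TableRow().set("group", 1).set("name", "Dataflow"),
                new TableRow().set("group", 1).set("name", "Pub/Sub"),
                new TableRow().set("group", 2).set("name", "BigQuery"),
                new TableRow().set("group", 2).set("name", "Vertex")))
        .setCoder(TableRowJsonCoder.of());

// Convert into a KV of <group, name>
PCollection<KV<Integer, String>> keyValues =
    rows.apply(
        "Key",
        MapElements.into(
                TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.strings()))
            .via(row -> KV.of((Integer) row.get("group"), (String) row.get("name"))));

// Group by key
PCollection<KV<Integer, Iterable<String>>> groups =
    keyValues.apply("Group", GroupByKey.create());

// Iterate and print group + values
groups.apply(
    ParDo.of(
        new DoFn<KV<Integer, Iterable<String>>, Void>() {
          @ProcessElement
          public void processElement(@Element KV<Integer, Iterable<String>> kv) {
            // Called once per key: kv.getValue() holds every name grouped under kv.getKey()
            StringBuilder sb = new StringBuilder();
            for (String name : kv.getValue()) {
              if (sb.length() > 0) {
                sb.append(", ");
              }
              sb.append(name);
            }
            // println is fine for local testing; on a distributed runner it ends up in worker logs
            System.out.println("Group " + kv.getKey() + " values: " + sb);
          }
        }));

pipeline.run().waitUntilFinish();
```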
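This prints the following. The order of the groups is not guaranteed because elements are processed in parallel, and `GroupByKey` does not guarantee the order of the values within a group either:

```
Group 2 values: BigQuery, Vertex
Group 1 values: Dataflow, Pub/Sub
```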
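Since the body of `processElement` is ordinary Java over an `Iterable`, one option is to move the per-key logic into a plain helper that can be unit-tested without running a pipeline. The sketch below is only an illustration: the class `GroupFormatter` and its methods are hypothetical names, and it sorts the values first because `GroupByKey` gives no ordering guarantee for them.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper holding the per-key logic from the DoFn,
// written as plain Java so it can be tested without a pipeline.
public class GroupFormatter {

  // Copies the grouped values into a list and sorts them, because
  // GroupByKey does not guarantee any order for the values of a key.
  static List<String> sortedValues(Iterable<String> values) {
    List<String> sorted = new ArrayList<>();
    for (String value : values) {
      sorted.add(value);
    }
    Collections.sort(sorted);
    return sorted;
  }

  // Builds the same "Group <key> values: a, b" line the DoFn prints,
  // but with the values in a deterministic (sorted) order.
  static String format(Integer key, Iterable<String> values) {
    return "Group " + key + " values: " + String.join(", ", sortedValues(values));
  }

  public static void main(String[] args) {
    // Stands in for the Iterable that GroupByKey hands to the DoFn for key 1,
    // deliberately out of order.
    List<String> group1 = List.of("Pub/Sub", "Dataflow");
    System.out.println(format(1, group1)); // Group 1 values: Dataflow, Pub/Sub
  }
}
```

Inside the DoFn you could then call `GroupFormatter.format(kv.getKey(), kv.getValue())`. If you need further transforms per key rather than printing, declare the DoFn with a non-`Void` output type (e.g. `DoFn<KV<Integer, Iterable<String>>, String>`), add an `OutputReceiver<String> out` parameter to `processElement`, and call `out.output(...)`; the resulting `PCollection<String>` can be chained like any other. Note that sorting copies all values of a key into memory, which is fine for small groups but may not be for very large ones.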
Upvotes: 3