Sudharsan

Reputation: 29

Iterate Keys with Values for Beam pipeline

After applying .apply(GroupByKey.create()), I get a PCollection<KV<Integer, Iterable>>. How can I apply further transforms to the values for each key?

For example, the collection contains KV<1, Iterable> and KV<2, Iterable>. The keys are dynamic, so I need to iterate over every key present in the PCollection.

Upvotes: 1

Views: 770

Answers (1)

Bruno Volpato

Reputation: 1428

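You can use a ParDo with a DoFn to iterate over each key's Iterable. GroupByKey emits one KV per distinct key (per window), so the DoFn is invoked once per key and you don't need to know the keys in advance.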
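The per-key logic itself is ordinary Java iteration over an Iterable. As a rough, plain-JDK analogy (no Beam dependency; the class name and the summarize helper are hypothetical), the sketch below groups sample pairs into a map roughly the way GroupByKey would, discovering the keys from the data, and then runs the kind of per-key formatting a DoFn's @ProcessElement body could run:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-JDK analogy, not Beam API: each map entry stands in for one
// KV<Integer, Iterable<String>> element that GroupByKey would emit.
public class GroupedIterationSketch {

  // Hypothetical helper: what a DoFn body might do for one key and its values.
  // String.join accepts any Iterable<? extends CharSequence>.
  static String summarize(Integer key, Iterable<String> values) {
    return "Group " + key + " values: " + String.join(", ", values);
  }

  public static void main(String[] args) {
    String[][] rows = {
      {"1", "Dataflow"}, {"1", "Pub/Sub"}, {"2", "BigQuery"}, {"2", "Vertex"}
    };

    // Keys are discovered from the data; nothing here lists them up front.
    // TreeMap only makes this local demo's output order deterministic.
    Map<Integer, List<String>> grouped = new TreeMap<>();
    for (String[] row : rows) {
      grouped.computeIfAbsent(Integer.parseInt(row[0]), k -> new ArrayList<>()).add(row[1]);
    }

    // One call per key, analogous to one @ProcessElement call per grouped KV.
    for (Map.Entry<Integer, List<String>> e : grouped.entrySet()) {
      System.out.println(summarize(e.getKey(), e.getValue()));
    }
  }
}
```

Inside a real DoFn you would typically emit the result with a DoFn.OutputReceiver<String> (out.output(...)) instead of printing it, so that later transforms can consume it.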

I drafted a quick example to show how this can be done.

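    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TypeDescriptors;

    // Inside main(String[] args):
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Create sample rows
    PCollection<TableRow> rows =
        pipeline
            .apply(
                Create.of(
                    new TableRow().set("group", 1).set("name", "Dataflow"),
                    new TableRow().set("group", 1).set("name", "Pub/Sub"),
                    new TableRow().set("group", 2).set("name", "BigQuery"),
                    new TableRow().set("group", 2).set("name", "Vertex")))
            .setCoder(TableRowJsonCoder.of());

    // Convert into a KV of <group, name>
    PCollection<KV<Integer, String>> keyValues =
        rows.apply(
            "Key",
            MapElements.into(
                    TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.strings()))
                .via(row -> KV.of((Integer) row.get("group"), (String) row.get("name"))));

    // Group by key: one KV<group, Iterable<name>> per distinct group
    PCollection<KV<Integer, Iterable<String>>> groups =
        keyValues.apply("Group", GroupByKey.create());

    // Iterate and print group + values (called once per key)
    groups.apply(
        "Print",
        ParDo.of(
            new DoFn<KV<Integer, Iterable<String>>, Void>() {
              @ProcessElement
              public void processElement(@Element KV<Integer, Iterable<String>> kv) {
                StringBuilder sb = new StringBuilder();
                for (String name : kv.getValue()) {
                  if (sb.length() > 0) {
                    sb.append(", ");
                  }
                  sb.append(name);
                }
                // On a distributed runner such as Dataflow, this goes to the worker logs
                System.out.println("Group " + kv.getKey() + " values: " + sb);
              }
            }));

    // Block until the pipeline finishes (run() alone may return early on some runners)
    pipeline.run().waitUntilFinish();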

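This prints the following. Note that the output order is not guaranteed: groups are processed in parallel, and GroupByKey does not guarantee the order of values within a group either.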

    Group 2 values: BigQuery, Vertex
    Group 1 values: Dataflow, Pub/Sub
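If you need a stable order for the values within each key, one option is to copy the Iterable into a list and sort it inside the DoFn before using it. Below is a plain-JDK sketch of that step (the class and the sortedValues helper are hypothetical). It assumes each group fits in worker memory, and it does not change the order in which different keys are processed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SortedGroupSketch {

  // Hypothetical helper: materialize a grouped Iterable and sort it,
  // because GroupByKey makes no ordering promise for values within a key.
  static List<String> sortedValues(Iterable<String> values) {
    List<String> sorted = new ArrayList<>();
    for (String v : values) {
      sorted.add(v);
    }
    Collections.sort(sorted);
    return sorted;
  }

  public static void main(String[] args) {
    Iterable<String> unordered = List.of("Vertex", "BigQuery");
    System.out.println(sortedValues(unordered)); // prints [BigQuery, Vertex]
  }
}
```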

Upvotes: 3
