josealvarez97

How to use aggregateField() over multiple columns in Apache Beam Java SDK?

In Apache Beam Python SDK, it is possible to perform the following:

(input
 | GroupBy(account=lambda s: s["account"])
     .aggregate_field(lambda x: x["wordsAddup"] - x["wordsSubtract"], sum, 'wordsRead'))

How do we perform a similar action in the Java SDK? Strangely, the programming guide has only examples in Python for this transform.

Here is my attempt at producing the equivalent in Java:

input.apply(
Group.byFieldNames("account")
.aggregateField(<INSERT EQUIVALENT HERE>, Sum.ofIntegers(), "wordsRead"));

Answers (1)

robertwb

There are some Java examples in the programming guide at https://beam.apache.org/documentation/programming-guide/#using-schemas (you may need to select the Java tab on selectors that offer both Java and Python to see them).

In Java I don't think the first argument of aggregateField can take an arbitrary expression; it must name an existing field. Instead, you can precede the grouping operation with a projection that adds a new field holding the desired expression. For example:

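PCollection<Row> result = input
    // Projection: add a field holding the per-row expression.
    .apply(SqlTransform.query(
        "SELECT *, wordsAddup - wordsSubtract AS wordsDiff FROM PCOLLECTION"))
    // Group by account and sum the new field into wordsRead.
    .apply(Group.<Row>byFieldNames("account")
        .aggregateField("wordsDiff", Sum.ofIntegers(), "wordsRead"));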
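
If you would rather not depend on the Beam SQL extension (beam-sdks-java-extensions-sql), the same projection can probably be done with core transforms: map each Row to a wider schema that carries the computed field, then group as above. The following is a minimal sketch, assuming the input is a PCollection<Row> with a string field account and INT32 fields wordsAddup and wordsSubtract (the question does not show the real schema). The class name WordsReadExample, the helpers addWordsDiff and wordsReadPerAccount, the field wordsDiff, and the sample rows are hypothetical, so check it against your SDK version.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.Group;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;

public class WordsReadExample {
  // Assumed input schema (hypothetical; adapt to your real one).
  static final Schema INPUT_SCHEMA = Schema.builder()
      .addStringField("account")
      .addInt32Field("wordsAddup")
      .addInt32Field("wordsSubtract")
      .build();

  // Input schema plus one derived INT32 field for the per-row expression.
  static final Schema WITH_DIFF_SCHEMA = Schema.builder()
      .addFields(INPUT_SCHEMA.getFields())
      .addInt32Field("wordsDiff")
      .build();

  // Copies every input value and appends wordsAddup - wordsSubtract.
  static Row addWordsDiff(Row r) {
    return Row.withSchema(WITH_DIFF_SCHEMA)
        .addValues(r.getValues())
        .addValue(r.getInt32("wordsAddup") - r.getInt32("wordsSubtract"))
        .build();
  }

  static PCollection<Row> wordsReadPerAccount(PCollection<Row> input) {
    return input
        // Projection step: plays the role of the SQL SELECT above.
        .apply(MapElements.into(TypeDescriptors.rows())
            .via(WordsReadExample::addWordsDiff))
        .setRowSchema(WITH_DIFF_SCHEMA)
        // Aggregation step: sum the derived field per account.
        .apply(Group.<Row>byFieldNames("account")
            .aggregateField("wordsDiff", Sum.ofIntegers(), "wordsRead"));
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    PCollection<Row> input = p.apply(Create.of(
            Row.withSchema(INPUT_SCHEMA).addValues("alice", 10, 3).build(),
            Row.withSchema(INPUT_SCHEMA).addValues("alice", 5, 1).build(),
            Row.withSchema(INPUT_SCHEMA).addValues("bob", 7, 2).build())
        .withRowSchema(INPUT_SCHEMA));
    wordsReadPerAccount(input); // attach a sink here to inspect the result
    p.run().waitUntilFinish();
  }
}
```

For the sample rows the grouping should compute wordsRead = 11 for alice ((10 - 3) + (5 - 1)) and 5 for bob. By default Group nests the grouping field under a key row field and the aggregate under a value row field, so downstream code reads wordsRead from the value row.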
