thakur babban

Reputation: 33

Group by an existing attribute of a JSON string line in Apache Beam (Java)

I am reading JSON files from GCS and have to load the data into different BigQuery tables. These files may contain multiple records for the same customer with different timestamps, and for each customer I have to keep only the latest one. I am planning to do this as follows:

  1. Read the files.
  2. Group by customer id.
  3. Apply a DoFn that compares the timestamps of the records in each group and keeps only the latest one.
  4. Flatten the result, convert each record to a TableRow and insert it into BigQuery.
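Before wiring this into Beam, it can help to pin down what steps 2 to 4 should produce. The following is a minimal plain-Java sketch (not Beam code) of that result, assuming each record has already been parsed into a hypothetical CustomerRecord with a customerId, a numeric timestamp (assumed to be epoch milliseconds) and a payload. These names are illustrative only and do not come from the files described above.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical record shape; field names are assumptions for illustration.
final class CustomerRecord {
    final String customerId;
    final long timestamp; // assumed epoch millis
    final String payload;

    CustomerRecord(String customerId, long timestamp, String payload) {
        this.customerId = customerId;
        this.timestamp = timestamp;
        this.payload = payload;
    }
}

final class LatestPerCustomer {
    // Step 2: group by customer id. Step 3: keep the record with the largest timestamp.
    static Map<String, CustomerRecord> latestPerCustomer(List<CustomerRecord> records) {
        Map<String, List<CustomerRecord>> grouped =
            records.stream().collect(Collectors.groupingBy(r -> r.customerId));
        Map<String, CustomerRecord> latest = new HashMap<>();
        for (Map.Entry<String, List<CustomerRecord>> e : grouped.entrySet()) {
            CustomerRecord newest = e.getValue().stream()
                .max(Comparator.comparingLong(r -> r.timestamp))
                .get(); // groups built by groupingBy are never empty
            latest.put(e.getKey(), newest);
        }
        return latest;
    }
}
```

Calling latestPerCustomer(records).values() gives the flattened output of step 4. In the actual pipeline the grouping is done by GroupByKey across workers rather than in a single in-memory map, but the per-group rule is the same.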

But I am unable to proceed with step 2. I see GroupByKey.create(), but I am unable to make it use the customer id as the key.

I am implementing this in Java. Any suggestions would be of great help. Thank you.

Upvotes: 0

Views: 586

Answers (1)

Pablo

Reputation: 11031

Before you apply GroupByKey, you need to have your dataset in key-value pairs. It would help if you had shown some of your code, but without knowing much about it, you'd do something like the following:

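PCollection<JsonObject> objects = p.apply(FileIO.read(....)).apply(FormatData...);

// Once we have the data in JsonObjects, we key by customer ID.
// With a lambda, MapElements needs into(...) so Beam knows the output type.
PCollection<KV<String, Iterable<JsonObject>>> groupedData =
     objects.apply(MapElements
                .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptor.of(JsonObject.class)))
                .via((JsonObject elm) -> KV.of(elm.getString("customerId"), elm)))
            .apply(GroupByKey.create());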

Once that's done, you can check the timestamps in each group and discard all but the most recent record, as you were planning.
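The timestamp check can be factored into a small helper that works on a single group. Inside a DoFn applied after GroupByKey, the value of each KV<String, Iterable<JsonObject>> is such an Iterable. This is a sketch assuming each record's timestamp can be reduced to a long; the class name LatestSelector and the method latest are hypothetical.

```java
import java.util.Optional;
import java.util.function.ToLongFunction;

final class LatestSelector {
    // Returns the element with the largest timestamp in one group, or empty if the group is empty.
    // Uses a strict ">" comparison, so on equal timestamps the first element seen is kept.
    static <T> Optional<T> latest(Iterable<T> group, ToLongFunction<? super T> timestampOf) {
        boolean found = false;
        T best = null;
        long bestTs = Long.MIN_VALUE;
        for (T element : group) {
            long ts = timestampOf.applyAsLong(element);
            if (!found || ts > bestTs) {
                best = element;
                bestTs = ts;
                found = true;
            }
        }
        return found ? Optional.ofNullable(best) : Optional.empty();
    }
}
```

In a DoFn you might call it as LatestSelector.latest(c.element().getValue(), json -> json.getJsonNumber("timestamp").longValue()), assuming a numeric "timestamp" field, and output the result when it is present. As an alternative to GroupByKey followed by a DoFn, Beam's Max.perKey(comparator) combine keeps only the largest element per key without materializing each whole group; the comparator passed to it must also be Serializable.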

Note that you will need to set coders, etc.; if you get stuck with that, we can iterate.

As a tip, you can consider this example of a JSON coder.

Upvotes: 1
