chikien276

Split a single PCollection into multiple (a dynamic number of) PCollections, then do calculations on each collection

I have an unbounded collection read from PubsubIO. Each element is a Trade, in a format like:

{
  timestamp: 123,
  type: "",
  side: "", // sell or buy
  volume: 123.12,
  location: ""
}

There are hundreds of types and more than 40 locations, and the relation between them is many-to-many.

My task is to calculate the total volume of trades in 10-minute and 60-minute windows, categorized by side, type and location, and also to calculate the total volume per type. So the output should be 4 collections, one for each combination of window size (10 or 60 mins) and side (sell or buy), of something called TotalTrade, like:

{
  total: 123,
  type: "",
  location: "",
}
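As a rough illustration of the two window sizes: a fixed window with zero offset starts at the timestamp rounded down to a multiple of the window size, so each trade falls into exactly one 10-minute window and one 60-minute window. In Beam this assignment is done by `Window.into(FixedWindows.of(Duration.standardMinutes(10)))` (or 60). The sketch below is plain Java, not Beam code, and only shows the arithmetic; it assumes `timestamp` is epoch milliseconds, which the question does not state, and the class and method names are hypothetical.

```java
import java.util.concurrent.TimeUnit;

// Plain-Java sketch (not Beam code) of mapping a timestamp to a fixed window.
final class FixedWindowBuckets {

    static final long TEN_MINUTES_MS = TimeUnit.MINUTES.toMillis(10);
    static final long SIXTY_MINUTES_MS = TimeUnit.MINUTES.toMillis(60);

    // Start of the zero-offset fixed window that contains the timestamp.
    // Assumes timestampMillis is epoch time in milliseconds.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    public static void main(String[] args) {
        long ts = TimeUnit.MINUTES.toMillis(73) + 5_000; // 01:13:05 after the epoch
        System.out.println(windowStart(ts, TEN_MINUTES_MS));   // 4200000 (01:10:00)
        System.out.println(windowStart(ts, SIXTY_MINUTES_MS)); // 3600000 (01:00:00)
    }
}
```

Because one PCollection can feed several transforms, the 10-minute and 60-minute results can be two windowing branches applied to the same input.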

What I have tried so far:

  1. Branch the collection into 2 collections based on the side of the trade

For each collection, I:

  1. Window the collection into 10-minute fixed windows
  2. ParDo into KVs keyed by the trade's type (KV<String, Trade>)
  3. GroupByKey, so we have a collection of KV<String, Iterable<Trade>>
  4. Apply a custom ParDo that calculates the total volume for each location in the Iterable<Trade>, so the output is KV<String, Iterable<KV<String, TotalTrade>>>
  5. ...
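One common alternative to grouping by type and then grouping by location by hand inside a ParDo is to put every dimension into the key, so a single per-key combine computes all the totals in parallel. In Beam that would roughly be a MapElements or ParDo emitting `KV<String, Double>` after windowing, followed by `Sum.doublesPerKey()`. The sketch below is plain Java, with a `Map` standing in for the per-key combine, to show the keying logic only; the `side|type|location` key format and the class names are hypothetical, and it assumes no field contains `|`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of "key by side|type|location, then sum per key".
// In a Beam pipeline the per-key sum would be done by a combine transform.
final class CompositeKeyTotals {

    // Mirrors the Trade fields from the question that matter for the key
    // (timestamp is left out; windowing would happen before this step).
    static final class Trade {
        final String type;
        final String side;
        final double volume;
        final String location;

        Trade(String type, String side, double volume, String location) {
            this.type = type;
            this.side = side;
            this.volume = volume;
            this.location = location;
        }
    }

    // Hypothetical key format; assumes no field contains '|'.
    static String key(Trade t) {
        return t.side + "|" + t.type + "|" + t.location;
    }

    // Sums volume per composite key; TreeMap only makes the output order stable.
    static Map<String, Double> totals(List<Trade> trades) {
        Map<String, Double> result = new TreeMap<>();
        for (Trade t : trades) {
            result.merge(key(t), t.volume, Double::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Trade> trades = Arrays.asList(
                new Trade("A", "buy", 1.5, "X"),
                new Trade("A", "buy", 2.5, "X"),
                new Trade("A", "sell", 3.0, "X"),
                new Trade("B", "buy", 1.0, "Y"));
        System.out.println(totals(trades)); // {buy|A|X=4.0, buy|B|Y=1.0, sell|A|X=3.0}
    }
}
```

Since the set of distinct keys never has to be known when the pipeline is built, this approach does not need a dynamic number of PCollections.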

The problem is in the custom ParDo step. I have to manually group the Trades by location, calculate the totals and then output the results, which, to me, does not embrace the parallel model of Apache Beam or Google Dataflow.

So my question is: is there any way to branch a collection into a dynamic number of collections in the Beam model? For example, my problem could be solved by the following transforms:

  1. Transform the collection into collections based on the type of Trade
  2. Transform each of these collections into collections based on location
  3. Apply a Combine transform to calculate TotalTrade

Now we have TotalTrade categorized by location and type.

  4. Apply a Flatten transform to each type's set of per-location collections from step 3
  5. Apply a Combine transform to each flattened collection

Now we have the total volume per type.
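The per-type totals can likewise be sketched without Flatten: re-key each per-location total by side and type only, then sum again. In Beam that would be a second MapElements plus `Sum.doublesPerKey()` applied to the output of the first combine. The plain-Java sketch below assumes hypothetical `side|type|location` string keys and hypothetical class names; drop `side` from the key as well if the per-type total should cover both sides.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of rolling per-location totals up to per-type totals
// by re-keying and summing again (no Flatten needed).
final class RollUpTotals {

    // Drops the trailing "|location" segment of a hypothetical
    // "side|type|location" key, leaving "side|type".
    static String sideTypeKey(String sideTypeLocation) {
        return sideTypeLocation.substring(0, sideTypeLocation.lastIndexOf('|'));
    }

    // Second-stage combine: sum the per-location totals under the shorter key.
    static Map<String, Double> rollUp(Map<String, Double> perLocation) {
        Map<String, Double> result = new TreeMap<>();
        for (Map.Entry<String, Double> e : perLocation.entrySet()) {
            result.merge(sideTypeKey(e.getKey()), e.getValue(), Double::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Double> perLocation = new TreeMap<>();
        perLocation.put("buy|A|X", 4.0);
        perLocation.put("buy|A|Y", 1.0);
        perLocation.put("sell|A|X", 3.0);
        System.out.println(rollUp(perLocation)); // {buy|A=5.0, sell|A=3.0}
    }
}
```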

Answers (1)

Lara Schmidt

It's not possible to branch a collection into a dynamic number of collections if that number is not known when the pipeline is created. The graph (its steps) is set when the pipeline is constructed and cannot change afterwards.

If you have a lot of dynamic values, you can instead output the results keyed by an id and group by that id. However, if you have few ids but a lot of values, you will get hot keys (all values for one id have to be processed by a single worker).
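The hot-key problem mentioned above is commonly handled with two-stage aggregation: spread one logical key over several sub-keys, combine each sub-key separately (potentially on different workers), then combine the partial results per original key. Beam's `Combine.PerKey#withHotKeyFanout(int)` applies this idea for you; the plain-Java sketch below only illustrates the two stages. The `key#shard` format, the round-robin shard choice and the class names are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of two-stage (fan-out) aggregation for one hot key.
final class HotKeyFanout {

    // Stage 1: spread one logical key over `fanout` sub-keys ("key#shard"),
    // so the partial sums could be computed by different workers.
    static Map<String, Double> partialSums(String key, List<Double> values, int fanout) {
        Map<String, Double> partials = new HashMap<>();
        for (int i = 0; i < values.size(); i++) {
            String subKey = key + "#" + (i % fanout); // round-robin shard choice
            partials.merge(subKey, values.get(i), Double::sum);
        }
        return partials;
    }

    // Stage 2: strip the "#shard" suffix and combine the partial sums.
    static Map<String, Double> finalSums(Map<String, Double> partials) {
        Map<String, Double> totals = new HashMap<>();
        for (Map.Entry<String, Double> e : partials.entrySet()) {
            String key = e.getKey().substring(0, e.getKey().lastIndexOf('#'));
            totals.merge(key, e.getValue(), Double::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Double> values = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0, 6.0);
        Map<String, Double> partials = partialSums("buy|A|X", values, 3);
        System.out.println(partials.size());     // 3
        System.out.println(finalSums(partials)); // {buy|A|X=21.0}
    }
}
```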
