Daniel Schierbeck

Reputation: 1952

Handling default values in window aggregations

I have an aggregation that looks at a sliding 30-day window (1 day period) of customer purchases, keyed by customer id, with the value being the purchase amount. I sum up the values by key, thus getting the aggregate purchase amount for each customer during the last 30 days. I store this number in a customer record in an external database.

My question is this: if a customer hasn't purchased anything in the last 30 days, how do I automatically reset the customer record to a default value, in this case zero? I'd prefer to keep all my logic in Dataflow and avoid doing too much work, since this will need to scale quite a bit. I'm basically looking for a way to automatically get a key-value for each key that was not in the current window but was in the last, and the value being a potentially configurable default.

Upvotes: 1

Views: 65

Answers (2)

jkff

Reputation: 17913

Is it an option to slightly amend the database schema? I suppose now you have something like

`(customer_id int, purchases_last_month int)`

Instead how about

`(customer_id int, last_purchase datetime, purchases_last_month int)`

where `last_purchase` is the time of the last purchase made by this customer, and `purchases_last_month` now refers to the purchases made in the month before that last purchase? Then, in the DoFn that writes to the database, you'd make a conditional update (merge/upsert) that sets both `last_purchase` and `purchases_last_month` to the values from the current window, but only if `last_purchase` is increasing. This way you can deal with windows being processed out of order or in parallel, at the cost of a slight increase in complexity in client queries (which you can address by adding a view on top of the table): a client treats the stored sum as the default value whenever `last_purchase` is more than a month old.
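To make the conditional update concrete, here is a minimal sketch using Python's built-in `sqlite3` module as a stand-in for the real database. The table name `customers`, the helper names `write_window_result` and `current_total`, and the ISO-8601 text dates are all assumptions made for illustration, and the upsert syntax assumes SQLite 3.24 or newer; other databases spell merge/upsert differently.

```python
import sqlite3


def make_db():
    # In-memory stand-in for the external customer database (hypothetical schema).
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE customers ("
        " customer_id INTEGER PRIMARY KEY,"
        " last_purchase TEXT NOT NULL,"  # ISO-8601 date, so string order = time order
        " purchases_last_month INTEGER NOT NULL)"
    )
    return conn


# Insert a new row, or overwrite the existing one only when the incoming
# last_purchase is strictly newer than the stored one.
UPSERT = """
INSERT INTO customers (customer_id, last_purchase, purchases_last_month)
VALUES (?, ?, ?)
ON CONFLICT(customer_id) DO UPDATE SET
    last_purchase = excluded.last_purchase,
    purchases_last_month = excluded.purchases_last_month
WHERE excluded.last_purchase > customers.last_purchase
"""


def write_window_result(conn, customer_id, last_purchase, total):
    # What the writing DoFn would do for each (customer, window) result.
    conn.execute(UPSERT, (customer_id, last_purchase, total))


def current_total(conn, customer_id, today, default=0):
    # Client-side read (could equally be a view): the stored sum only counts
    # if the last purchase falls within the 30 days before `today`.
    row = conn.execute(
        "SELECT CASE WHEN last_purchase >= date(?, '-30 days')"
        " THEN purchases_last_month ELSE ? END"
        " FROM customers WHERE customer_id = ?",
        (today, default, customer_id),
    ).fetchone()
    return default if row is None else row[0]
```

With this shape the pipeline never has to write an explicit reset: a customer with no recent purchases simply reads as the default because `last_purchase` has aged out.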

Upvotes: 0

Daniel Schierbeck

Reputation: 1952

Trying to answer my own question, but hoping for feedback as to whether this solution would scale:

I've thought about adding a step after the initial window-and-sum. This transform would receive (customerId, purchaseSum) elements once a day, as the result of each 30-day window sum becomes available. Since these elements are timestamped (with the timestamp of the most recent input element, I believe), I can re-window them. If I create a two-day window with a one-day period, I can then group by key and process (customerId, [purchaseSumA, purchaseSumB]) for customers that had a sum in both of the two most recent daily results. In that case, I emit purchaseSumB, the more recent one. However, if there's only one element in the list, and its timestamp indicates that it comes from the older of the two days, I can assume that the customer's purchases have dropped out of the 30-day window, and I need to emit (customerId, 0). Does that make sense?
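To check the logic, here is a rough plain-Python simulation of that second step, not actual Dataflow code. It assumes the daily sums have already been computed and are indexed by an integer day number; the function name `emit_with_defaults` and the input shape are made up for illustration.

```python
def emit_with_defaults(daily_sums, default=0):
    """Simulate the two-day re-window over the daily 30-day sums.

    daily_sums maps a day index to {customer_id: purchase_sum} as produced
    by the first window-and-sum step. Returns, per day, the values that
    would be written to the customer records.
    """
    out = {}
    for day in sorted(daily_sums):
        today = daily_sums[day]
        yesterday = daily_sums.get(day - 1, {})
        # Customers present today keep today's sum (purchaseSumB).
        result = dict(today)
        # Customers present only in yesterday's result dropped out of the
        # 30-day window, so they get the configurable default.
        for customer_id in yesterday:
            if customer_id not in today:
                result[customer_id] = default
        out[day] = result
    return out
```

In this simulation the default is emitted exactly once per customer, on the first day their key disappears from the daily results, which is the "was in the last window but not in the current one" case from the question.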

Upvotes: 1
