Reputation: 1952
I have an aggregation over a sliding 30-day window (with a 1-day period) of customer purchases, keyed by customer ID, with the purchase amount as the value. I sum the values per key, which gives each customer's total purchase amount over the last 30 days, and I store this number in the customer's record in an external database.
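To make the problem concrete, here is a plain-Python simulation of what the 30-day sliding sum with a 1-day period produces. It is not Beam/Dataflow code. Purchases are assumed to be `(customer_id, day, amount)` tuples with an integer day index, and `sliding_window_sums` is a hypothetical name. As with a windowed per-key combine, a customer only appears in a window's output if at least one purchase fell inside that window. That is why a customer who stops buying simply disappears instead of producing a zero.

```python
from collections import defaultdict


def sliding_window_sums(purchases, size=30, period=1):
    """Sum amounts per customer for every sliding window [end - size, end).

    `purchases` is an iterable of (customer_id, day, amount) tuples, where
    `day` is a hypothetical integer day index. Returns
    {window_end: {customer_id: total}}. Like a windowed per-key combine,
    a (window, customer) pair only appears if at least one purchase fell
    inside that window.
    """
    results = defaultdict(lambda: defaultdict(float))
    for customer_id, day, amount in purchases:
        # A window [end - size, end) contains `day` when day < end <= day + size.
        # Window ends are multiples of `period`; start at the first one after `day`.
        first_end = (day // period + 1) * period
        for end in range(first_end, day + size + 1, period):
            results[end][customer_id] += amount
    return {end: dict(totals) for end, totals in sorted(results.items())}


purchases = [("alice", 0, 10.0), ("bob", 5, 20.0)]
sums = sliding_window_sums(purchases)
print(sums[30])  # {'alice': 10.0, 'bob': 20.0}
print(sums[31])  # {'bob': 20.0}  (alice silently disappears)
```

With one purchase on day 0, `alice` is present in every window ending on days 1 through 30. No record at all is emitted for her afterwards, so nothing downstream ever sees a zero.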
My question is this: if a customer hasn't purchased anything in the last 30 days, how do I automatically reset the customer record to a default value, in this case zero? I'd prefer to keep all my logic in Dataflow and avoid doing too much work, since this will need to scale quite a bit. I'm basically looking for a way to automatically get a key-value pair for each key that was not in the current window but was in the previous one, with the value being a (potentially configurable) default.
Upvotes: 1
Views: 65
Reputation: 17913
Is it an option to slightly amend the database schema? I suppose you currently have something like
`(customer_id int, purchases_last_month int)`
Instead, how about
`(customer_id int, last_purchase datetime, purchases_last_month int)`
where `last_purchase` is the time of the customer's most recent purchase, and `purchases_last_month` is the total of the purchases made in the month leading up to that last purchase? Then, in the `DoFn` that writes to the database, you'd make a conditional update (merge/upsert) that sets both `last_purchase` and `purchases_last_month` to the values from the current window, but only if the new `last_purchase` is later than the stored one. This way you can deal with windows being processed out of order or in parallel, at the cost of a slight increase in complexity in client queries, which must treat `purchases_last_month` as zero once `last_purchase` is more than a month old (you can hide this by adding a view on top of the table).
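A minimal sketch of the conditional update idea, using an in-memory dict as a stand-in for the database table. `conditional_upsert` and `effective_total` are hypothetical names; a real writer would issue a single conditional MERGE/UPSERT statement against your database instead. The second function shows the kind of logic a view on top of the table might apply for clients, assuming "last 30 days" is measured from the query time.

```python
from datetime import datetime, timedelta


def conditional_upsert(table, customer_id, last_purchase, purchases_last_month):
    """Write the row only if `last_purchase` is later than the stored value.

    `table` is an in-memory stand-in: customer_id -> (last_purchase, total).
    Results from windows that arrive late or out of order are ignored.
    Returns True if the row was written.
    """
    row = table.get(customer_id)
    if row is None or last_purchase > row[0]:
        table[customer_id] = (last_purchase, purchases_last_month)
        return True
    return False


def effective_total(table, customer_id, now, window=timedelta(days=30)):
    """What a hypothetical view could return: zero once the last purchase is too old."""
    row = table.get(customer_id)
    if row is None or now - row[0] >= window:
        return 0.0
    return row[1]


table = {}
start = datetime(2024, 1, 1)
conditional_upsert(table, "alice", start + timedelta(days=5), 30.0)
conditional_upsert(table, "alice", start + timedelta(days=2), 10.0)  # stale, ignored
print(effective_total(table, "alice", start + timedelta(days=10)))  # 30.0
print(effective_total(table, "alice", start + timedelta(days=40)))  # 0.0
```

The reset to zero never has to be written by the pipeline: it falls out of the read-side check, which is what lets the writer stay a simple idempotent upsert.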
Upvotes: 0
Reputation: 1952
Trying to answer my own question, but hoping for feedback as to whether this solution would scale:
I've thought about adding a step after the initial window-and-sum. This transform would receive `(customerId, purchaseSum)` elements once a day, as each 30-day window sum becomes available. Since these elements are timestamped (with the timestamp of the most recent input element, I believe), I can re-window them. If I use a two-day sliding window with a one-day period, I can then group by key and process `(customerId, [purchaseSumA, purchaseSumB])` for customers that had a purchase both in the last 30 days and in the 30 days ending yesterday. In that case, I emit `purchaseSumB`. However, if there's only one element in the list, and its timestamp indicates that the purchase was made 31 days ago, I can assume that the customer has made no purchases since, and I need to emit `(customerId, 0)`. Does that make sense?
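The re-windowing idea can be checked with a small plain-Python simulation (again, not Beam code). It assumes each 30-day sum is stamped with the integer day on which its window was emitted, and `rewindow_and_reset` is a hypothetical name. For every day it groups a customer's values from that day and the previous one, as a two-day window with a one-day period would. It then writes the current sum, or the default when only the previous day's sum exists. It also covers the case the post leaves implicit: a single element from the current day (a new customer) is written as-is.

```python
from collections import defaultdict


def rewindow_and_reset(daily_sums, default=0.0):
    """Simulate a two-day sliding re-window (one-day period) over daily 30-day sums.

    `daily_sums` maps a hypothetical integer day index to {customer_id: sum},
    i.e. what the 30-day window emitted that day. Returns
    {day: {customer_id: value_to_write}}.
    """
    output = {}
    if not daily_sums:
        return output
    first, last = min(daily_sums), max(daily_sums)
    # Run one day past the last output so final drop-outs get their reset.
    for day in range(first, last + 2):
        # Group each customer's sums from the previous day and this day.
        grouped = defaultdict(dict)
        for d in (day - 1, day):
            for customer_id, total in daily_sums.get(d, {}).items():
                grouped[customer_id][d] = total
        writes = {}
        for customer_id, by_day in grouped.items():
            if day in by_day:
                # Present in today's 30-day window (existing or new customer).
                writes[customer_id] = by_day[day]
            else:
                # Only yesterday's sum: nothing bought in the last 30 days.
                writes[customer_id] = default
        if writes:
            output[day] = writes
    return output


daily = {1: {"alice": 10.0}, 2: {"alice": 10.0, "bob": 5.0}, 3: {"bob": 5.0}}
for day, writes in rewindow_and_reset(daily).items():
    print(day, writes)
# 1 {'alice': 10.0}
# 2 {'alice': 10.0, 'bob': 5.0}
# 3 {'alice': 0.0, 'bob': 5.0}
# 4 {'bob': 0.0}
```

Because a customer's last non-empty sum appears in exactly one later two-day window without a newer companion, the reset is emitted once, on the first day the customer is absent, and nothing further is written for that customer until they buy again.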
Upvotes: 1