I have a stream of GPS coordinates from different devices. The coordinates can arrive out of order, but each one has an event time. I want to calculate the total distance travelled from these coordinates.
From the Beam Programming Guide:
"The combining function should be commutative and associative, as the function is not necessarily invoked exactly once on all values with a given key."
Input:
driver_id,x,y,event_time,processing_time
1,1,1,100001,20001
1,5,5,100004,20002
1,4,5,100003,20003
Expected output:
driver_id,distance,event_time
1,3,100001
1,1,100003
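For reference, the computation the expected output describes can be sketched in plain Python, outside Beam, for a single driver: sort the points by event_time and emit the distance between each consecutive pair, labelled with the earlier point's event time. This is a minimal sketch that assumes Euclidean distance on x and y; under that metric the first segment, from (1,1) to (4,5), is 5.0 rather than the 3 shown above, so swap in whichever metric you actually intend (for example haversine for latitude/longitude). The helper name segment_distances is illustrative.

```python
import csv
import io
import math

# Sample input from the question, listed in processing (arrival) order.
RAW = """driver_id,x,y,event_time,processing_time
1,1,1,100001,20001
1,5,5,100004,20002
1,4,5,100003,20003
"""


def segment_distances(rows):
    """Yield (driver_id, distance, event_time) for consecutive points.

    Assumes all rows belong to one driver; uses Euclidean distance.
    """
    # Restore event-time order; the arrival order is irrelevant here.
    ordered = sorted(rows, key=lambda r: int(r["event_time"]))
    for prev, curr in zip(ordered, ordered[1:]):
        dx = int(curr["x"]) - int(prev["x"])
        dy = int(curr["y"]) - int(prev["y"])
        yield (int(prev["driver_id"]), math.hypot(dx, dy), int(prev["event_time"]))


rows = list(csv.DictReader(io.StringIO(RAW)))
result = list(segment_distances(rows))
for driver_id, distance, event_time in result:
    print(driver_id, distance, event_time)
```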
Since the order is important, I cannot use anything like a CombineFn.
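To see concretely why order gets in the way of a combiner, consider a hypothetical accumulator one might try in a CombineFn: (first point, last point, distance so far), where merging two partial tracks bridges the last point of one to the first point of the other. The plain-Python sketch below (names are illustrative, Euclidean distance assumed) shows that this merge is associative but not commutative, so the result depends on the order in which partial results happen to be combined.

```python
import math


# Hypothetical accumulator: (first_point, last_point, distance_so_far).
def singleton(point):
    """Accumulator for a track containing a single point."""
    return (point, point, 0.0)


def merge(a, b):
    """Append track b after track a, bridging a's last point to b's first."""
    a_first, a_last, a_dist = a
    b_first, b_last, b_dist = b
    bridge = math.hypot(b_first[0] - a_last[0], b_first[1] - a_last[1])
    return (a_first, b_last, a_dist + bridge + b_dist)


p1, p2, p3 = singleton((1, 1)), singleton((4, 5)), singleton((5, 5))

# The same three points, merged in event-time order vs. arrival order.
in_event_order = merge(merge(p1, p2), p3)
in_arrival_order = merge(merge(p1, p3), p2)
print(in_event_order[2])
print(in_arrival_order[2])
```

Merging in event-time order gives 6.0, while arrival order gives about 6.657; regrouping as merge(p1, merge(p2, p3)) still gives 6.0, which is exactly the associative-but-not-commutative behaviour the guide warns about.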
So my question is:
Answer 1:
You can get lists of events like this:
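import json
import apache_beam as beam
from apache_beam.transforms.trigger import AccumulationMode
from apache_beam.transforms.window import Sessions

# p, SUBSCRIPTION, SESSION_GAP and ALLOWED_LATENESS are assumed to be defined elsewhere
lists_of_events = (p
    | beam.io.ReadFromPubSub(subscription=SUBSCRIPTION)
    | beam.Map(json.loads)  # Pub/Sub payloads arrive as bytes
    | beam.Map(lambda x: (x['driver_id'], x))
    | beam.WindowInto(Sessions(SESSION_GAP),
                      allowed_lateness=ALLOWED_LATENESS,
                      accumulation_mode=AccumulationMode.ACCUMULATING)
    | beam.GroupByKey())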
This returns a PCollection of tuples, each containing a driver ID and the list of that driver's events for one session window (type: Tuple[str, Iterable[Dict]]).
You would then write a function to consume that:
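import math
import apache_beam as beam

def calculate_distance(elm):
    driver, events = elm
    # Events may arrive out of order, so sort them by event time first
    events = sorted(events, key=lambda x: x['event_time'])
    # One distance per consecutive pair of points (Euclidean distance assumed)
    for prev, curr in zip(events, events[1:]):
        distance = math.hypot(curr['x'] - prev['x'], curr['y'] - prev['y'])
        yield {'driver': driver, 'distance': distance, 'event_time': prev['event_time']}

# FlatMap, not Map, because calculate_distance is a generator
distances = lists_of_events | beam.FlatMap(calculate_distance)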
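As a rough mental model of what Sessions(SESSION_GAP) does for each key, the following plain-Python helper (hypothetical, not Beam's implementation) gives every event the interval [t, t + gap) and merges overlapping intervals, so events separated by less than the gap land in the same session. The gap of 60 and the extra event at 100200 are made-up values, and the gap is assumed to be in the same units as event_time.

```python
def sessions(timestamps, gap):
    """Group event timestamps into sessions, mimicking session-window merging.

    Each timestamp t covers [t, t + gap); overlapping intervals are merged.
    Returns a list of (window_start, window_end, member_timestamps) tuples.
    """
    result = []
    for t in sorted(timestamps):
        if result and t < result[-1][1]:
            # Overlaps the current session: extend its end and add the event.
            start, end, members = result[-1]
            result[-1] = (start, max(end, t + gap), members + [t])
        else:
            # No overlap: the gap was reached, so a new session starts.
            result.append((t, t + gap, [t]))
    return result


# Event times from the question plus one later, hypothetical event.
print(sessions([100001, 100004, 100003, 100200], gap=60))
```

The three sample events fall into one session ending at 100004 + 60, and the hypothetical event at 100200 opens a second session.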
Answer 2:
I would suggest spending some time learning how to handle late-arriving data within windows.
You could create a fixed window and allow late data; however, you would also have to define how interim results are accumulated (accumulating or discarding panes).
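A plain-Python simulation (hypothetical, not the Beam API) of why the accumulation mode matters for this problem: assume a window's on-time pane fires after the first two arrivals and the late event at 100003 triggers a second firing. In accumulating mode the late pane carries every event seen so far, so the path can be recomputed; in discarding mode it carries only the late event, which has no neighbour to measure against. Euclidean distance is assumed.

```python
import math

# (event_time, x, y) in arrival order, taken from the question's sample input.
arrivals = [(100001, 1, 1), (100004, 5, 5), (100003, 4, 5)]
# Hypothetical firings: the first two events are on time, the third is late.
firings = [arrivals[:2], arrivals[2:]]


def panes(firings, accumulating):
    """Return the elements each pane would carry under the given mode."""
    seen, out = [], []
    for new_elements in firings:
        seen.extend(new_elements)
        out.append(list(seen) if accumulating else list(new_elements))
    return out


def total_distance(events):
    """Path length over events sorted by event time (Euclidean)."""
    ordered = sorted(events)  # tuples sort by event_time first
    return sum(math.hypot(b[1] - a[1], b[2] - a[2])
               for a, b in zip(ordered, ordered[1:]))


acc = [total_distance(p) for p in panes(firings, accumulating=True)]
dis = [total_distance(p) for p in panes(firings, accumulating=False)]
print(acc)  # the late pane recomputes the full path
print(dis)  # the late pane alone has no segment to measure
```

With accumulating panes, downstream consumers see both an early result and a corrected one, so they need to treat a later pane for the same window as replacing the earlier one.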
Depending on your key structure, you could key on the driver ID and then apply a more specific heuristic for deciding when a driver's data is complete.
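One possible per-driver completeness heuristic, sketched in plain Python with hypothetical names and a made-up max_delay: remember the largest event time seen for each driver and assume that data older than that maximum minus a fixed delay is complete. This is a simple bounded-out-of-orderness rule for illustration only; it is not how any particular Beam runner computes its watermark.

```python
class DriverCompleteness:
    """Hypothetical heuristic: a driver's data is assumed complete for all
    event times earlier than (max event time seen - max_delay)."""

    def __init__(self, max_delay):
        self.max_delay = max_delay
        self.max_seen = {}  # driver_id -> largest event time observed

    def observe(self, driver_id, event_time):
        """Record an event; return True if the heuristic deems it late."""
        bound = self.complete_up_to(driver_id)
        late = bound is not None and event_time < bound
        current = self.max_seen.get(driver_id, event_time)
        self.max_seen[driver_id] = max(current, event_time)
        return late

    def complete_up_to(self, driver_id):
        """Event time before which data is assumed complete, or None."""
        if driver_id not in self.max_seen:
            return None
        return self.max_seen[driver_id] - self.max_delay


tracker = DriverCompleteness(max_delay=2)
# Arrival order from the question's sample input.
late_flags = [tracker.observe(1, t) for t in (100001, 100004, 100003)]
print(late_flags)
print(tracker.complete_up_to(1))
```

With max_delay=2, the out-of-order point at 100003 still lands after the bound (100002) and is accepted; a point stamped 100000 arriving afterwards would be flagged as late.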
Step 1: learn about the different window types and how to deal with late-arriving data. Note: you will need to timestamp your incoming elements with their event time rather than relying on the default receive time.
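Using event time rather than arrival time changes the answer even on the question's three sample rows. This plain-Python check (Euclidean distance assumed, helper name illustrative) orders the same points by processing_time and by event_time and compares the path lengths. In a Beam Python pipeline, one common way to attach the event time is to emit beam.window.TimestampedValue(element, event_time) from a Map or DoFn; that step is not shown here.

```python
import math

# Rows from the question: (x, y, event_time, processing_time).
rows = [(1, 1, 100001, 20001), (5, 5, 100004, 20002), (4, 5, 100003, 20003)]


def path_length(points):
    """Sum of Euclidean distances between consecutive (x, y) points."""
    return sum(math.hypot(b[0] - a[0], b[1] - a[1])
               for a, b in zip(points, points[1:]))


by_processing_time = sorted(rows, key=lambda r: r[3])
by_event_time = sorted(rows, key=lambda r: r[2])

print(path_length(by_processing_time))  # about 6.657
print(path_length(by_event_time))       # 6.0
```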