Payam Mohammadi

Reputation: 136

Apache beam processing sequential elements

I have a stream of GPS coordinates from different devices. The coordinates can arrive out of order, but each one carries an event time. I want to calculate the total distance each device travels from its coordinates.

From Beam Programming Guide:

The combining function should be commutative and associative, as the function is not necessarily invoked exactly once on all values with a given key

Input:

driver_id,x,y,event_time,processing_time
1,1,1,100001,20001
1,5,5,100004,20002
1,4,5,100003,20003

Expected output:

driver_id,distance,event_time
1,3,100001
1,1,100003

Since order matters, I cannot use anything like a CombineFn.
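To make the ordering problem concrete, here is a small plain-Python sketch using the sample rows above. It assumes straight-line (Euclidean) distance on the x/y values; the question does not specify a metric, and real GPS coordinates would need a great-circle formula such as haversine. Summing the segments in arrival (processing-time) order gives a different total from summing them in event-time order, which is why order matters here.

```python
import math

# Sample rows from the question: (driver_id, x, y, event_time, processing_time).
ROWS = [
    (1, 1, 1, 100001, 20001),
    (1, 5, 5, 100004, 20002),
    (1, 4, 5, 100003, 20003),
]


def total_distance(points):
    """Sum of Euclidean distances between consecutive points (assumed metric)."""
    return sum(
        math.hypot(b[1] - a[1], b[2] - a[2])
        for a, b in zip(points, points[1:])
    )


# Same points, two different orderings.
by_arrival = sorted(ROWS, key=lambda r: r[4])  # processing-time order
by_event = sorted(ROWS, key=lambda r: r[3])    # event-time order

print(round(total_distance(by_arrival), 3))  # → 6.657
print(round(total_distance(by_event), 3))    # → 6.0
```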

So my question is:

  1. How can I calculate the distance in Apache Beam while accounting for events that arrive late?
  2. What can I do about successive coordinates that fall into two different windows?
  3. Are there alternative stream processing frameworks that can solve this problem?
  4. Events may be delivered up to two days after their event time, but I want approximate results before all of them have arrived.

Upvotes: 0

Views: 280

Answers (2)

Pablo

Reputation: 11041

You can get lists of events like this:

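import json
from apache_beam import GroupByKey, Map, WindowInto
from apache_beam.io import ReadFromPubSub
from apache_beam.transforms.trigger import AccumulationMode
from apache_beam.transforms.window import Sessions

lists_of_events = (p
  | ReadFromPubSub(subscription=SUBSCRIPTION)
  | Map(json.loads)  # Pub/Sub delivers raw bytes; parse each message into a dict
  | Map(lambda x: (x['driver_id'], x))
  | WindowInto(Sessions(SESSION_GAP),
               allowed_lateness=ALLOWED_LATENESS,
               accumulation_mode=AccumulationMode.ACCUMULATING)
  | GroupByKey())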

This returns a PCollection of tuples, each holding a driver ID and the events for that driver in one window (type: Tuple[str, Iterable[Dict]]).

You would then write a function to consume that:

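import math
from apache_beam import FlatMap

def calculate_distance(elm):
  driver, events = elm
  # GroupByKey gives no ordering guarantee, so sort by event time first.
  events = sorted(events, key=lambda x: x['event_time'])
  # Sum the Euclidean distance between consecutive points (assumes planar x/y).
  distance = sum(math.hypot(b['x'] - a['x'], b['y'] - a['y'])
                 for a, b in zip(events, events[1:]))
  ev_time = events[0]['event_time']
  yield {'driver': driver, 'distance': distance, 'event_time': ev_time}

# calculate_distance is a generator, so apply it with FlatMap, not Map.
lists_of_events | FlatMap(calculate_distance)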

Upvotes: 0

Eric Schmidt

Reputation: 1327

I would suggest spending some time learning how windows handle late-arriving data.

You could create a fixed window and then allow late data; however, you would also have to define how you want to handle interim accumulation.

Depending on your key structure, perhaps you could key on driver and then provide a more specific heuristic for determining completeness.

Step 1: learn about the different window types and how to deal with late-arriving data. Note: you will need to timestamp your incoming elements with their own event time rather than the default arrival time.

Upvotes: 0
