arthur

Reputation: 1064

pair-wise aggregation of input lines in Hadoop

A bunch of driving cars produce traces (sequences of ordered positions)

car_id  order_id    position
car1    0       (x0,y0)
car1    1       (x1,y1)
car1    2       (x2,y2)
car2    0       (x0,y0)
car2    1       (x1,y1)
car2    2       (x2,y2)
car2    3       (x3,y3)
car2    4       (x4,y4)
car3    0       (x0,y0)

I would like to compute the distance (path length) driven by the cars.

At the core, I need to process all records line by line, pair-wise. If the car_id of the previous line is the same as the current one, I need to compute the distance to the previous position and add it to the aggregated value. If the car_id of the previous line is different from the current one, I need to output the aggregate for the previous car_id and initialize the aggregate of the current car_id with zero.
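
To make that concrete, here is a minimal sequential sketch of the core logic (not a Hadoop job yet); it assumes the lines arrive already sorted by car_id and order_id, tab-separated, with positions written as "(x,y)" as in the sample above:

    import sys
    from math import hypot

    prev_car, prev_pos, total = None, None, 0.0

    for line in sys.stdin:                       # e.g. "car1<TAB>2<TAB>(1.0,2.0)"
        car_id, order_id, pos = line.rstrip("\n").split("\t")
        x, y = map(float, pos.strip("()").split(","))

        if car_id == prev_car:
            # same trace: add the distance to the previous position
            total += hypot(x - prev_pos[0], y - prev_pos[1])
        else:
            # new trace: output the aggregate of the previous car and reset
            if prev_car is not None:
                print("%s\t%f" % (prev_car, total))
            total = 0.0

        prev_car, prev_pos = car_id, (x, y)

    if prev_car is not None:                     # flush the last car
        print("%s\t%f" % (prev_car, total))

This is roughly what a streaming-style reducer for Solution (2) below would boil down to.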

What should the architecture of the Hadoop program look like? Is it possible to achieve the following:

Solution (1):

(a) Every mapper computes the aggregated distance of the trace (per physical block)

(b) Every mapper aggregates the distances further in case the trace was split among multiple blocks and nodes

Comment: this solution requires knowing whether I am on the last record (line) of the block. Is this information available at all?

Solution (2)

(a) The mappers read the data line by line (doing no computations) and send the data to the reducers based on the car_id.

(b) The reducers sort the data for individual car_ids based on order_id, compute the distances, and aggregate them

Comment: high network load due to laziness of mappers

Solution (3)

(a) implement a custom record reader that defines a logical record to be the whole trace of one car

(b) each mapper computes the distances and the aggregate

(c) reducer is not really needed as everything is done by the mapper

Comment: high main memory costs as the whole trace needs to be loaded into main memory (although only two lines are used at a time).

Upvotes: 1

Views: 52

Answers (1)

Thomas Jungblut

Reputation: 20969

I would go with Solution (2), since it is the cleanest to implement and reuse.

You certainly want to sort based on car_id AND order_id, so you can compute the distances on the fly without loading them all up into memory.

Your concern about high network usage is valid; however, you can pre-aggregate the distances in a combiner.

What would that look like? Let's take some pseudo-code:

Mapper:

  foreach record:
    emit((car_id, order_id), (x,y))

Combiner:

  prev = null
  foreach (car_id, order_id), (x,y):
    if(prev != null && prev_order_id + 1 == order_id): // subsequent measures
      // compute the distance and emit it, tagged as the last possible order
      emit((car_id, MAX_VALUE), distance(prev, (x,y)))
    else:
      // send the raw record to the reducer, since it is probably crossing block boundaries
      emit((car_id, order_id), (x,y))
    prev = (x,y); prev_order_id = order_id

The reducer then has two main parts:

  • compute the sum over subsequent measures, like the combiner did
  • sum over all existing sums, tagged with order_id = MAX_VALUE
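
As a rough illustration (not the answerer's actual code), the reducer logic for one car_id could look like this, assuming the values arrive sorted by order_id so the MAX_VALUE-tagged partial sums come last:

    from math import hypot

    MAX_VALUE = 2**31 - 1        # sentinel from the combiner (Integer.MAX_VALUE in Java)

    def reduce_car(car_id, records):
        # records: (order_id, value) pairs for one car, sorted by order_id;
        # value is either a raw (x, y) position or a pre-aggregated distance
        total, prev = 0.0, None
        for order_id, value in records:
            if order_id == MAX_VALUE:
                total += value                   # partial sum from a combiner
            else:
                x, y = value
                if prev is not None and prev[0] + 1 == order_id:
                    total += hypot(x - prev[1][0], y - prev[1][1])  # subsequent measures
                prev = (order_id, (x, y))
        return car_id, total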

That's already about the best you can get from a network usage POV.

From a software POV, you'd be better off using Spark: your logic will be five lines instead of 100 across three class files.
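
To make that comparison concrete, here is a rough PySpark sketch (the input path and the tab-separated "(x,y)" format are assumptions, not part of the question):

    from math import hypot
    from pyspark import SparkContext

    sc = SparkContext(appName="trace-distances")

    def parse(line):                              # "car1<TAB>0<TAB>(x0,y0)"
        car_id, order_id, pos = line.split("\t")
        x, y = map(float, pos.strip().strip("()").split(","))
        return car_id, (int(order_id), (x, y))

    def path_length(positions):
        pts = [p for _, p in sorted(positions)]   # sort by order_id
        return sum(hypot(b[0] - a[0], b[1] - a[1]) for a, b in zip(pts, pts[1:]))

    (sc.textFile("hdfs:///traces/input")
       .map(parse)
       .groupByKey()                              # all positions of one car
       .mapValues(path_length)
       .saveAsTextFile("hdfs:///traces/output"))

Note that groupByKey pulls one car's whole trace to a single executor, so this trades the combiner trick for simplicity.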

For your other question:

this solution requires knowing whether I am on the last record (line) of the block. Is this information available at all?

Hadoop only guarantees that it is not splitting through records when reading; it may very well be that your record already touches two different blocks underneath. The way to find that out is basically to rewrite your input format to make this information available to your mappers, or, even better, to take your logic into account when splitting the blocks.

Upvotes: 2
