Jeyhun Karimov

Reputation: 1295

Hadoop Mapreduce count distinct vector elements for big data

I have data consisting of n-length vectors of integer/real numbers. The data is typically at the GB level, and the feature size of a vector is more than 100. I want to count the distinct elements of every vector feature. For example, if I have data like:

1.13211 22.33 1.00 ... 311.66
1.13211 44.44 4.52 ... 311.66
1.55555 22.33 5.11 ... 311.66

I want the result to be a single vector like (2,2,3,...,1), since there are 2 distinct values in the first feature of a vector, 2 distinct values in the second feature, and so on.

The way I think to do it with MapReduce is to send values from the mapper as ("$val$+{feature_vector_num}", 1), for example (1.13211+1, 1) or (22.33+2, 1). In the reducer, just sum them up, and probably use a second mapper and reducer to wrap up all the reducer results from the previous step.
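For concreteness, a rough sketch of that mapper (the class and field names are mine, not part of the question):

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Emits key = "<value>+<featureIndex>", value = 1 for every component of every vector,
    // as described above; a follow-up job would then count distinct keys per feature.
    public class ValueIndexMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text outKey = new Text();

        @Override
        protected void map(LongWritable offset, Text line, Context context)
                throws IOException, InterruptedException {
            String[] features = line.toString().trim().split("\\s+");
            for (int i = 0; i < features.length; i++) {
                // e.g. "1.13211+1" for the first feature, "22.33+2" for the second
                outKey.set(features[i] + "+" + (i + 1));
                context.write(outKey, ONE);
            }
        }
    }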

The problem is that, if I have data of size N, the amount of data my solution sends to the reducers will be |V| * N in the worst case (|V| is the length of the feature vector), and this is also the number of keys and of reduce calls at the same time. Therefore, for big data this is quite a bad solution.

Do you have any suggestions? Thanks.

Upvotes: 1

Views: 3756

Answers (2)

yurgis

Reputation: 4067

I would agree with lejlot that 1GB would be much more optimally solved using other means (e.g. in-memory algorithms such as a hash map) and not with m/r.

However, if your problem is 2-3+ orders of magnitude larger, or if you just want to practice with m/r, here is one possible solution:

Phase 1

Mapper

Params:

  • Input key: irrelevant (for TextInputFormat I think it is a LongWritable that represents the position in the file, but you can just use Writable)
  • Input value: a single line with vector components separated by space (1.13211 22.33 1.00 ... 311.66)
  • Output key: a pair <IntWritable, DoubleWritable> where the IntWritable holds the index of the component and the DoubleWritable holds the value of the component. Google for the Hadoop examples, specifically SecondarySort.java, which demonstrates how to implement a pair of IntWritables; you just need to rewrite it using DoubleWritable as the second component (a sketch of such a pair class follows this list).
  • Output value: irrelevant, you can use NullWritable
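A minimal sketch of such a pair key, assuming the new (org.apache.hadoop.mapreduce) API; the class name IndexValuePair is mine:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.WritableComparable;

    // Composite key: (component index, component value), analogous to the IntWritable
    // pair in the SecondarySort example, but with a double as the second part.
    public class IndexValuePair implements WritableComparable<IndexValuePair> {
        private int index;
        private double value;

        public IndexValuePair() { }                     // required no-arg constructor

        public void set(int index, double value) {
            this.index = index;
            this.value = value;
        }

        public int getFirst()     { return index; }
        public double getSecond() { return value; }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(index);
            out.writeDouble(value);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            index = in.readInt();
            value = in.readDouble();
        }

        @Override
        public int compareTo(IndexValuePair other) {
            int cmp = Integer.compare(index, other.index);
            return cmp != 0 ? cmp : Double.compare(value, other.value);
        }

        @Override
        public int hashCode() {                         // used by the default HashPartitioner
            return 31 * index + Double.hashCode(value);
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof IndexValuePair)) return false;
            IndexValuePair p = (IndexValuePair) o;
            return index == p.index && value == p.value;
        }
    }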

Mapper Function

  • Tokenize the value
  • For each token, emit an <IntWritable, DoubleWritable> key (you can create a custom writable pair class for that) and a NullWritable value; see the sketch after this list
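A sketch of that mapper, reusing the hypothetical IndexValuePair class from above:

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class Phase1Mapper
            extends Mapper<LongWritable, Text, IndexValuePair, NullWritable> {

        private final IndexValuePair outKey = new IndexValuePair();

        @Override
        protected void map(LongWritable offset, Text line, Context context)
                throws IOException, InterruptedException {
            String[] tokens = line.toString().trim().split("\\s+");
            for (int i = 0; i < tokens.length; i++) {
                // key = (component index, component value); value is irrelevant
                outKey.set(i, Double.parseDouble(tokens[i]));
                context.write(outKey, NullWritable.get());
            }
        }
    }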

Reducer

The framework will call your reducer with <IntWritable, DoubleWritable> pairs as keys, only once per distinct key, effectively deduplicating. For example, the key <1, 1.13211> will come only once.

Params

  • Input Key: Pair <IntWritable, DoubleWritable>
  • Input Value: Irrelevant (Writable or NullWritable)
  • Output Key: IntWritable (component index)
  • Output Value: IntWritable (count corresponding to the index)

Reducer Setup

  • initialize an int[] counters array with size equal to your vector dimension.

Reducer Function

  • get an index from key.getFirst()
  • increment count for the index: counters[index]++

Reducer Cleanup

  • for each count in the counters array, emit the array index as the key and the counter value as the value (a combined reducer sketch follows below).
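Putting setup, reduce, and cleanup together, a sketch; the vector.dimension configuration property is a hypothetical name that you would set in your driver:

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Reducer;

    public class Phase1Reducer
            extends Reducer<IndexValuePair, NullWritable, IntWritable, IntWritable> {

        private int[] counters;

        @Override
        protected void setup(Context context) {
            // hypothetical property; set it in the driver to the vector dimension
            int dimension = context.getConfiguration().getInt("vector.dimension", 0);
            counters = new int[dimension];
        }

        @Override
        protected void reduce(IndexValuePair key, Iterable<NullWritable> values,
                              Context context) {
            // each distinct (index, value) key arrives exactly once at this reducer,
            // so incrementing per call counts distinct values per component
            counters[key.getFirst()]++;
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            for (int i = 0; i < counters.length; i++) {
                context.write(new IntWritable(i), new IntWritable(counters[i]));
            }
        }
    }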

Phase 2

This phase is trivial and only needed if you have multiple reducers in the first phase, in which case the counts calculated above are partial. You need to combine the outputs of your multiple reducers into a single output: set up a single-reducer job where the reducer just accumulates the counts for corresponding indices.
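One way such a single-reducer job might be wired up (a sketch only; it assumes Phase 1 was configured with SequenceFileOutputFormat so its IntWritable/IntWritable pairs can be read back directly, and it uses the Phase2Reducer sketched further below):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class Phase2Driver {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration(), "distinct-count-merge");
            job.setJarByClass(Phase2Driver.class);

            // identity mapper: passes (position, partial count) pairs straight through
            job.setMapperClass(Mapper.class);
            job.setReducerClass(Phase2Reducer.class);
            job.setNumReduceTasks(1);               // single reducer => one final output file

            // assumes Phase 1 wrote SequenceFile output with IntWritable key and value
            job.setInputFormatClass(SequenceFileInputFormat.class);
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(IntWritable.class);

            FileInputFormat.addInputPath(job, new Path(args[0]));   // Phase 1 output dir
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }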

Mapper

NO-OP

Reducer

Params

  • Input key: IntWritable (position)
  • Input value: IntWritable (partial count)
  • Output key: IntWritable (position)
  • Output value: IntWritable (total count)

Reducer Function

  • for each input key
    • int counter = 0
    • iterate over the values
      • counter += value
    • emit input key (as a key) and counter (as a value)
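Assembled, that reducer could look like this sketch (the class name Phase2Reducer is mine):

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Reducer;

    // Sums the partial counts emitted by the Phase 1 reducers for each position.
    public class Phase2Reducer
            extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

        @Override
        protected void reduce(IntWritable position, Iterable<IntWritable> partialCounts,
                              Context context) throws IOException, InterruptedException {
            int total = 0;
            for (IntWritable count : partialCounts) {
                total += count.get();
            }
            context.write(position, new IntWritable(total));
        }
    }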

The resulting output file "part-r-00000" should contain one record per vector position, where each record is a pair of values (position and distinct count), sorted by position.

Upvotes: 1

bendaizer

Reputation: 1235

Without considering any implementation detail (MapReduce or not), I would do it in 2 steps with a hashtable per feature (probably in Redis).

The first step would list all values and corresponding counts.

The second would then run through each vector and check whether each element is unique in the hashtable. If you have some margin for error and want a light memory footprint, I would even go with a Bloom filter.

The two steps are trivially parallelized.
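For the in-memory route, here is a minimal single-machine sketch that uses one HashSet per feature in place of a Redis hashtable; each set could be swapped for a Bloom filter if approximate counts are acceptable:

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class DistinctPerFeature {
        public static void main(String[] args) throws IOException {
            List<Set<String>> distinct = new ArrayList<>();       // one set per feature
            try (BufferedReader reader = new BufferedReader(new FileReader(args[0]))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    String[] features = line.trim().split("\\s+");
                    while (distinct.size() < features.length) {   // grow to the vector dimension
                        distinct.add(new HashSet<>());
                    }
                    for (int i = 0; i < features.length; i++) {
                        distinct.get(i).add(features[i]);          // set membership does the dedupe
                    }
                }
            }
            List<Integer> counts = new ArrayList<>();
            for (Set<String> values : distinct) {
                counts.add(values.size());
            }
            System.out.println(counts);                            // e.g. [2, 2, 3, ..., 1]
        }
    }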

Upvotes: 2
