Shahriar.M

Reputation: 898

TypeError: cannot unpack non-iterable float object - MapReduce - mrjob

I'm testing a simple example to learn about MapReduce and mrjob.

The goal is to sum the logarithms of all the input numbers and then divide the count of the numbers by that sum.

The code is pretty easy and straightforward:

# mrMedian.py

from mrjob.job import MRJob
from mrjob.step import MRStep

import math

class MrMedian(MRJob):

    def __init__(self, *args, **kwargs):
        super(MrMedian, self).__init__(*args, **kwargs)
        self.inCount = 0
        self.inLogSum = 0.0

    #increment the count of elements and add the 
    # logarithm of the current number to the summation
    def map(self, key, val):
        inVal = float(val)
        self.inCount += 1
        self.inLogSum += math.log(inVal)

    # return the count and summation after all numbers are processed     
    def map_final(self):
        yield (1, [self.inCount, self.inLogSum])

    # aggregate the count and summation values and yield the result
    def reduce(self, key, packedValues):
        cumLogSum=1.0
        cumN=0

        for valArr in packedValues:
            nj = int(valArr[0])
            cumN += nj        
            cumLogSum += float(valArr[1])
        
        median = cumN/cumLogSum
        yield (median)

    # define mapper and reducer
    def steps(self):
        return ([
            MRStep(mapper=self.map, reducer=self.reduce, mapper_final=self.map_final)
        ])
    
# to run:
# python mrMedian.py < inputFile.txt
if __name__ == '__main__':
    MrMedian.run()

In the map_final method I'm yielding (1, [self.inCount, self.inLogSum]). The value 1 is the key, which is ignored, and the list [self.inCount, self.inLogSum] is the value. In the reduce method, the values arrive as packedValues, an iterable that I loop over with a for loop.

I am getting this error:

(venv) shahriar@Lenovo:/media/shahriar/01D779182B58B9D0$ python mrMedian.py < inputFile.txt > outFile.txt
No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/mrMedian.shahriar.20221113.152412.029427
Running step 1 of 1...
reading from STDIN

Error while reading from /tmp/mrMedian.shahriar.20221113.152412.029427/step/000/reducer/00000/input:

Traceback (most recent call last):   
    File "/media/shahriar/01D779182B58B9D0/assignment2/mrMedian.py", line 43, in <module>
    MrMedian.run()   
    File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/job.py", line 616, in run
    cls().execute()   
    File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/job.py", line 687, in execute
    self.run_job()   
    File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/job.py", line 636, in run_job
    runner.run()   
    File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/runner.py", line 503, in run
    self._run()   
    File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/sim.py", line 161, in _run
    self._run_step(step, step_num)   
    File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/sim.py", line 170, in _run_step
    self._run_streaming_step(step, step_num)   
    File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/sim.py", line 187, in _run_streaming_step
    self._run_reducers(step_num, num_reducer_tasks)   
    File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/sim.py", line 289, in _run_reducers
    self._run_multiple(   
    File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/sim.py", line 130, in _run_multiple
    func()   
    File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/sim.py", line 746, in _run_task
    invoke_task(
    File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/inline.py", line 133, in invoke_task
    task.execute()   
    File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/job.py", line 681, in execute
    self.run_reducer(self.options.step_num)   
    File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/job.py", line 795, in run_reducer
    for k, v in self.reduce_pairs(read_lines(), step_num=step_num):   
    File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/job.py", line 866, in reduce_pairs
    for k, v in self._combine_or_reduce_pairs(pairs, 'reducer', step_num):   
    File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/job.py", line 889, in _combine_or_reduce_pairs
    for k, v in task(key, values) or (): 
    TypeError: cannot unpack non-iterable float object

The reducer's input file, which holds the output of the map_final method, looks fine:

shahriar@Lenovo-:/tmp/mrMedian.shahriar.20221113.152412.029427/step/000/reducer/00000$ cat input 
1   [13, 78.5753201837955]
1   [13, 77.20894832945609]
1   [12, 75.70546637672973]
1   [12, 73.97942285230064]
1   [13, 78.7642193551817]
1   [13, 74.83203774429285]
1   [13, 72.28868623927899]
1   [11, 67.51370208632588]

I commented out the for loop inside the reduce method to check whether packedValues was causing the error, but I still got the same error.

Any idea is appreciated.

Upvotes: 0

Views: 107

Answers (1)

Ray Cai

Reputation: 1

I resolved a similar issue by yielding a (key, value) pair from the reducer function.

def reduce(self, key, packed_values):
    ...
    # must yield k, v here
    yield 1, cumulative_n / cumulative_ln_val
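To make the elided part concrete, here is a minimal standalone sketch of the same aggregation, written as a plain generator so it runs without mrjob. The function name reduce_pairs and the RECORDS list are hypothetical; the numbers are copied from the reducer input shown in the question. It also assumes the log sum should start at 0.0, whereas the question's code starts it at 1.0.

```python
# Intermediate values copied from the reducer input file in the question.
RECORDS = [
    [13, 78.5753201837955],
    [13, 77.20894832945609],
    [12, 75.70546637672973],
    [12, 73.97942285230064],
    [13, 78.7642193551817],
    [13, 74.83203774429285],
    [13, 72.28868623927899],
    [11, 67.51370208632588],
]


def reduce_pairs(key, packed_values):
    """Hypothetical stand-in for the reducer: aggregate, then yield (key, value)."""
    cumulative_n = 0
    cumulative_log_sum = 0.0  # assumption: start at 0.0 (the question uses 1.0)
    for count, log_sum in packed_values:
        cumulative_n += int(count)
        cumulative_log_sum += float(log_sum)
    # Yield a two-item pair, which is what the caller unpacks into k, v.
    yield key, cumulative_n / cumulative_log_sum


results = list(reduce_pairs(1, iter(RECORDS)))
for k, v in results:
    print(k, v)
```

In the real job the same body would live in the reduce method, with self as the first parameter.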

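Looking into the source code of job.py, the frame at "line 889, in _combine_or_reduce_pairs" unpacks every item the reducer yields into a key and a value. In Python, yield (median) yields a bare float, because parentheses alone do not create a tuple, so the unpacking fails.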

def _combine_or_reduce_pairs(self, pairs, mrc, step_num=0):
    """Helper for :py:meth:`combine_pairs` and :py:meth:`reduce_pairs`."""
    step = self._get_step(step_num, MRStep)

    task = step[mrc]
    task_init = step[mrc + '_init']
    task_final = step[mrc + '_final']
    if task is None:
        raise ValueError('No %s in step %d' % (mrc, step_num))

    if task_init:
        for k, v in task_init() or ():
            yield k, v

    # group all values of the same key together, and pass to the reducer
    #
    # be careful to use generators for everything, to allow for
    # very large groupings of values
    for key, pairs_for_key in itertools.groupby(pairs, lambda k_v: k_v[0]):
        values = (value for _, value in pairs_for_key)
        for k, v in task(key, values) or ():
            yield k, v
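The unpacking in the excerpt above can be reproduced without mrjob. The sketch below uses hypothetical names (bad_reducer, good_reducer, consume) and only mimics the "for k, v in task(key, values) or ()" pattern; it is not mrjob's actual code path.

```python
def bad_reducer(key, values):
    total = sum(values)
    yield (total)  # parentheses alone do not make a tuple: this yields a float


def good_reducer(key, values):
    total = sum(values)
    yield key, total  # a two-item tuple that can be unpacked into k, v


def consume(task, key, values):
    # Same unpacking pattern as in the job.py excerpt.
    return [(k, v) for k, v in task(key, values) or ()]


try:
    consume(bad_reducer, 1, [1.0, 2.5])
    error_message = None
except TypeError as exc:
    error_message = str(exc)

pairs = consume(good_reducer, 1, [1.0, 2.5])
print(error_message)
print(pairs)
```

The first call should fail with a TypeError like the one in the question's traceback, while the second returns the pair unchanged.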

Upvotes: 0
