Reputation: 898
I'm testing a simple example to learn about MapReduce and mrjob.
The goal is to sum up the logarithm of all the numbers and divide the count of all numbers by this summation.
The code is pretty easy and straightforward:
# mrMedian.py
from mrjob.job import MRJob
from mrjob.step import MRStep
import math
class MrMedian(MRJob):
def __init__(self, *args, **kwargs):
super(MrMedian, self).__init__(*args, **kwargs)
self.inCount = 0
self.inLogSum = 0.0
#increment the count of elements and add the
# logarithm of the current number to the summation
def map(self, key, val):
inVal = float(val)
self.inCount += 1
self.inLogSum += math.log(inVal)
# return the count and summation after all numbers are processed
def map_final(self):
yield (1, [self.inCount, self.inLogSum])
# aggregate the count and summation values and yield the result
def reduce(self, key, packedValues):
cumLogSum=1.0
cumN=0
for valArr in packedValues:
nj = int(valArr[0])
cumN += nj
cumLogSum += float(valArr[1])
median = cumN/cumLogSum
yield (median)
# define mapper and reducer
def steps(self):
return ([
MRStep(mapper=self.map, reducer=self.reduce, mapper_final=self.map_final)
])
# to run:
# python MrMedian.py < inputFile.txt
if __name__ == '__main__':
MrMedian.run()
In the map_final
method I'm yielding (1, [self.inCount, self.inLogSum])
. the value 1
is the key which is ignored and the list [self.inCount, self.inLogSum]
is the value that in the reduce
method we should treat with it (packedValues
) as an iterable and somehow iterate through it using a for
loop.
I am getting this error:
(venv) shahriar@Lenovo:/media/shahriar/01D779182B58B9D0$ python mrMedian.py < inputFile.txt > outFile.txt No configs found; falling back on auto-configuration No configs specified for inline runner Creating temp directory /tmp/mrMedian.shahriar.20221113.152412.029427 Running step 1 of 1... reading from STDIN
Error while reading from /tmp/mrMedian.shahriar.20221113.152412.029427/step/000/reducer/00000/input:
Traceback (most recent call last):
File "/media/shahriar/01D779182B58B9D0/assignment2/mrMedian.py", line 43, in <module>
MrMedian.run()
File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/job.py", line 616, in run
cls().execute()
File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/job.py", line 687, in execute
self.run_job()
File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/job.py", line 636, in run_job
runner.run()
File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/runner.py", line 503, in run
self._run()
File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/sim.py", line 161, in _run
self._run_step(step, step_num)
File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/sim.py", line 170, in _run_step
self._run_streaming_step(step, step_num)
File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/sim.py", line 187, in _run_streaming_step
self._run_reducers(step_num, num_reducer_tasks)
File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/sim.py", line 289, in _run_reducers
self._run_multiple(
File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/sim.py", line 130, in _run_multiple
func()
File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/sim.py", line 746, in _run_task
invoke_task( File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/inline.py", line 133, in invoke_task
task.execute()
File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/job.py", line 681, in execute
self.run_reducer(self.options.step_num)
File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/job.py", line 795, in run_reducer
for k, v in self.reduce_pairs(read_lines(), step_num=step_num):
File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/job.py", line 866, in reduce_pairs
for k, v in self._combine_or_reduce_pairs(pairs, 'reducer', step_num):
File "/media/shahriar/01D779182B58B9D0/venv/lib/python3.10/site-packages/mrjob/job.py", line 889, in _combine_or_reduce_pairs
for k, v in task(key, values) or ():
TypeError: cannot unpack non-iterable float object
The input file which is the result of map_final
method is ok:
shahriar@Lenovo-:/tmp/mrMedian.shahriar.20221113.152412.029427/step/000/reducer/00000$ cat input
1 [13, 78.5753201837955]
1 [13, 77.20894832945609]
1 [12, 75.70546637672973]
1 [12, 73.97942285230064]
1 [13, 78.7642193551817]
1 [13, 74.83203774429285]
1 [13, 72.28868623927899]
1 [11, 67.51370208632588]
I commented the for
loop inside the reducer method to check whether the error is because of packedValues
but I was getting the error again.
Any idea is appreciated.
Upvotes: 0
Views: 107
Reputation: 1
I've resolved a similar issue by yield (key, value) pair from the reducer function.
def reduce(self, key, packed_values):
...
# must yield k, v here
yield 1, cumulative_n / cumulative_ln_val
Looking into the source code of job.py, it looks like the "line 889, in _combine_or_reduce_pairs" expects key-value results from the reducer function.
def _combine_or_reduce_pairs(self, pairs, mrc, step_num=0):
"""Helper for :py:meth:`combine_pairs` and :py:meth:`reduce_pairs`."""
step = self._get_step(step_num, MRStep)
task = step[mrc]
task_init = step[mrc + '_init']
task_final = step[mrc + '_final']
if task is None:
raise ValueError('No %s in step %d' % (mrc, step_num))
if task_init:
for k, v in task_init() or ():
yield k, v
# group all values of the same key together, and pass to the reducer
#
# be careful to use generators for everything, to allow for
# very large groupings of values
for key, pairs_for_key in itertools.groupby(pairs, lambda k_v: k_v[0]):
values = (value for _, value in pairs_for_key)
for k, v in task(key, values) or ():
yield k, v
Upvotes: 0