Reputation: 2189
I'm trying to run a simple MapReduce job on the following datasets:
bike.txt
1 Bike 1
2 Bike 2
3 Bike 4
4 Bike 4
5 Bike 4
bikenames.txt
1,Aap
2,Noot
3,Greet
4,Mies
5,Gazelle
My aim is to write a MapReduce job that outputs the name of the bike that occurs most often. To that end, I wrote the following:
from mrjob.job import MRJob
from mrjob.step import MRStep

class MostPopularBike(MRJob):

    def configure_options(self):
        super(MostPopularBike, self).configure_options()
        self.add_file_option('--items', help='Path to u.item')

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_ratings,
                   reducer=self.reducer_count_ratings),
            MRStep(mapper=self.mapper_passthrough,
                   reducer=self.reducer_find_max)
        ]

    def mapper_get_ratings(self, _, line):
        (bikeID, name) = line.split('\t')
        yield bikeID, 1

    def reducer_init(self):
        self.bikeNames = {}
        with open("bikenames.txt") as f:
            for line in f:
                fields = line.split(',')
                self.bikeNames[fields[0]] = fields[1]

    def reducer_count_ratings(self, key, values):
        yield None, (sum(values), self.bikeNames[key])

    def mapper_passthrough(self, key, value):
        yield key, value

    def reducer_find_max(self, key, values):
        yield max(values)

if __name__ == '__main__':
    MostPopularBike.run()
If I try to run it using:
!python MostPopularBike.py --items=bikenames.txt bike.txt
However, it throws the following error:
AttributeError: 'MostPopularBike' object has no attribute 'bikeNames'
Any thoughts on what goes wrong here?
Upvotes: 0
Views: 458
Reputation: 874
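bikeNames is only defined in reducer_init(), and that function is never called: because the job overrides steps(), mrjob only runs the methods that are explicitly passed to MRStep, and reducer_init is not among them. Anyway, it's not really an initialization function for each step; it looks more like initialization for the job.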
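To see why the attribute goes missing, here is a toy model in plain Python. The Step and Job classes are hypothetical stand-ins, not mrjob's actual internals; they only mimic the idea that a step runs the hooks it was handed and nothing else.

```python
# Toy model only: Step and Job are hypothetical, NOT mrjob classes.
class Step:
    def __init__(self, reducer, reducer_init=None):
        self.reducer = reducer
        self.reducer_init = reducer_init

    def run(self, key, values):
        # The init hook runs only if it was registered on this step.
        if self.reducer_init is not None:
            self.reducer_init()
        return list(self.reducer(key, values))


class Job:
    def reducer_init(self):
        self.bikeNames = {"4": "Mies"}

    def reducer_count(self, key, values):
        yield None, (sum(values), self.bikeNames[key])


# Case 1: reducer_init exists on the class but is not registered.
job = Job()
unregistered = Step(reducer=job.reducer_count)
try:
    unregistered.run("4", [1, 1, 1])
    error = None
except AttributeError as exc:
    error = str(exc)

# Case 2: the same hook, registered on the step.
job2 = Job()
registered = Step(reducer=job2.reducer_count,
                  reducer_init=job2.reducer_init)
result = registered.run("4", [1, 1, 1])

print(error)
print(result)
```

In the first case the reducer fails with the same kind of AttributeError as in the question; in the second it finds the name.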
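One option is to move that setup into __init__, so the initialization is performed when you create the MostPopularBike instance. Note that an overriding __init__ must accept *args and **kwargs and call super().__init__ with them first, because MRJob's own constructor takes arguments. Or, if you really do want the initialization performed on each step, update your steps to this: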
    def steps(self):
        return [
            MRStep(reducer_init=self.reducer_init,
                   mapper=self.mapper_get_ratings,
                   reducer=self.reducer_count_ratings),
            MRStep(reducer_init=self.reducer_init,
                   mapper=self.mapper_passthrough,
                   reducer=self.reducer_find_max)
        ]
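Once reducer_init is actually called, it is worth knowing that the names it loads keep their trailing newline, because each line is split without being stripped first. Below is a minimal sketch of a loader that avoids this, written as a standalone function so it can be tried outside mrjob. The name load_bike_names is hypothetical and not part of mrjob, and the temporary file just holds a few lines copied from bikenames.txt.

```python
import os
import tempfile


def load_bike_names(path):
    """Parse 'id,name' lines into a dict, dropping the trailing newline."""
    names = {}
    with open(path) as f:
        for line in f:
            line = line.strip()
            if not line:
                continue  # skip blank lines
            bike_id, name = line.split(",", 1)
            names[bike_id] = name
    return names


# Try it on a temporary file holding a few lines from bikenames.txt.
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as tmp:
    tmp.write("1,Aap\n2,Noot\n4,Mies\n")
    tmp_path = tmp.name

bike_names = load_bike_names(tmp_path)
os.remove(tmp_path)
print(bike_names)
```

Inside the job, reducer_init could then assign the result to self.bikeNames, calling the helper with the same path the question already opens.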
Upvotes: 1