leontx

Reputation: 1195

How to pass a Google mapreduce parameter to done_callback

I'm having trouble setting a parameter when kicking off a mapreduce via start_map so I can access it in done_callback. Numerous things I've read imply that it's possible, but somehow I've not got the earth-moon-stars properly aligned. Ultimately, what I'm trying to accomplish is to delete the temporary blob I created for the mapreduce job.

Here's how I kick it off:

mrID = control.start_map(
    "Find friends",
    "findfriendshandler.findFriendHandler",
    "mapreduce.input_readers.BlobstoreLineInputReader",
    {"blob_keys": blobKey},
    shard_count=7,
    mapreduce_parameters={'done_callback': '/fnfrdone', 'blobKey': blobKey})

In done_callback, the context object isn't available:

class FindFriendsDoneHandler(webapp.RequestHandler):

  def post(self):

     ctx = context.get()
     if ctx is not None:
        params = ctx.mapreduce_spec.mapper.params
        try:
           blobKey = params['blobKey']
           logging.info('BLOBKEY ' + blobKey)
        except KeyError:
           logging.info('blobKey key not found in params')
     else:
        logging.info('context.get did not work')         #THIS IS WHAT GETS OUTPUT

Thanks!

EDIT: It seems like there may be more than one MR library, so I wanted to include my various imports:

from mapreduce import control
from mapreduce import operation as op
from mapreduce import context
from mapreduce import model

Upvotes: 2

Views: 430

Answers (2)

leontx

Reputation: 1195

Below is the code I used in my done_callback handler to retrieve my blobKey user parameter:

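import logging

from google.appengine.ext import webapp
from google.appengine.ext.blobstore import BlobInfo
from mapreduce.model import MapreduceState

class FindFriendsDoneHandler(webapp.RequestHandler):

  def post(self):
     # The framework passes the job ID to done_callback in this header.
     mrID = self.request.headers['Mapreduce-Id']

     try:
        # Load the job's state entity; its spec holds the user params.
        mapreduceState = MapreduceState.get_by_key_name(mrID)
        mrSpec = mapreduceState.mapreduce_spec
        jsonSpec = mrSpec.to_json()
        jsonParams = jsonSpec['params']
        blobKey = jsonParams['blobKey']
        # Delete the temporary blob created for this job.
        blobInfo = BlobInfo.get(blobKey)
        blobInfo.delete()
        logging.info('Temp blob deleted successfully for mapreduce:' + mrID)
     except Exception:
        logging.warning('Unable to delete temp blob for mapreduce:' + mrID)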

This uses the mapreduce ID passed to the done callback in the Mapreduce-Id header to retrieve the MapreduceState model object from the mapreduce state table. The model stores any user params sent via start_map in its mapreduce_spec property, which is in JSON format.

Note that MR itself stores the input reader's blob_keys elsewhere in mapreduce_spec.
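
To show just the lookup step without the datastore, here is a minimal sketch. It assumes the spec, once converted with to_json(), is a plain dict with the user params under a 'params' key, as in my handler above. The helper name get_user_param and the example values are made up for illustration; they are not part of the mapreduce library.

```python
def get_user_param(spec_json, name, default=None):
    """Return a user parameter from a mapreduce spec in dict form.

    spec_json is assumed to have the user params under the 'params' key.
    Returns default if the key or the 'params' section is missing.
    """
    params = spec_json.get('params') or {}
    return params.get(name, default)


# Hypothetical stand-in for mapreduceState.mapreduce_spec.to_json().
example_spec = {
    'name': 'Find friends',
    'params': {'done_callback': '/fnfrdone', 'blobKey': 'example-blob-key'},
}

blob_key = get_user_param(example_spec, 'blobKey')
missing = get_user_param(example_spec, 'noSuchParam')
```

Returning a default instead of raising KeyError lets the done handler log a clear message and skip the delete when the param was never set.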

Thanks again to @Nick for pointing me to the model.py source file.

I'd love to hear if there's a simpler way to get at MR user params...

Upvotes: 3

Nick Johnson

Reputation: 101149

Context is only available to mappers/reducers - it's largely concerned with things that don't make sense outside the context of one. As you can see from the source, however, the "Mapreduce-Id" header is set, from which you can get the ID of the mapreduce job.
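
As a rough sketch of reading that header, assuming the request headers behave like a dict (the helper name get_mapreduce_id and the example ID are hypothetical, not part of the library):

```python
def get_mapreduce_id(headers):
    """Return the job ID from the 'Mapreduce-Id' header, or None if absent."""
    return headers.get('Mapreduce-Id')


# Hypothetical headers, standing in for self.request.headers in a handler.
example_headers = {'Mapreduce-Id': 'example-job-id'}

job_id = get_mapreduce_id(example_headers)
no_id = get_mapreduce_id({})
```

In a done_callback handler you would pass self.request.headers and bail out early if the result is None.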

You shouldn't have to do your own cleanup, though - mapreduce has a handler that does it for you.

Upvotes: 1
