dmbaker

Reputation: 434

RDD.map and a function with nested imports

I have a method that converts a string to a specific type. The data is read from a CSV file and used to create an RDD. To get this function to work, I had to put the import statements inside the function definition, which means they are executed on every call. The imported packages must also exist on the cluster nodes or the function would not work. Is it possible to move the imports out of the method and still have them referenced? If so, how?

def convertType(v, ftype, fmt='%Y-%m-%d %H:%M:%S %Z', nodate='1900-01-01 00:00:00 GMT', empty2None=False):
    import datetime
    import decimal
    v = v.strip()  # clean up the string
    if empty2None:
        if v == "":  # do we have an empty string?
            return None
    # map each type name to an expression that is evaluated against v
    ftypes = {
        'null': 'None',
        'date': 'datetime.datetime.strptime(v, "%Y-%m-%d").date()',  # assumes date-only strings
        'timestamp': 'datetime.datetime.strptime(v if not (v == "") else nodate, fmt)',
        'binary': 'bytearray(v)',
        'integer': 'int(v)',
        'boolean': 'bool(v)',
        'long': 'long(v)',
        'double': 'float(v)',
        'float': 'float(v)',
        'short': 'int(v)',
        'byte': 'int(v)',
        'string': 'str(v)',
        'decimal': 'decimal.Decimal(v)'
    }
    return eval(ftypes[ftype.lower()])

data = raw.map(lambda p: [convertType(p[0][i], typeparts[i], fmt, nodate, True) for i in indx]) # convert split text to data rows

Upvotes: 0

Views: 516

Answers (1)

Wojciech Jurczyk

Reputation: 470

To minimize import overhead, you could try using mapPartitions. That way, the imports run once per partition instead of once per row.

def convertPartitionType(elements):
    import ...  # runs once per partition, not once per element
    return [convertType(x) for x in elements]

Here, of course, convertType itself does not import anything.
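Applied to the question's pipeline, the wiring might look like the sketch below (convertPartition is a hypothetical name; raw, typeparts, fmt, nodate, and indx are the names from the question):

def convertPartition(rows):
    import datetime  # executed once per partition, not once per row
    import decimal
    return [[convertType(p[0][i], typeparts[i], fmt, nodate, True) for i in indx]
            for p in rows]

data = raw.mapPartitions(convertPartition)  # convert split text to data rows

One caveat: a plain import inside convertPartition only binds the module as a local name, so convertType cannot see it directly; the practical gain is that the first import puts the module into sys.modules, which makes any import statement left inside convertType a cheap cache lookup on later calls.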

Upvotes: 1
