Reputation: 434
I have a method that converts a string to a specific type. The data is read from a CSV file and used to create an RDD. To get this function to work I had to put the import statements inside the function definition, which means those lines are executed on every call to the function. The imported packages must exist on the cluster nodes or the function would not work. Is it possible to move the imports out of the method and still have them referenced? If so, how?
def convertType(v, ftype, fmt = '%Y-%m-%d %H:%M:%S %Z', nodate = '1900-01-01 0:00:00 GMT', empty2None = False):
    import datetime
    import decimal
    v = v.strip()  # clean up the string
    if empty2None:
        if v == "":  # do we have an empty string?
            return None
    ftypes = { 'null':      'None',
               'date':      'datetime.date(int(v))',
               'timestamp': 'datetime.datetime.strptime(v if not (v == "") else nodate, fmt)',
               'binary':    'bytearray(v)',
               'integer':   'int(v)',
               'boolean':   'bool(v)',
               'long':      'long(v)',
               'double':    'float(v)',
               'float':     'float(v)',
               'short':     'int(v)',
               'byte':      'int(v)',
               'string':    'str(v)',
               'decimal':   'decimal.Decimal(v)'
             }
    return eval(ftypes[ftype.lower()])
data = raw.map(lambda p: [convertType(p[0][i], typeparts[i], fmt, nodate, True) for i in indx]) # convert split text to data rows
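For reference, a trimmed, self-contained version of the same eval-based dispatch (assuming Python 3, so the `long` branch is dropped, and a simplified timestamp format without `%Z`) behaves like this:

```python
import datetime
import decimal

def convertType(v, ftype, fmt='%Y-%m-%d %H:%M:%S', empty2None=False):
    v = v.strip()  # clean up the string
    if empty2None and v == "":
        return None
    # each value is a Python expression evaluated with v and fmt in scope
    ftypes = {
        'null':      'None',
        'timestamp': 'datetime.datetime.strptime(v, fmt)',
        'integer':   'int(v)',
        'double':    'float(v)',
        'string':    'str(v)',
        'decimal':   'decimal.Decimal(v)',
    }
    return eval(ftypes[ftype.lower()])

print(convertType(" 42 ", "integer"))   # 42
print(convertType("", "string", empty2None=True))  # None
```

Note that `eval` here resolves `datetime` and `decimal` from whatever scope the string is evaluated in, which is exactly why the imports have to be visible to the function when it runs on a worker.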
Upvotes: 0
Views: 516
Reputation: 470
To minimize import overhead you could try using mapPartitions. Then, you would import once per partition (not for each row).
def convertPartitionType(elements):
    import ...
    return [convertType(x) for x in elements]
Where, of course, convertType itself no longer imports anything.
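Concretely, the pattern might look like the sketch below. It is plain Python over an iterator of rows, so it runs without a cluster; the type names and the `typeparts` variable are assumptions carried over from the question, and the conversion table is simplified to direct callables instead of eval strings:

```python
def convertPartition(rows, ftypes, fmt='%Y-%m-%d %H:%M:%S'):
    # these imports execute once per partition, not once per row
    import datetime
    import decimal
    # map each type name to a converter callable, built once per partition
    converters = {
        'integer':   int,
        'short':     int,
        'double':    float,
        'float':     float,
        'string':    str,
        'decimal':   decimal.Decimal,
        'timestamp': lambda s: datetime.datetime.strptime(s, fmt),
    }
    # yielding keeps per-partition memory use low
    for row in rows:
        yield [converters[t.lower()](v.strip()) for v, t in zip(row, ftypes)]

# with Spark you would then write something like (names hypothetical):
# data = raw.mapPartitions(lambda rows: convertPartition(rows, typeparts))
```

`mapPartitions` hands the function an iterator over all elements of one partition, so any per-call setup cost (imports, building the lookup table) is paid once per partition instead of once per element.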
Upvotes: 1