Reputation: 1826
I want to filter out elements of an RDD where the field 'status' is not equal to 'OK'. I create my RDD from a set of CSV files on HDFS, then use map to get the structure I want before trying to filter:
import csv, StringIO

files = "/hdfs_path/*.csv"
fields = ["time", "status"]
dial = "excel"
default = {'status': 'OK', 'time': '2014-01-01 00:00:00'}

def loadRecord(line, fieldnames, dialect):
    input = StringIO.StringIO(line)
    reader = csv.DictReader(input, fieldnames = fieldnames, dialect = dialect)
    try:
        line = reader.next()
        if line is None:
            return default
        else:
            return line
    except:
        return default

harmonics = sc.textFile(files) \
    .map(lambda x: loadRecord(x, fields, dial)) \
    .filter(lambda x: "OK" not in x['status'])
I can do other things to this RDD, e.g. another map to get only certain fields. However, when I run my code with the filter, one of the tasks always fails with an exception in my filter lambda function:
'NoneType object is not iterable'
I thought this meant that the filter lambda was receiving a None, so I added code to loadRecord to avoid returning None. However, I still get the same error. It does work on a small sample dataset, but my actual data is large enough that I'm not sure how to detect which part(s) of it may be causing the problem.
Any input appreciated!
Upvotes: 2
Views: 11820
Reputation: 1826
Using 0x0FFF's answer as a basis, I was able to get my code to run. I still haven't seen the offending line of the offending file, but I'm a lot closer than I was. Here's what I did, starting with the code in my question:
def checkNone(x):
    try:
        return "OK" not in x['status']
    except:
        return True

harmonics = sc.textFile(files) \
    .map(lambda x: loadRecord(x, fields, dial)) \
    .filter(lambda x: checkNone(x))
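One possible explanation for the error, offered as an assumption since the offending line was never found: csv.DictReader fills any column missing from a short row with None (its default restval), so a line with only one field parses to a record whose 'status' is None, and "OK" not in None raises a TypeError. The sketch below reproduces this locally without Spark. It uses Python 3 spellings (io.StringIO, next(reader)) instead of the Python 2 ones in the question, and load_record and status_not_ok are illustrative names that do not appear in the original code.

```python
import csv
import io

fields = ["time", "status"]


def load_record(line, fieldnames, dialect="excel"):
    # Python 3 equivalent of the question's loadRecord, minus the fallback.
    reader = csv.DictReader(io.StringIO(line), fieldnames=fieldnames, dialect=dialect)
    return next(reader)


# A row with fewer fields than fieldnames: the missing 'status' becomes None.
short_row = load_record("2014-01-01 00:00:00", fields)

# Applying the original filter predicate to such a row fails.
try:
    "OK" not in short_row["status"]
    raised_type_error = False
except TypeError:
    raised_type_error = True


def status_not_ok(record):
    # Hypothetical replacement for checkNone: treat a missing record or a
    # missing status as "not OK" explicitly instead of catching every exception.
    if record is None:
        return True
    status = record.get("status")
    return status is None or "OK" not in status
```

If this is indeed the cause, status_not_ok could be passed to filter in place of checkNone; it keeps the same outcome for records with a None status while letting unrelated errors surface.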
Upvotes: 0
Reputation: 5018
First, replace map(lambda x: loadRecord(x, fields, dial)) with map(lambda x: (x, loadRecord(x, fields, dial))). This way you keep both the original record and the parsed one.
Second, replace the filter() call with flatMap(test_function), and define test_function so that it tests its input: if the second element of the pair (the parsed record) is None, it should return the first element (the original line).
This way you would get the input lines causing your problem and could test your script on them locally. In general, I would also add the line global default as the first line of your loadRecord function.
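The two steps above might look roughly like the following sketch. The name find_bad_lines stands in for test_function, and the extra check for a None 'status' value is an assumption added here, since csv.DictReader can return a record whose missing fields are None. Because flatMap expects an iterable, the helper returns a list rather than the bare line. The Spark calls are shown only in comments so the helper can be tried locally on plain Python pairs.

```python
def find_bad_lines(pair):
    """Return [raw_line] if the parsed record is unusable, else an empty list."""
    raw_line, record = pair
    # Assumption: a record counts as bad if it is None or its 'status' is None.
    if record is None or record.get("status") is None:
        return [raw_line]
    return []


# Intended use with the RDD from the question (not executed here):
# bad_lines = sc.textFile(files) \
#     .map(lambda x: (x, loadRecord(x, fields, dial))) \
#     .flatMap(find_bad_lines) \
#     .take(20)

# Local stand-in for flatMap on a few hand-made (line, record) pairs.
sample_pairs = [
    ("2014-01-01 00:00:00,OK", {"time": "2014-01-01 00:00:00", "status": "OK"}),
    ("2014-01-01 00:00:01", {"time": "2014-01-01 00:00:01", "status": None}),
    ("garbage", None),
]
bad_lines = [line for pair in sample_pairs for line in find_bad_lines(pair)]
```

Collecting a small number of offending lines with take keeps the result manageable on a large dataset, and those lines can then be fed to loadRecord locally.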
Upvotes: 3