Ajar

Reputation: 1826

PySpark: filtering out RDD elements fails on 'NoneType' object is not iterable

I want to filter out elements of an RDD where the field 'status' is not equal to 'OK'. I create my RDD from a set of CSV files on HDFS, then use map to get the structure I want before trying to filter:

import csv, StringIO

files = "/hdfs_path/*.csv"
fields = ["time", "status"]
dial = "excel"
default = {'status': 'OK', 'time': '2014-01-01  00:00:00'}

def loadRecord(line, fieldnames, dialect):
    input = StringIO.StringIO(line)
    reader = csv.DictReader(input, fieldnames = fieldnames, dialect = dialect)
    try:
        line = reader.next()
        if line is None:
            return default
        else:
            return line
    except:
        return default

harmonics = sc.textFile(files) \
              .map(lambda x: loadRecord(x, fields, dial)) \
              .filter(lambda x: "OK" not in x['status'])

I can do other things to this RDD -- e.g. another map to get only certain fields, etc. However, when I run my code with the filter, one of the tasks always fails with an exception in my filter lambda function:

'NoneType object is not iterable'

I thought this meant that the filter lambda was receiving a None, so I added code to loadRecord to avoid returning None. However, I still get the same error. It does work on a small sample dataset, but my actual data is large enough that I'm not sure how to detect which part(s) of it may be causing the problem.

Any input appreciated!

Upvotes: 2

Views: 11820

Answers (2)

Ajar

Reputation: 1826

Using 0x0FFF's answer as a basis, I was able to get my code to run. I still haven't seen the offending line of the offending file, but I'm a lot closer than I was. Here's what I did, starting with the code in my question:

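def checkNone(x):
    # Keep records whose status does not contain "OK".
    try:
        return "OK" not in x['status']
    except:
        # The test itself failed (e.g. x or x['status'] is None): keep the record.
        return True

harmonics = sc.textFile(files) \
              .map(lambda x: loadRecord(x, fields, dial)) \
              .filter(checkNone)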
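
One possible explanation for why the try/except helps, offered as an assumption and not confirmed against the real data: csv.DictReader fills fields missing from a short row with None (its default restval), so loadRecord can return a dict whose 'status' is None even though the dict itself is not None. The sketch below uses Python 3's io.StringIO and next(reader) in place of the Python 2 calls in the question, the parse helper is hypothetical, and the sample line is made up.

```python
import csv
import io

fields = ["time", "status"]

def parse(line):
    # Same idea as loadRecord, minus the fallback to a default record.
    reader = csv.DictReader(io.StringIO(line), fieldnames=fields, dialect="excel")
    return next(reader)

def checkNone(x):
    try:
        return "OK" not in x["status"]
    except Exception:
        return True

# Hypothetical line that has a time but no status column.
short_row = parse("2014-01-01 00:00:00")
print(short_row["status"])  # the missing field is filled with restval (None)

# The unguarded membership test from the question fails on such a record.
try:
    "OK" not in short_row["status"]
    error = None
except TypeError as exc:
    error = str(exc)
print(error)

# The guarded version keeps the record instead of failing the task.
print(checkNone(short_row))
```

If this is what happens in the real data, the records that pass through the except branch are the ones worth inspecting.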

Upvotes: 0

0x0FFF

Reputation: 5018

First, replace map(lambda x: loadRecord(x, fields, dial)) with map(lambda x: (x, loadRecord(x, fields, dial))). This way you keep both the original record and the parsed one.

Second, replace the filter() call with flatMap(test_function), and define test_function so that it inspects its input and, if the second element of the pair (the parsed record) is None, returns the first element (the original line).

This gives you the input lines that cause the problem, so you can test your script on them locally. In general, I would also add the line global default as the first line of your loadRecord function.

Upvotes: 3
