Dylan

Reputation: 913

PySpark reduceByKey error associated with flatMap lambda function

I am writing a wordCount program that reads data from a MySQL database. My data looks like this:

rawText = sqlContext.read.format("jdbc").options(url=jdbcUrl, dbtable="KeyWordFed").load()
rawText.take(5)

[Row(id=1, text='RT @GretaLWall: #BREAKING: President Trump picks Jerome Powell to be next Chair of the Federal Reserve', created=datetime.datetime(2017, 11, 1, 21, 56, 59), id_str='925844141896011776', retweet_count=0, polarity=0.0, subjectivity=0.0), Row(id=2,.....]

I only want to take the text part and do some cleaning on it, so I use:

import re  # punc is defined elsewhere as a string of punctuation characters to strip

def clean_text(x):
    # pull the text=... field out of the Row's string representation
    text = re.search(r"text='(.+)', created=", str(x)).group(1)
    clean_str = text.translate(str.maketrans('', '', punc))  # strip punctuation
    return clean_str

The first line extracts the text part; the second line strips off punctuation.
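
For example, applied to the string form of the first row shown above (a standalone check; the datetime is elided here):

import re

row_str = ("Row(id=1, text='RT @GretaLWall: #BREAKING: President Trump picks Jerome "
           "Powell to be next Chair of the Federal Reserve', created=datetime.datetime(...))")
re.search(r"text='(.+)', created=", row_str).group(1)
# 'RT @GretaLWall: #BREAKING: President Trump picks Jerome Powell to be next Chair of the Federal Reserve'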

one_RDD = rawText.flatMap(lambda x: clean_text(x).split()).map(lambda y: (y,1))
one_RDD.take(30)

I got the following results:

[('RT', 1), ('@GretaLWall', 1), ('#BREAKING', 1), ('President', 1), ('Trump', 1), ('picks', 1), ('Jerome', 1), ('Powell', 1), ('to', 1), ('be', 1), ('next', 1), ('Chair', 1), ('of', 1), ('the', 1), ('Federal', 1), ('Reserve', 1), ('#Trump', 1), ('nomina', 1), ('Jerome', 1), ('Powell', 1), ('presidente', 1), ('della', 1), ('Federal', 1), ('Reserve', 1), ('#Trump', 1), ('#nomina', 1), ('#Jerome', 1), ('#Powell', 1), ('#presidente', 1), ('httpstco1ZUIZfgOFj', 1)]

Everything works perfectly until this point.

But when I try to aggregate all the words:

one_RDD = one_RDD.reduceByKey(lambda a,b: a + b)
one_RDD.take(5)

I get an error. The message is too long to paste in full, but it basically says:

File "<ipython-input-113-d273e318b1c5>", line 1, in <lambda>
  File "<ipython-input-85-c8d7f3db6341>", line 2, in clean_text
AttributeError: 'NoneType' object has no attribute 'group'

Additional information:

I hit this error before at the .map(lambda y: (y,1)) step. I was using lambda x: (x,1) when I saw the error; changing x to y seemed to solve the problem, but I don't see why.

Upvotes: 0

Views: 803

Answers (1)

MaFF

Reputation: 10086

At least one of the rows in your RDD doesn't match the regular expression you are searching for. Note that take(30) only evaluates as many rows as it needs to return 30 results, which is why the bad row went unnoticed until reduceByKey forced a pass over the entire dataset. You can find the offending rows using:

rawText.filter(lambda x: re.search(r"text='(.+)', created=", str(x)) is None).take(5)

Note that the error comes from Python, not Spark: the logic in clean_text doesn't handle the case where the search finds no match:

import re
from string import punctuation as punc

def clean_text(x):
    try:
        text = re.search(r"text='(.+)', created=", str(x)).group(1)
        clean_str = text.translate(str.maketrans('', '', punc))
        return clean_str
    except AttributeError:
        # re.search returned None: this row has no text=... field
        return ""

With the fix in place, a quick check on a two-element sample (the second element has no text field):

rawText = sc.parallelize(["Row(id=1, text='RT @GretaLWall: #BREAKING: President Trump picks Jerome Powell to be next Chair of the Federal Reserve', created=datetime.datetime(2017, 11, 1, 21, 56, 59), id_str='925844141896011776', retweet_count=0, polarity=0.0, subjectivity=0.0)",
                          "Row(id=1, created=datetime.datetime(2017, 11, 1, 21, 56, 59), id_str='925844141896011776', retweet_count=0, polarity=0.0, subjectivity=0.0)"])
one_RDD = rawText.flatMap(lambda x: clean_text(x).split()).map(lambda y: (y,1))
one_RDD.take(30)

    [('RT', 1),
     ('GretaLWall', 1),
     ('BREAKING', 1),
     ('President', 1),
     ('Trump', 1),
     ('picks', 1),
     ('Jerome', 1),
     ('Powell', 1),
     ('to', 1),
     ('be', 1),
     ('next', 1),
     ('Chair', 1),
     ('of', 1),
     ('the', 1),
     ('Federal', 1),
     ('Reserve', 1)]

I suggest filtering out these rows instead, as raising an exception for every bad row can slow down the computation.
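
A minimal sketch of that filtering approach, reusing the same pattern (this assumes re is imported on the driver):

# keep only rows whose string form actually contains a text=... field,
# then run the same pipeline as before
valid = rawText.filter(lambda x: re.search(r"text='(.+)', created=", str(x)) is not None)
one_RDD = (valid.flatMap(lambda x: clean_text(x).split())
                .map(lambda y: (y, 1))
                .reduceByKey(lambda a, b: a + b))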

Upvotes: 2
