Reputation: 913
I am writing a wordCount program reading data from MySQL database, my data looks like this:
rawText = sqlContext.read.format("jdbc").options(url=jdbcUrl, dbtable = "KeyWordFed").load()
rawText.take(5)
[Row(id=1, text='RT @GretaLWall: #BREAKING: President Trump picks Jerome Powell to be next Chair of the Federal Reserve', created=datetime.datetime(2017, 11, 1, 21, 56, 59), id_str='925844141896011776', retweet_count=0, polarity=0.0, subjectivity=0.0), Row(id=2,.....]
I only want to take the text part and do some cleaning to it, so I use:
def clean_text(x):
text = re.search(r"text='(.+)', created=", str(x)).group(1)
clean_str = text.translate(str.maketrans('','',punc))
return clean_str
The first row is to take out the text part, second row is to strip off punctuation.
one_RDD = rawText.flatMap(lambda x: clean_text(x).split()).map(lambda y: (y,1))
one_RDD.take(30)
I got results:
[('RT', 1), ('@GretaLWall', 1), ('#BREAKING', 1), ('President', 1), ('Trump', 1), ('picks', 1), ('Jerome', 1), ('Powell', 1), ('to', 1), ('be', 1), ('next', 1), ('Chair', 1), ('of', 1), ('the', 1), ('Federal', 1), ('Reserve', 1), ('#Trump', 1), ('nomina', 1), ('Jerome', 1), ('Powell', 1), ('presidente', 1), ('della', 1), ('Federal', 1), ('Reserve', 1), ('#Trump', 1), ('#nomina', 1), ('#Jerome', 1), ('#Powell', 1), ('#presidente', 1), ('httpstco1ZUIZfgOFj', 1)]
Everything works perfectly until this point.
But When I try to aggregate all the words:
one_RDD = one_RDD.reduceByKey(lambda a,b: a + b)
one_RDD.take(5)
I met some error, the error message was too long. But basically it says:
File "<ipython-input-113-d273e318b1c5>", line 1, in <lambda>
File "<ipython-input-85-c8d7f3db6341>", line 2, in clean_text
AttributeError: 'NoneType' object has no attribute 'group'
Additional information:
I met this error before when I try the .map(lambda y: (y,1)) step. I was using lambda x: (x,1) when I see the error, then I changed to y, it solved the problem but I don't see why.
Upvotes: 0
Views: 803
Reputation: 10086
One of the rows in your RDD
doesn't contain the regex expression you are searching for. You can check this using:
rawText.filter(lambda x: re.search(r"text='(.+)', created=", str(x))).take(5)
Note that the error is Python based and not Spark. The logic in clean_text
doesn't deal with exceptions:
import re
from string import punctuation as punc
def clean_text(x):
try :
text = re.search(r"text='(.+)', created=", str(x)).group(1)
clean_str = text.translate(str.maketrans('','',punc))
return clean_str
except:
return ""
rawText=sc.parallelize(["Row(id=1, text='RT @GretaLWall: #BREAKING: President Trump picks Jerome Powell to be next Chair of the Federal Reserve', created=datetime.datetime(2017, 11, 1, 21, 56, 59), id_str='925844141896011776', retweet_count=0, polarity=0.0, subjectivity=0.0)",
"Row(id=1, created=datetime.datetime(2017, 11, 1, 21, 56, 59), id_str='925844141896011776', retweet_count=0, polarity=0.0, subjectivity=0.0)"])
one_RDD = rawText.flatMap(lambda x: clean_text(x).split()).map(lambda y: (y,1))
one_RDD.take(30)
[('RT', 1),
('GretaLWall', 1),
('BREAKING', 1),
('President', 1),
('Trump', 1),
('picks', 1),
('Jerome', 1),
('Powell', 1),
('to', 1),
('be', 1),
('next', 1),
('Chair', 1),
('of', 1),
('the', 1),
('Federal', 1),
('Reserve', 1)]
I suggest filtering these lines as raising exceptions can cause slow computations
Upvotes: 2