Reputation: 159
I am currently reading all of the text files in a public AWS S3 bucket that contains hundreds of CSV files. I read all of the CSV files at once, turn them into an RDD, and start massaging the data so that it can be stored in Cassandra. Processing all of the text files takes over two and a half hours, which is too long for just 100 GB of data. Is there anything I can do to my code below to make it faster?
I appreciate any suggestions. I've also tried reading https://robertovitillo.com/2015/06/30/spark-best-practices/, but I'm confused about how to implement some of the things mentioned, like "using the right level of parallelism" (a rough sketch of what that can mean follows my code below). I also tried caching the RDD with rdd.cache(), but that still took over two hours.
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from operator import itemgetter
from abbreviations_dict import tofullname, toevent  # country-code and event-code lookup dicts

# SPARK_IP and SPARK_PORT are assumed to be defined elsewhere (cluster configuration).
conf = SparkConf() \
    .setMaster("spark://%s:%s" % (SPARK_IP, SPARK_PORT))
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

# Read every GDELT events file in the public bucket as raw text.
rdd = sc.textFile("s3a://gdelt-open-data/events/*")

def rddCleaning(rd, timeframe):

    def check_valid(tup):
        # Keep only rows whose numeric columns actually parse.
        try:
            int(tup[1])
            int(tup[4])
            float(tup[5])
            float(tup[6])
            return True
        except ValueError:
            return False

    def fillin(tup):
        # Fall back to Actor2Type1Code when Actor1Type1Code is empty.
        if tup[2] == "" and tup[3] != "":
            return (tup[0], tup[1], tup[3], tup[4], tup[5], tup[6])
        else:
            return (tup[0], tup[1], tup[2], tup[4], tup[5], tup[6])

    def popular_avg(curr_tup):
        # Keep the per-event averages only for the event types still in the list.
        lst = curr_tup[1]
        dictionary = curr_tup[2]
        dict_matches = {}
        for tup in lst:
            event_type = tup[0]
            dict_matches[event_type] = dictionary[event_type]
        return (curr_tup[0], lst, dict_matches, curr_tup[3])

    def merge_info(tup):
        # Merge the averages into the article-count dictionary and add the total.
        main_dict = tup[1]
        info_dict = tup[2]
        for key in info_dict:
            main_dict[key].update(info_dict[key])
        main_dict["TotalArticles"] = {"total": tup[3]}
        return (tup[0], main_dict)

    def event_todict(tup):
        # Turn the list of (event type, mentions) pairs into a dictionary.
        lst = tup[1]
        dict_matches = {}
        for event_tup in lst:
            dict_matches[event_tup[0]] = {"ArticleMentions": event_tup[1]}
        return (tup[0], dict_matches, tup[2], tup[3])

    def sum_allevents(tup):
        # Total article mentions across all event types for this key.
        type_lst = tup[1]
        total_mentions = 0
        for event in type_lst:
            total_mentions += event[1]
        return (tup[0], type_lst, tup[2], total_mentions)

    # Column positions in the tab-delimited GDELT events file.
    actionGeo_CountryCode = 51
    time = 0
    actor1Type1Code = 12
    actor2Type1Code = 22
    numArticles = 33
    goldsteinScale = 30
    avgTone = 34
    if timeframe == "SQLDATE":
        time = 1
    elif timeframe == "MonthYear":
        time = 2
    else:
        time = 3

    rdd_reduce = rd.map(lambda x: x.split('\t')) \
                   .map(lambda y: (y[actionGeo_CountryCode],
                                   y[time],
                                   y[actor1Type1Code],
                                   y[actor2Type1Code],
                                   y[numArticles],
                                   y[goldsteinScale],
                                   y[avgTone])) \
                   .filter(check_valid) \
                   .map(lambda c: (c[0], int(c[1]), c[2], c[3], int(c[4]), int(float(c[5])), int(float(c[6])))) \
                   .map(fillin) \
                   .filter(lambda r: r[0] in tofullname and r[2] in toevent and r[2] != "" and r[0] != "") \
                   .map(lambda t: (tofullname[t[0]], t[1], toevent[t[2]], t[3], t[4], t[5])) \
                   .map(lambda f: ((f[0], f[1], f[2]), (f[3], f[4], f[5], 1))) \
                   .reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1], a[2]+b[2], a[3]+b[3])) \
                   .map(lambda s: (s[0], (s[1][0], s[1][1]/s[1][3], s[1][2]/s[1][3])))
    rdd_format = rdd_reduce.map(lambda t: ((t[0][0], t[0][1]),
                                           ([(t[0][2], t[1][0])],
                                            [(t[0][2], {"GoldsteinScaleAvg": t[1][1],
                                                        "ToneAvg": t[1][2]})]))) \
                           .reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1])) \
                           .map(lambda v: (v[0],
                                           sorted(v[1][0], key=itemgetter(1), reverse=True),
                                           v[1][1])) \
                           .map(sum_allevents) \
                           .map(lambda f: (f[0], f[1][:5], dict(f[2]), f[3])) \
                           .map(popular_avg) \
                           .map(event_todict) \
                           .map(merge_info) \
                           .map(lambda d: (d[0][0], d[0][1], d[1]))
    return rdd_format

daily_rdd = rddCleaning(rdd, "SQLDATE")
print(daily_rdd.take(6))
monthly_rdd = rddCleaning(rdd, "MonthYear")
print(monthly_rdd.take(6))
yearly_rdd = rddCleaning(rdd, "Year")
print(yearly_rdd.take(6))
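As an aside on the "right level of parallelism" point mentioned above, this is a rough sketch of the knobs it usually refers to; the partition count of 200 is purely illustrative and would need tuning to the cluster, not a recommendation:

# Sketch only: 200 is an illustrative partition count.
# textFile() accepts a minimum number of input partitions up front.
rdd = sc.textFile("s3a://gdelt-open-data/events/*", minPartitions=200)
print(rdd.getNumPartitions())      # check how many partitions were actually created

# repartition() reshuffles an existing RDD that arrived with too few partitions;
# a common starting point is 2-4 partitions per CPU core in the cluster.
rdd = rdd.repartition(200)

# cache() only pays off if the RDD is reused by several jobs, and it is lazy:
# the data is kept in memory only after the first action runs on it.
rdd.cache()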
Here is a picture of my PySpark job running:
Edit made after suggestions: I made the following changes to my code and performance improved, but it is still taking a long time. Is this happening because every time I use df it reads all of the files from my S3 bucket all over again? Should I put some of my DataFrames and temporary tables in cache (see the caching sketch after the code below)? Here is my code:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import udf
from pyspark.sql.functions import col
from pyspark.sql.types import StringType, DoubleType, IntegerType
from abbreviations_dict import tofullname, toevent
from operator import itemgetter
import pyspark_cassandra
sc = SparkContext()
sqlContext = SQLContext(sc)
customSchema = StructType([
StructField('GLOBALEVENTID',StringType(),True),
StructField('SQLDATE',StringType(),True),
StructField('MonthYear',StringType(),True),
StructField('Year',StringType(),True),
StructField('FractionDate',StringType(),True),
StructField('Actor1Code',StringType(),True),
StructField('Actor1Name',StringType(),True),
StructField('Actor1CountryCode',StringType(),True),
StructField('Actor1KnownGroupCode',StringType(),True),
StructField('Actor1EthnicCode',StringType(),True),
StructField('Actor1Religion1Code',StringType(),True),
StructField('Actor1Religion2Code',StringType(),True),
StructField('Actor1Type1Code',StringType(),True),
StructField('Actor1Type2Code',StringType(),True),
StructField('Actor1Type3Code',StringType(),True),
StructField('Actor2Code',StringType(),True),
StructField('Actor2Name',StringType(),True),
StructField('Actor2CountryCode',StringType(),True),
StructField('Actor2KnownGroupCode',StringType(),True),
StructField('Actor2EthnicCode',StringType(),True),
StructField('Actor2Religion1Code',StringType(),True),
StructField('Actor2Religion2Code',StringType(),True),
StructField('Actor2Type1Code',StringType(),True),
StructField('Actor2Type2Code',StringType(),True),
StructField('Actor2Type3Code',StringType(),True),
StructField('IsRootEvent',StringType(),True),
StructField('EventCode',StringType(),True),
StructField('EventBaseCode',StringType(),True),
StructField('EventRootCode',StringType(),True),
StructField('QuadClass',StringType(),True),
StructField('GoldsteinScale',StringType(),True),
StructField('NumMentions',StringType(),True),
StructField('NumSources',StringType(),True),
StructField('NumArticles',StringType(),True),
StructField('AvgTone',StringType(),True),
StructField('Actor1Geo_Type',StringType(),True),
StructField('Actor1Geo_FullName',StringType(),True),
StructField('Actor1Geo_CountryCode',StringType(),True),
StructField('Actor1Geo_ADM1Code',StringType(),True),
StructField('Actor1Geo_Lat',StringType(),True),
StructField('Actor1Geo_Long',StringType(),True),
StructField('Actor1Geo_FeatureID',StringType(),True),
StructField('Actor2Geo_Type',StringType(),True),
StructField('Actor2Geo_FullName',StringType(),True),
StructField('Actor2Geo_CountryCode',StringType(),True),
StructField('Actor2Geo_ADM1Code',StringType(),True),
StructField('Actor2Geo_Lat',StringType(),True),
StructField('Actor2Geo_Long',StringType(),True),
StructField('Actor2Geo_FeatureID',StringType(),True),
StructField('ActionGeo_Type',StringType(),True),
StructField('ActionGeo_FullName',StringType(),True),
StructField('ActionGeo_CountryCode',StringType(),True),
StructField('ActionGeo_ADM1Code',StringType(),True),
StructField('ActionGeo_Lat',StringType(),True),
StructField('ActionGeo_Long',StringType(),True),
StructField('ActionGeo_FeatureID',StringType(),True),
StructField('DATEADDED',StringType(),True),
StructField('SOURCEURL',StringType(),True)])
df = sqlContext.read \
    .format('com.databricks.spark.csv') \
    .options(header='false') \
    .options(delimiter="\t") \
    .load('s3a://gdelt-open-data/events/*', schema=customSchema)
def modify_values(r, y):
    # Use the second code when the first is empty.
    if r == '' and y != '':
        return y
    else:
        return r

def country_exists(r):
    # Map the country code to its full name, or blank it out if unknown.
    if r in tofullname:
        return tofullname[r]
    else:
        return ''

def event_exists(r):
    # Map the event type code to its description, or blank it out if unknown.
    if r in toevent:
        return toevent[r]
    else:
        return ''

modify_val = udf(modify_values, StringType())
c_exists = udf(country_exists, StringType())
e_exists = udf(event_exists, StringType())
dfsub1 = df.withColumn("Actor1Type1Code", modify_val(col("Actor1Type1Code"), col("Actor2Type1Code"))) \
           .withColumn("ActionGeo_CountryCode", c_exists(col("ActionGeo_CountryCode"))) \
           .withColumn("Actor1Type1Code", e_exists(col("Actor1Type1Code")))
sqlContext.registerDataFrameAsTable(dfsub1, 'temp')
df2 = sqlContext.sql("""SELECT ActionGeo_CountryCode,
SQLDATE, MonthYear, Year,
Actor1Type1Code,
NumArticles,
GoldsteinScale,
AvgTone
FROM temp
WHERE ActionGeo_CountryCode <> ''
AND Actor1Type1Code <> ''
AND NumArticles <> ''
AND GoldsteinScale <> ''
AND AvgTone <> ''""")
sqlContext.registerDataFrameAsTable(df2, 'temp2')
df3 = sqlContext.sql("""SELECT ActionGeo_CountryCode,
CAST(SQLDATE AS INTEGER), CAST(MonthYear AS INTEGER), CAST(Year AS INTEGER),
Actor1Type1Code,
CAST(NumArticles AS INTEGER),
CAST(GoldsteinScale AS INTEGER),
CAST(AvgTone AS INTEGER)
FROM temp2""")
sqlContext.registerDataFrameAsTable(df3, 'temp3')
sqlContext.cacheTable('temp3')
dfdaily = sqlContext.sql("""SELECT ActionGeo_CountryCode,
SQLDATE,
Actor1Type1Code,
SUM(NumArticles) AS NumArticles,
ROUND(AVG(GoldsteinScale),0) AS GoldsteinScale,
ROUND(AVG(AvgTone),0) AS AvgTone
FROM temp3
GROUP BY ActionGeo_CountryCode,
SQLDATE,
Actor1Type1Code""")
dfmonthly = sqlContext.sql("""SELECT ActionGeo_CountryCode,
MonthYear,
Actor1Type1Code,
SUM(NumArticles) AS NumArticles,
ROUND(AVG(GoldsteinScale),0) AS GoldsteinScale,
ROUND(AVG(AvgTone),0) as AvgTone
FROM temp3
GROUP BY ActionGeo_CountryCode,
MonthYear,
Actor1Type1Code""")
dfyearly = sqlContext.sql("""SELECT ActionGeo_CountryCode,
Year,
Actor1Type1Code,
SUM(NumArticles) AS NumArticles,
ROUND(AVG(GoldsteinScale),0) AS GoldsteinScale,
ROUND(AVG(AvgTone),0) as AvgTone
FROM temp3
GROUP BY ActionGeo_CountryCode,
Year,
Actor1Type1Code""")
def rddCleaning(rd, timeframe):

    def popular_avg(curr_tup):
        lst = curr_tup[1]
        dictionary = curr_tup[2]
        dict_matches = {}
        for tup in lst:
            event_type = tup[0]
            dict_matches[event_type] = dictionary[event_type]
        return (curr_tup[0], lst, dict_matches, curr_tup[3])

    def merge_info(tup):
        main_dict = tup[1]
        info_dict = tup[2]
        for key in info_dict:
            main_dict[key].update(info_dict[key])
        main_dict["TotalArticles"] = {"total": tup[3]}
        return (tup[0], main_dict)

    def event_todict(tup):
        lst = tup[1]
        dict_matches = {}
        for event_tup in lst:
            dict_matches[event_tup[0]] = {"ArticleMentions": event_tup[1]}
        return (tup[0], dict_matches, tup[2], tup[3])

    def sum_allevents(tup):
        type_lst = tup[1]
        total_mentions = 0
        for event in type_lst:
            total_mentions += event[1]
        return (tup[0], type_lst, tup[2], total_mentions)

    rdd_format = rd.map(lambda y: ((y["ActionGeo_CountryCode"], y[timeframe]),
                                   ([(y["Actor1Type1Code"], y["NumArticles"])],
                                    [(y["Actor1Type1Code"], {"Goldstein": y["GoldsteinScale"],
                                                             "ToneAvg": y["AvgTone"]})]))) \
                   .reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1])) \
                   .map(lambda v: (v[0],
                                   sorted(v[1][0], key=itemgetter(1), reverse=True),
                                   dict(v[1][1]))) \
                   .map(sum_allevents) \
                   .map(popular_avg) \
                   .map(event_todict) \
                   .map(merge_info) \
                   .map(lambda d: (d[0][0], d[0][1], d[1]))
    return rdd_format
print("THIS IS THE FIRST ONE ######################################################")
daily_rdd = rddCleaning(dfdaily.rdd,"SQLDATE")
print(daily_rdd.take(5))
print("THIS IS THE SECOND ONE ######################################################")
monthly_rdd = rddCleaning(dfmonthly.rdd,"MonthYear")
print(monthly_rdd.take(5))
print("THIS IS THE THIRD ONE ######################################################")
yearly_rdd = rddCleaning(dfyearly.rdd,"Year")
print(yearly_rdd.take(5))
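On the caching question above: sqlContext.cacheTable('temp3') is lazy, so temp3 is only materialized into memory the first time it is scanned; after that, the daily, monthly, and yearly queries should read from the cache rather than from S3, assuming the cached table fits in cluster memory. Caching the raw df as well should not be necessary, since it is only scanned while temp3 is being built. A small sketch:

# Sketch only: force the lazy cache with one cheap action so that all three
# aggregations read temp3 from memory instead of re-reading S3.
sqlContext.cacheTable('temp3')     # same call as in the code above
df3.count()                        # one full scan materializes the cached table

# ... run the dfdaily / dfmonthly / dfyearly queries and rddCleaning as above ...

sqlContext.uncacheTable('temp3')   # release executor memory when finished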
Upvotes: 1
Views: 2268
Reputation: 12991
The most immediate thing I can think of is to use DataFrames instead of RDDs. RDDs in Python are considerably slower than in Scala because every record has to be serialized between Python and the JVM, while DataFrames stay in the JVM and benefit from many built-in optimizations.
It is very difficult to follow all of the code here and suggest a full conversion. As a starting point, however, you can use spark.read.csv to read the CSV files directly into a DataFrame (and set a schema so that much of the validation happens automatically), and the many built-in functions should make the rest easy to write.
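For example, here is a minimal sketch of that direction, assuming Spark 2.x and a customSchema like the one in the question but with the numeric columns (NumArticles, GoldsteinScale, AvgTone) declared as numeric types instead of StringType; the when/otherwise call stands in for the question's modify_values UDF, and the daily aggregation stands in for the dfdaily query:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Read straight into a DataFrame; with a typed schema the casts and the
# "does this parse as a number" checks happen at load time.
df = spark.read.csv("s3a://gdelt-open-data/events/*", sep="\t", schema=customSchema)

# Built-in column functions replace the Python UDFs, so rows never have to be
# shipped to Python workers: fall back to Actor2Type1Code when Actor1Type1Code is empty.
cleaned = df.withColumn(
    "Actor1Type1Code",
    F.when(F.col("Actor1Type1Code") == "", F.col("Actor2Type1Code"))
     .otherwise(F.col("Actor1Type1Code")))

# The daily aggregation expressed directly on the DataFrame.
daily = (cleaned
         .groupBy("ActionGeo_CountryCode", "SQLDATE", "Actor1Type1Code")
         .agg(F.sum("NumArticles").alias("NumArticles"),
              F.round(F.avg("GoldsteinScale"), 0).alias("GoldsteinScale"),
              F.round(F.avg("AvgTone"), 0).alias("AvgTone")))
daily.show(5)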
Upvotes: 2