Reputation: 5907
Not so much a question as a request for suggestions.
I am working on 20 GB + 6 GB = 26 GB of CSV data with a 1+3 cluster (1 master, 3 workers, each with 16 GB RAM).
This is how I am doing my operations:
from pyspark import StorageLevel

df = spark.read.csv()   # 20 GB
df1 = spark.read.csv()  # 6 GB
df_merged = df.join(df1, 'name', 'left')  ### merging
df_merged.persist(StorageLevel.MEMORY_AND_DISK)  ### if I use MEMORY_ONLY, will I gain more performance?
print('No. of records found: ', df_merged.count())  ### just to materialize the persist by calling an action
df_merged.registerTempTable('table_satya')
query_list = [query1, query2, query3]  ### SQL query strings to be fired
city_list = [city1, city2, city3, ...]  ### 8 cities in total
file_index = 0  ### files will be created with an increasing index
for query_str in query_list:
    result = spark.sql(query_str)  # ex: select * from table_satya where date >= '2016-01-01'
    # result.persist()  ### will it increase performance?
    for city in city_list:
        df_city = result.where(result.city_name == city)
        # store as a single csv file (pandas style)
        df_city.toPandas().to_csv('file_' + str(file_index) + '.csv', index=False)
        file_index += 1
df_merged.unpersist()  ### do I even need to do this, or can Spark handle it internally?
Currently it is taking a huge amount of time:
#persist (materialized via count()) - 34 min
#each SQL query - around 2*8 = 16 min of toPandas() ops
#each toPandas().to_csv() - around 2 min
#for 3 queries: 16*3 = 48 min
#total: 34 + 48 = 82 min  ### need optimization, seriously
So can anybody suggest how I can optimize the above process for better performance (both time and memory)?
Why I am worried: I was doing the above on a Python/pandas setup (a single 64 GB machine with serialized pickle data) and I was able to do it in 8-12 minutes. As my data volume keeps growing, I need to adopt a technology like Spark.
Thanks in Advance. :)
Upvotes: 1
Views: 1849
Reputation: 45562
I think your best bet is cutting the source data down to size. You mention that your source data has 90 cities, but you are only interested in 8 of them. Filter out the cities you don't want and keep the ones you do want in separate csv files:
import itertools
import csv

city_list = [city1, city2, city3, ...]  # the 8 cities you care about

with open('f1.csv', 'rb') as f1, open('f2.csv', 'rb') as f2:
    r1, r2 = csv.reader(f1), csv.reader(f2)
    header = next(r1)
    next(r2)  # discard the header in the second file
    city_col = header.index('city_name')
    city_files = []
    city_writers = {}
    try:
        for city in city_list:
            f = open(city + '.csv', 'wb')
            city_files.append(f)
            writer = csv.writer(f)
            writer.writerow(header)
            city_writers[city] = writer
        for row in itertools.chain(r1, r2):
            city_name = row[city_col]
            if city_name in city_writers:
                city_writers[city_name].writerow(row)
    finally:
        for f in city_files:
            f.close()
After this, iterate over each city, create a DataFrame for that city, and run your three queries in a nested loop. Each per-city DataFrame should have no problem fitting in memory, and the queries should run quickly since they are working over a much smaller data set.
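For example, the per-city step might look something like this. This is only a rough sketch: it assumes the same spark session, query_list and city_list from your question, that each query still refers to the table name table_satya, and the output file naming is just illustrative.

file_index = 0
for city in city_list:
    # each per-city csv produced above is small, so reading it is cheap
    df_city = spark.read.csv(city + '.csv', header=True, inferSchema=True)
    df_city.registerTempTable('table_satya')  # reuse the table name the queries expect
    for query_str in query_list:
        result = spark.sql(query_str)
        # the per-city result is small enough to bring to the driver as pandas
        result.toPandas().to_csv('file_' + str(file_index) + '.csv', index=False)
        file_index += 1

Whether you build the per-city DataFrames with Spark or plain pandas is up to you; at this size either should fit comfortably in memory.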
Upvotes: 1