Satya

Reputation: 5907

Optimization for processing big data in PySpark

Not exactly a question -> I need a suggestion.

I am operating on two CSV files (20 GB + 6 GB = 26 GB total) on a 1+3 cluster (1 master, 3 slaves, each with 16 GB RAM).

This is how I am doing my operations:

from pyspark import StorageLevel

df = spark.read.csv()   # 20 GB
df1 = spark.read.csv()  # 6 GB
df_merged = df.join(df1, 'name', 'left')  ### merging
df_merged.persist(StorageLevel.MEMORY_AND_DISK)  ## if I do MEMORY_ONLY will I gain more performance?????
print('No. of records found: ', df_merged.count())  ## just to ensure the persist by calling an action
df_merged.registerTempTable('table_satya')
query_list = [query1, query2, query3]  ### SQL query strings to be fired
city_list = [city1, city2, city3...total 8 cities]
file_index = 0  ### will create files based on increasing index
for query_str in query_list:
    result = spark.sql(query_str)  # ex: select * from table_satya where date >= '2016-01-01'
    #result.persist()  ### will it increase performance?
    for city in city_list:
        df_city = result.where(result.city_name == city)
        # store as csv file (pandas-style single file)
        df_city.toPandas().to_csv('file_' + str(file_index) + '.csv', index=False)
        file_index += 1

df_merged.unpersist()  ### do I even need to do this or can Spark handle it internally?

Currently it is taking a huge amount of time.

# persist (on count()) - 34 min
# each result (firing one SQL query) - around 2 * 8 = 16 min of toPandas() ops
#     (each toPandas().to_csv() takes around 2 min)
# for 3 queries: 16 * 3 = 48 min
# total: 34 + 48 = 82 min  ### need optimization seriously

So can anybody suggest how I can optimize the above process for better performance (both time and memory)?

Why I am worried: I was doing the above with Python/pandas on a single 64 GB machine with serialized pickle data, and I was able to do it in 8-12 minutes. As my data volume keeps growing, I need to adopt a technology like Spark.

Thanks in Advance. :)

Upvotes: 1

Views: 1849

Answers (1)

Steven Rumbalski

Reputation: 45562

I think your best bet is cutting the source data down to size. You mention that your source data has 90 cities, but you are only interested in 8 of them. Filter out the cities you don't want and keep the ones you do want in separate csv files:

import itertools
import csv

city_list = [city1, city2,city3...total 8 cities]

with open('f1.csv', 'rb') as f1, open('f2.csv', 'rb') as f2:
    r1, r2 = csv.reader(f1), csv.reader(f2)
    header = next(r1)
    next(r2) # discard headers in second file
    city_col = header.index('city_name')
    city_files = []
    city_writers = {}
    try:
        # open one output file and csv writer per city of interest
        for city in city_list:
            f = open(city + '.csv', 'wb')
            city_files.append(f)
            writer = csv.writer(f)
            writer.writerow(header)
            city_writers[city] = writer
        # stream both source files, routing rows to their city's file
        for row in itertools.chain(r1, r2):
            city_name = row[city_col]
            if city_name in city_writers:
                city_writers[city_name].writerow(row)
    finally:
        for f in city_files:
            f.close()

After this, iterate over each city, creating a DataFrame for that city, and then run your three queries in a nested loop, as sketched below. Each DataFrame should have no problem fitting in memory, and the queries should run quickly since they operate on a much smaller data set.
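In PySpark, that step might look roughly like the following sketch. It is only an outline: it assumes spark is your existing SparkSession, that the per-city files produced above are named <city>.csv, and that your three query strings are rewritten to select from a per-city temp view (here called table_city) instead of table_satya.

query_list = [query1, query2, query3]  # your existing SQL strings, now pointed at 'table_city'

file_index = 0
for city in city_list:
    # each per-city CSV is a small fraction of the original 26 GB
    df_city = spark.read.csv(city + '.csv', header=True, inferSchema=True)
    df_city.createOrReplaceTempView('table_city')  # registerTempTable on Spark 1.x
    for query_str in query_list:
        result = spark.sql(query_str)
        # each per-city result is small, so collecting to pandas is now cheap
        result.toPandas().to_csv('file_' + str(file_index) + '.csv', index=False)
        file_index += 1

If a single output file per result is not a hard requirement, you could also skip pandas entirely and let Spark write the output itself with something like result.coalesce(1).write.csv(path, header=True), which produces a directory containing one part file.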

Upvotes: 1
