user_tom

Reputation: 3

Create dictionary of dataframe in pyspark

I am trying to create a dictionary of DataFrames keyed by year and month. It is a kind of macro that I can call for any required number of year/month combinations. I am facing a challenge while adding dynamic columns to a PySpark DataFrame.

from pyspark.sql import Window
from pyspark.sql.functions import col, lag, when, date_add, to_date
from pyspark.sql.types import DateType

df = spark.createDataFrame([(1, "foo1", '2016-1-31'), (1, "test", '2016-1-31'), (2, "bar1", '2012-1-3'), (4, "foo2", '2011-1-11')], ("k", "v", "date"))
w = Window().partitionBy().orderBy(col('date').desc())
df = df.withColumn("next_date", lag('date').over(w).cast(DateType()))
df = df.withColumn("next_name", lag('v').over(w))
df = df.withColumn("next_date", when(col("k") != lag(df.k).over(w), date_add(df.date, 605)).otherwise(col('next_date')))
df = df.withColumn("next_name", when(col("k") != lag(df.k).over(w), "").otherwise(col('next_name')))

import copy
dict_of_YearMonth = {}

for yearmonth in [200901,200902,201605 .. etc]:

    key_name = 'Snapshot_' + str(yearmonth)
    dict_of_YearMonth[key_name].withColumn("test", yearmonth)
    dict_of_YearMonth[key_name].withColumn("test_date", to_date(str(yearmonth)[:4] + '-' + str(yearmonth)[4:6] + '-1'))
    # Now I want to add a condition:
    # if (dict_of_YearMonth[key_name].test_date >= dict_of_YearMonth[key_name].date) and (test_date <= next_date) then output snapshot_yearmonth,
    # i.e. keep only the rows that satisfy this condition. I am able to do it in pandas but am facing a challenge in PySpark.
dict_of_YearMonth[key_name]
dict_of_YearMonth

Then I want to concatenate all the DataFrames into a single PySpark DataFrame. I could do this in pandas as shown below, but I need to do it in PySpark:

  snapshots=pd.concat([dict_of_YearMonth['Snapshot_201104'],dict_of_YearMonth['Snapshot_201105']])

If there is any other idea to generate a dictionary of DataFrames with dynamically added columns, apply the condition, build the year-based DataFrames, and merge them into a single DataFrame, any help would be appreciated.

Upvotes: 0

Views: 3633

Answers (1)

user07

Reputation: 670

I have tried the code below and it is working fine:

from functools import reduce
import datetime
from dateutil.parser import parse
from pyspark.sql import DataFrame

# Function to append all the DataFrames using union
def unionAll(*dfs):
    # fold the DataFrames pairwise with DataFrame.unionAll (columns are matched by position)
    return reduce(DataFrame.unionAll, dfs)

# convert a yearmonth value such as 200901 into a 'YYYY-MM-DD' string for the first of that month
def is_date(x):
    try:
        x = str(x) + '01'
        parse(x)
        return datetime.datetime.strptime(x, '%Y%m%d').strftime("%Y-%m-%d")
    except ValueError:
        pass  # if the format is incorrect, fall through and return None
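
For example, calling the helper on a single yearmonth value returns the first day of that month:

is_date(200901)   # returns '2009-01-01'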

from pyspark.sql.functions import col, udf
from pyspark.sql.types import DateType, StringType

dict_of_YearMonth = {}
for yearmonth in [200901, 200910]:
    key_name = 'Snapshot_' + str(yearmonth)
    dict_of_YearMonth[key_name] = df
    # constant column holding this snapshot's yearmonth
    func = udf(lambda x: str(yearmonth), StringType())
    dict_of_YearMonth[key_name] = df.withColumn("test", func(col('v')))
    # first day of the snapshot month, cast to a proper date
    default_date = udf(lambda x: is_date(x))
    dict_of_YearMonth[key_name] = dict_of_YearMonth[key_name].withColumn("test_date", default_date(col('test')).cast(DateType()))
dict_of_YearMonth
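
The question also asks for the row-level condition test_date >= date and test_date <= next_date. The code above does not apply it, but as a minimal sketch (assuming the date and next_date columns built in the question's first block exist on df, and casting the string date column so the comparison happens on dates), each snapshot could be filtered inside the loop like this:

dict_of_YearMonth[key_name] = dict_of_YearMonth[key_name].filter(
    (col('test_date') >= col('date').cast(DateType())) &
    (col('test_date') <= col('next_date')))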

To union multiple DataFrames, use the code below:

final_df = unionAll(dict_of_YearMonth['Snapshot_200901'],  dict_of_YearMonth['Snapshot_200910'])
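
If you want to combine every snapshot in the dictionary without listing each key by hand, the same helper can be passed all the values (this assumes every snapshot has the same columns in the same order, since unionAll matches columns by position):

final_df = unionAll(*dict_of_YearMonth.values())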

Upvotes: 1
