hellotherebj

Reputation: 121

How to apply functions in PySpark?

I have a function that returns the rows for a specific date; it looks like this:

def specific_date(date_input):
    # quote the date so it is compared as a date literal,
    # not parsed as arithmetic (2020-08-01 would evaluate to 2011)
    specificdate = """SELECT *
                      FROM vw
                      WHERE Date = '{date_1}'
                   """.format(date_1=date_input)
    day_result = sqlContext.sql(specificdate)
    return day_result

and I have a dataframe that looks like this:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import to_date

df1_schema = StructType([StructField("Date", StringType(), True),
                         StructField("col1", IntegerType(), True),
                         StructField("id", StringType(), True),
                         StructField("col2", IntegerType(), True),
                         StructField("col3", IntegerType(), True),
                         StructField("col4", IntegerType(), True),
                         StructField("coln", IntegerType(), True)])
df_data = [('2020-08-01', 0, 'M1', 3, 3, 2, 2), ('2020-08-02', 0, 'M1', 2, 3, 0, 1),
           ('2020-08-03', 0, 'M1', 3, 3, 2, 3), ('2020-08-04', 0, 'M1', 3, 3, 2, 1),
           ('2020-08-01', 0, 'M2', 1, 3, 3, 1), ('2020-08-02', 0, 'M2', -1, 3, 1, 2)]
df1 = sqlContext.createDataFrame(df_data, df1_schema)
df1 = df1.withColumn("Date", to_date("Date", 'yyyy-MM-dd'))
df1.show()

+----------+----+---+----+----+----+----+
|      Date|col1| id|col2|col3|col4|coln|
+----------+----+---+----+----+----+----+
|2020-08-01|   0| M1|   3|   3|   2|   2|
|2020-08-02|   0| M1|   2|   3|   0|   1|
|2020-08-03|   0| M1|   3|   3|   2|   3|
|2020-08-04|   0| M1|   3|   3|   2|   1|
|2020-08-01|   0| M2|   1|   3|   3|   1|
|2020-08-02|   0| M2|  -1|   3|   1|   2|
+----------+----+---+----+----+----+----+

df1.createOrReplaceTempView("vw")

Then, if I call the function as specific_date(F.date_add('2020-08-01', 1)), I would expect to get the dataframe where the date is '2020-08-02':

+----------+----+---+----+----+----+----+
|      Date|col1| id|col2|col3|col4|coln|
+----------+----+---+----+----+----+----+
|2020-08-02|   0| M1|   2|   3|   0|   1|
|2020-08-02|   0| M2|  -1|   3|   1|   2|
+----------+----+---+----+----+----+----+

I have tried many approaches, but none of them seemed to work. Any help would be appreciated.

Upvotes: 0

Views: 113

Answers (2)

eemilk

Reputation: 1628

If you really want to use a function that adds days to a given datetime and also uses the SQL query:

import datetime

def specific_date(date_input, days_to_add):
    start_date = datetime.datetime.strptime(date_input, "%Y-%m-%d")
    end_date = start_date + datetime.timedelta(days=days_to_add)
    # date_format trims the interpolated datetime string back down to yyyy-MM-dd
    specificdate = ("SELECT * FROM vw WHERE Date = "
                    "date_format('{date_1}', 'yyyy-MM-dd')").format(date_1=end_date)
    day_result = sqlContext.sql(specificdate)
    return day_result

Then just call it, providing the date_input and days_to_add:

specific_date('2020-08-01', 1)

which will give you the dataframe:

+----------+----+---+----+----+----+----+
|      Date|col1| id|col2|col3|col4|coln|
+----------+----+---+----+----+----+----+
|2020-08-02|   0| M1|   2|   3|   0|   1|
|2020-08-02|   0| M2|  -1|   3|   1|   2|
+----------+----+---+----+----+----+----+

But it would be far better to simply use:

day_result = df1.filter(df1.Date == '2020-08-02')
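
If you want to keep the question's "start date plus N days" logic without any string formatting, the same comparison can be written as a column expression. A minimal sketch, assuming df1 from the question is in scope (F.lit and F.date_add are standard pyspark.sql.functions):

from pyspark.sql import functions as F

# F.date_add returns a Column, so use it inside a DataFrame filter
# instead of interpolating it into a SQL string
day_result = df1.filter(df1.Date == F.date_add(F.lit('2020-08-01'), 1))
day_result.show()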

Upvotes: 1

Thom Marchesini

Reputation: 61

If you do not need a function that uses a temp view, you can easily do something like this:

import datetime
from pyspark.sql.functions import col

d = datetime.datetime.strptime("2020-08-01", "%Y-%m-%d")
d += datetime.timedelta(days=1)
df1.where(col('Date') == d).show()

+----------+----+---+----+----+----+----+
|      Date|col1| id|col2|col3|col4|coln|
+----------+----+---+----+----+----+----+
|2020-08-02|   0| M1|   2|   3|   0|   1|
|2020-08-02|   0| M2|  -1|   3|   1|   2|
+----------+----+---+----+----+----+----+

One issue with the code you provided is that the Spark function F.date_add returns a Column object, which cannot be used directly in the WHERE clause of a SQL string the way a plain date string can.
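
If you do still want the temp-view approach, note that Spark SQL has its own built-in date_add function, so the day arithmetic can live inside the query itself rather than mixing F.date_add with Python string formatting. A minimal sketch, assuming the vw view from the question is registered:

# date_add here is the Spark SQL built-in, evaluated inside the query
day_result = sqlContext.sql(
    "SELECT * FROM vw WHERE Date = date_add('2020-08-01', 1)"
)
day_result.show()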

Upvotes: 0
