anurag

Reputation: 636

Add months to a date column in a Spark DataFrame

I have a scenario where I want to add months to a date column in a Spark DataFrame that has two columns with data types (Date, Int).

For example:

df.show()
data_date months_to_add
2015-06-23 5
2016-07-20 7

I want to add a new column containing the new date (the existing date plus the number of months), so that the output looks like this:

data_date  months_to_add  new_data_date
2015-06-23 5              2015-11-23
2016-07-20 7              2017-02-20
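To make the expected month arithmetic concrete, here is a plain-Python sketch with no Spark involved. The helper name add_months_py is hypothetical; it shifts a date by whole calendar months (negative values go backwards) and clamps the day to the last day of the target month when it would overflow, which as far as I know matches how Spark's add_months handles a case such as January 31 plus one month.

```python
import calendar
from datetime import date


def add_months_py(d: date, months: int) -> date:
    """Hypothetical helper: shift d by whole calendar months, clamping the
    day to the last day of the target month if it would overflow."""
    # Work with a zero-based month index so negative offsets also work.
    total = d.year * 12 + (d.month - 1) + months
    year, month0 = divmod(total, 12)
    month = month0 + 1
    # monthrange returns (weekday of day 1, number of days in the month).
    last_day = calendar.monthrange(year, month)[1]
    return date(year, month, min(d.day, last_day))


print(add_months_py(date(2015, 6, 23), 5))  # 2015-11-23
print(add_months_py(date(2016, 7, 20), 7))  # 2017-02-20
print(add_months_py(date(2016, 1, 31), 1))  # 2016-02-29 (clamped; 2016 is a leap year)
```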

I have tried the code below, but it does not seem to work:

from pyspark.sql.functions import add_months, col

df = df.withColumn("new_data_date", add_months(col("data_date"), col("months_to_add")))

It gives me this error:

'Column' object is not callable

Is there any way to achieve this without running a SQL query on top of the DataFrame?

Upvotes: 12

Views: 42262

Answers (3)

Alper t. Turker

Reputation: 35229

I'd use expr. Inside the SQL expression, add_months accepts a column for the number of months:

from pyspark.sql.functions import expr, to_date

# assumes an existing SparkSession named `spark`
df = spark.createDataFrame(
    [("2015-06-23", 5), ("2016-07-20", 7)],
    ("data_date", "months_to_add")
).select(to_date("data_date").alias("data_date"), "months_to_add")

df.withColumn("new_data_date", expr("add_months(data_date, months_to_add)")).show()

+----------+-------------+-------------+
| data_date|months_to_add|new_data_date|
+----------+-------------+-------------+
|2015-06-23|            5|   2015-11-23|
|2016-07-20|            7|   2017-02-20|
+----------+-------------+-------------+

Upvotes: 15

Neeraj Bhadani

Reputation: 3100

Try the code below. It works for me.

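from pyspark.sql import Row
from pyspark.sql.functions import add_months, col

l = [("2015-06-23", 5), ("2016-07-20", 7)]
rdd1 = sc.parallelize(l)  # assumes an existing SparkContext `sc`
row_rdd = rdd1.map(lambda x: Row(x[0], x[1]))
df = sqlContext.createDataFrame(row_rdd, ['data_date', 'months_to_add'])
# df.first()[1] is the months_to_add value of a single row, passed as a plain int
df.withColumn("new_data_date", add_months(col("data_date"), df.first()[1])).show()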
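One caveat about the code above: df.first()[1] is a single Python value taken from one row (5 for this data), so add_months receives a constant and every row is shifted by that same number of months rather than by its own months_to_add. The plain-Python simulation below (no Spark; the rows list and the shift helper are hypothetical stand-ins for the DataFrame) shows the difference for the example data.

```python
from datetime import date

# Hypothetical in-memory copy of the example DataFrame rows.
rows = [(date(2015, 6, 23), 5), (date(2016, 7, 20), 7)]


def shift(d, months):
    # Calendar-month shift without day clamping; safe here because
    # both example days (23 and 20) exist in every month.
    y, m = divmod(d.year * 12 + d.month - 1 + months, 12)
    return d.replace(year=y, month=m + 1)


first_months = rows[0][1]  # plays the role of df.first()[1], i.e. 5
constant = [shift(d, first_months) for d, _ in rows]  # same offset for every row
per_row = [shift(d, n) for d, n in rows]  # each row uses its own months_to_add

print(constant)  # [datetime.date(2015, 11, 23), datetime.date(2016, 12, 20)]
print(per_row)   # [datetime.date(2015, 11, 23), datetime.date(2017, 2, 20)]
```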

Regards, Neeraj

Upvotes: -2

Neeraj Bhadani

Reputation: 3100

You are getting the error because, in the PySpark version you are using, add_months expects its second argument to be an integer, and you are passing a column.
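As I understand it, the wording of the message comes from how the Py4J bridge converts arguments in older PySpark versions: it treats any object that has a _get_object_id attribute as a JVM reference and calls that method, while Column.__getattr__ returns a new Column for almost any attribute name. The sketch below is a simplified, hypothetical mock of both pieces (not the real PySpark or Py4J classes) that reproduces the same TypeError in plain Python.

```python
class Column:
    """Hypothetical stand-in for pyspark.sql.Column, reduced to the one
    behaviour that matters here: unknown attributes become new Columns."""

    def __init__(self, name):
        self.name = name

    def __getattr__(self, item):
        # Simplified: dunder names raise AttributeError, anything else is
        # treated as a nested field reference and returns another Column.
        if item.startswith("__"):
            raise AttributeError(item)
        return Column(f"{self.name}.{item}")


def convert_argument(arg):
    # Simplified Py4J-style check: an object that "has" _get_object_id is
    # assumed to be a JVM proxy, so that attribute gets called.
    if hasattr(arg, "_get_object_id"):
        return arg._get_object_id()
    return arg


print(convert_argument(5))  # plain ints pass through unchanged: 5

try:
    convert_argument(Column("months_to_add"))
except TypeError as e:
    print(e)  # 'Column' object is not callable
```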

Try using the statement below and check:

df.withColumn("new_data_date",add_months(col("data_date"), df.first()[1])).show()

Hope it helps.

Regards,

Neeraj

Upvotes: -1
