9ganzi

Reputation: 311

Adding a column to a dataframe using other columns

I am new to Spark and I have a dataframe like the following:

+-------------------+----------+--------------------+----------------+--------------+
|           placekey|naics_code|       visits_by_day|date_range_start|date_range_end|
+-------------------+----------+--------------------+----------------+--------------+
|22b-223@627-wdh-fcq|    311811|[22,16,22,32,44,1...|      2018-12-31|    2019-01-07|
|22b-222@627-wc3-99f|    311811|     [2,4,3,3,4,6,5]|      2019-01-28|    2019-02-04|
|222-222@627-w9g-rrk|    311811|     [3,3,5,5,6,2,5]|      2019-02-04|    2019-02-11|
+-------------------+----------+--------------------+----------------+--------------+

I want to create another column, date_bet_dates, that holds a list of dates between date_range_start and date_range_end. This is the code I have so far:

from datetime import datetime, timedelta

import pandas as pd
from pyspark.sql.functions import udf
from pyspark.sql.types import DateType

def get_dates(s, e):
    start = datetime.strptime(s, '%Y-%m-%d').date()
    end = datetime.strptime(e, '%Y-%m-%d').date()
    return pd.date_range(start, end - timedelta(days=1), freq='d')

udf_get_dates = udf(lambda x: get_dates(x), DateType())

df = df.withColumn('date_bet_dates', udf_get_dates(df['date_range_start'], df['date_range_end']))

df.show(3)

An error occurs at the line df.show(3):

TypeError: <lambda>() takes 1 positional argument but 2 were given

I have no idea what arguments it is talking about, but I assume it has something to do with my get_dates function. What needs to change to solve this problem?

Upvotes: 0

Views: 188

Answers (3)

Lamanus

Reputation: 13581

If I were doing the same thing, I would use the built-in sequence function:

from pyspark.sql import functions as f

df.withColumn('date_bet_dates', f.expr('sequence(to_date(date_range_start), to_date(date_range_end), interval 1 days)')).show(truncate=False)

+----------------+--------------+------------------------------------------------------------------------------------------------+
|date_range_start|date_range_end|date_bet_dates                                                                                  |
+----------------+--------------+------------------------------------------------------------------------------------------------+
|2018-12-31      |2019-01-07    |[2018-12-31, 2019-01-01, 2019-01-02, 2019-01-03, 2019-01-04, 2019-01-05, 2019-01-06, 2019-01-07]|
|2019-01-28      |2019-02-04    |[2019-01-28, 2019-01-29, 2019-01-30, 2019-01-31, 2019-02-01, 2019-02-02, 2019-02-03, 2019-02-04]|
|2019-02-04      |2019-02-11    |[2019-02-04, 2019-02-05, 2019-02-06, 2019-02-07, 2019-02-08, 2019-02-09, 2019-02-10, 2019-02-11]|
+----------------+--------------+------------------------------------------------------------------------------------------------+
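
One difference worth noting: the original get_dates stops one day before date_range_end, whereas sequence includes both endpoints. If the end-exclusive behaviour is wanted, the end date can be shifted back by a day first (a minimal sketch, assuming the same column names as above):

from pyspark.sql import functions as f

df = df.withColumn(
    'date_bet_dates',
    f.expr('sequence(to_date(date_range_start), date_sub(to_date(date_range_end), 1), interval 1 day)')
)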

Upvotes: 0

pltc

Reputation: 6082

There are a couple of problems in your UDF:

  1. Your lambda takes only one parameter (lambda x: get_dates(x)), while the function it wraps takes two (def get_dates(s, e)).
  2. You expect the UDF to return a list of dates (pd.date_range), but the declared return type is a single DateType, not an ArrayType.

This is the fix:

from datetime import datetime, timedelta

import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql import types as T

def get_dates(s, e):
    start = datetime.strptime(s, '%Y-%m-%d').date()
    end = datetime.strptime(e, '%Y-%m-%d').date()
    return pd.date_range(start, end - timedelta(days=1), freq='d').date.tolist()  # return a plain Python list of dates

udf_get_dates = F.udf(lambda x, y: get_dates(x, y), T.ArrayType(T.DateType()))  # the lambda now takes two arguments and the return type is an array of dates

df = df.withColumn('date_bet_dates', udf_get_dates(df['date_range_start'], df['date_range_end']))  # finally, apply the UDF
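
As a quick sanity check (assuming the same dataframe as in the question), the new column should now show up as an array of dates:

df.select('placekey', 'date_range_start', 'date_range_end', 'date_bet_dates').show(3, truncate=False)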

Upvotes: 2

rchome

Reputation: 2723

get_dates() takes two arguments, s and e. You've wrapped it in a lambda that only takes one argument, x. Get rid of the lambda and you should be good:

udf_get_dates = udf(get_dates, DateType())
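
If the goal is still to return the whole list of dates, the declared return type would also need to be an array type rather than a single date (a sketch along the lines of the other answer, assuming get_dates returns a Python list):

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DateType

udf_get_dates = udf(get_dates, ArrayType(DateType()))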

Upvotes: 1
