Reputation: 35
I have a DataFrame with the following schema:
root
|-- user_id: string (nullable = true)
|-- user_loans_arr: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- loan_date: string (nullable = true)
| | |-- loan_amount: string (nullable = true)
|-- new_loan: struct (nullable = true)
| |-- loan_date: string (nullable = true)
| |-- loan_amount: string (nullable = true)
I want to use a UDF that takes user_loans_arr and new_loan as inputs, appends the new_loan struct to the existing user_loans_arr, and then removes from user_loans_arr all elements whose loan_date is older than 12 months.
Thanks in advance.
Upvotes: 2
Views: 4283
Reputation: 6323
If Spark >= 2.4, you don't need a UDF; check the example below.
val df = spark.sql(
"""
|select user_id, user_loans_arr, new_loan
|from values
| ('u1', array(named_struct('loan_date', '2019-01-01', 'loan_amount', 100)), named_struct('loan_date',
| '2020-01-01', 'loan_amount', 100)),
| ('u2', array(named_struct('loan_date', '2020-01-01', 'loan_amount', 200)), named_struct('loan_date',
| '2020-01-01', 'loan_amount', 100))
| T(user_id, user_loans_arr, new_loan)
""".stripMargin)
df.show(false)
df.printSchema()
/**
* +-------+-------------------+-----------------+
* |user_id|user_loans_arr |new_loan |
* +-------+-------------------+-----------------+
* |u1 |[[2019-01-01, 100]]|[2020-01-01, 100]|
* |u2 |[[2020-01-01, 200]]|[2020-01-01, 100]|
* +-------+-------------------+-----------------+
*
* root
* |-- user_id: string (nullable = false)
* |-- user_loans_arr: array (nullable = false)
* | |-- element: struct (containsNull = false)
* | | |-- loan_date: string (nullable = false)
* | | |-- loan_amount: integer (nullable = false)
* |-- new_loan: struct (nullable = false)
* | |-- loan_date: string (nullable = false)
* | |-- loan_amount: integer (nullable = false)
*/
Spark >= 2.4
df.withColumn("user_loans_arr",
expr(
"""
|FILTER(array_union(user_loans_arr, array(new_loan)),
| x -> months_between(current_date(), to_date(x.loan_date)) < 12)
""".stripMargin))
.show(false)
/**
* +-------+--------------------------------------+-----------------+
* |user_id|user_loans_arr |new_loan |
* +-------+--------------------------------------+-----------------+
* |u1 |[[2020-01-01, 100]] |[2020-01-01, 100]|
* |u2 |[[2020-01-01, 200], [2020-01-01, 100]]|[2020-01-01, 100]|
* +-------+--------------------------------------+-----------------+
*/
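One caveat: array_union also removes exact duplicates, so a new loan identical to an existing entry would be silently dropped; concat (likewise available since Spark 2.4) keeps duplicates. The same built-in functions work from PySpark as well; a minimal sketch, assuming df holds the columns above:
import pyspark.sql.functions as F

df.withColumn(
    "user_loans_arr",
    F.expr("""
        filter(concat(user_loans_arr, array(new_loan)),
               x -> months_between(current_date(), to_date(x.loan_date)) < 12)
    """)
).show(truncate=False)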
Spark < 2.4
import java.time._
import scala.collection.mutable
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf
import spark.implicits._

// reuse the input array's type as the UDF's return schema
val outputSchema = df.schema("user_loans_arr").dataType
val add_and_filter = udf((userLoansArr: mutable.WrappedArray[Row], loan: Row) => {
(userLoansArr :+ loan).filter(row => {
val loanDate = LocalDate.parse(row.getAs[String]("loan_date"))
val period = Period.between(loanDate, LocalDate.now())
period.getYears * 12 + period.getMonths < 12
})
}, outputSchema)
df.withColumn("user_loans_arr", add_and_filter($"user_loans_arr", $"new_loan"))
.show(false)
/**
* +-------+--------------------------------------+-----------------+
* |user_id|user_loans_arr |new_loan |
* +-------+--------------------------------------+-----------------+
* |u1 |[[2020-01-01, 100]] |[2020-01-01, 100]|
* |u2 |[[2020-01-01, 200], [2020-01-01, 100]]|[2020-01-01, 100]|
* +-------+--------------------------------------+-----------------+
*/
Upvotes: 4
Reputation: 1712
You need to pass your array and struct columns to the UDF, either as an array or as a struct. I prefer passing them as a struct; inside the UDF you can manipulate the elements and return an array type.
import pyspark.sql.functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

# Test data
tst = sqlContext.createDataFrame([(1,2,3,4),(3,4,5,1),(5,6,7,8),(7,8,9,2)],schema=['col1','col2','col3','col4'])
tst_1=(tst.withColumn("arr",F.array('col1','col2'))).withColumn("str",F.struct('col3','col4'))
# udf to return array
@udf(ArrayType(StringType()))
def fn(row):
    # drop everything when the array's second element exceeds the struct's col4
    if row.arr[1] > row.str.col4:
        return []
    # otherwise append the struct's fields to the array
    return list(row.arr) + list(row.str.asDict().values())
# call the udf with the array and struct columns wrapped in a single struct
tst_fin = tst_1.withColumn("res",fn(F.struct('arr','str')))
The result is:
tst_fin.show()
+----+----+----+----+------+------+------------+
|col1|col2|col3|col4| arr| str| res|
+----+----+----+----+------+------+------------+
| 1| 2| 3| 4|[1, 2]|[3, 4]|[1, 2, 4, 3]|
| 3| 4| 5| 1|[3, 4]|[5, 1]| []|
| 5| 6| 7| 8|[5, 6]|[7, 8]|[5, 6, 8, 7]|
| 7| 8| 9| 2|[7, 8]|[9, 2]| []|
+----+----+----+----+------+------+------------+
This example treats everything as int. Since your dates are strings, you will have to use Python's datetime functions inside the UDF for the comparison.
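A minimal sketch of that comparison for the question's schema, assuming non-null columns and loan_date strings in yyyy-MM-dd format (the field names mirror the question; the month arithmetic is an approximation):
from datetime import datetime
from pyspark.sql.functions import struct, udf
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

loans_schema = ArrayType(StructType([
    StructField("loan_date", StringType()),
    StructField("loan_amount", StringType()),
]))

@udf(loans_schema)
def add_and_filter(row):
    # append the new loan, then keep only loans from the last 12 months
    now = datetime.now()
    res = []
    for loan in list(row.user_loans_arr) + [row.new_loan]:
        d = datetime.strptime(loan.loan_date, "%Y-%m-%d")
        months = (now.year - d.year) * 12 + (now.month - d.month)
        if months < 12:
            res.append((loan.loan_date, loan.loan_amount))
    return res

df = df.withColumn("user_loans_arr", add_and_filter(struct("user_loans_arr", "new_loan")))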
Upvotes: -1