OMID Davami

Reputation: 69

PySpark Transform an algorithm to UDF and apply it on a DataFrame

I wrote an algorithm that does something and prints the output. The input for my algorithm is a list of integers. Here is a sample input:

`mylist = [5,6,14,15,16,17,18,19,20,28,40,41,42,43,44,55]`

and here is my algorithm:

```python
tduration = 0
duration = 0
avg = 0
bottleneck = 0
y = 0
x = 0
while x < len(mylist) - 4 and y < len(mylist) - 1:
    if mylist[x + 4] == mylist[x] + 4:
        y = x + 4
        print("MY LIST X = ", mylist[x])
        print("X = ", x)
        print("Y = ", y)
        while True:
            if y == len(mylist) - 1 or mylist[y + 1] > mylist[y] + 10:
                bottleneck = bottleneck + 1
                duration = mylist[y] - mylist[x] + 1
                tduration = tduration + duration
                avg = tduration / bottleneck
                x = y + 1
                print("MY LIST Y = ", mylist[y])
                print("Duration = ", duration)
                break
            else:
                y = y + 1
    else:
        x = x + 1
print("BottleneckCount = ", bottleneck, "\nAverageDuration = ", avg)
```

Now I want to transform this algorithm into a User Defined Function (UDF) in PySpark and apply it to a DataFrame with one column, where each row holds one list. A sample DataFrame has 1 column and 2 rows: row 1 is the list [10,11,19,20,21,22,23,24,25,33,45] and row 2 is the list [55,56,57,58,59,60,80,81,82,83,84,85,92,115]. The UDF should be applied to each row of the DataFrame separately and put the results for that row in another column. Thank you in advance for your time and help. I will upvote your answers.

Upvotes: 2

Views: 493

Answers (2)

OMID Davami

Reputation: 69

YOLO answered this question and the answer is complete. The only problem is that the last column, "avg", comes back as NULL, because the UDF's return type is declared as an array of integers while avg is a float. I realized that I can solve this problem by using this "func" instead of the one in YOLO's answer:

```python
import pyspark.sql.functions as F
import pyspark.sql.types as T

func = F.udf(lambda x: calculate(x), T.StructType(
    [T.StructField("val1", T.IntegerType(), True),
     T.StructField("val2", T.FloatType(), True)]))
```

Upvotes: 0

YOLO

Reputation: 21739

Here's a way you can do it:

```python
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, ArrayType

def calculate(mylist):
    tduration = 0
    duration = 0
    avg = 0
    bottleneck = 0
    y = 0
    x = 0
    while x < len(mylist) - 4 and y < len(mylist) - 1:
        if mylist[x + 4] == mylist[x] + 4:
            y = x + 4
            print("MY LIST X = ", mylist[x])
            print("X = ", x)
            print("Y = ", y)
            while True:
                if y == len(mylist) - 1 or mylist[y + 1] > mylist[y] + 10:
                    bottleneck = bottleneck + 1
                    duration = mylist[y] - mylist[x] + 1
                    tduration = tduration + duration
                    avg = tduration / bottleneck
                    x = y + 1
                    print("MY LIST Y = ", mylist[y])
                    print("Duration = ", duration)
                    break
                else:
                    y = y + 1
        else:
            x = x + 1
    return bottleneck, avg

# sample data frame to use
df = spark.createDataFrame(
    [
        [[10, 11, 19, 20, 21, 22, 23, 24, 25, 33, 45]],
        [[55, 56, 57, 58, 59, 60, 80, 81, 82, 83, 84, 85, 92, 115]],
    ],
    ['col1'],
)

df.show()
```

```
+--------------------+
|                col1|
+--------------------+
|[10, 11, 19, 20, ...|
|[55, 56, 57, 58, ...|
+--------------------+
```

```python
# convert values to int (edit); declare the return type so col1 stays an array
f_to_int = F.udf(lambda x: list(map(int, x)), ArrayType(IntegerType()))
df = df.withColumn('col1', f_to_int('col1'))

# create udf
func = F.udf(lambda x: calculate(x), ArrayType(IntegerType()))

# apply udf
df = df.withColumn('vals', func('col1'))

# create new cols
df = df.select("col1", df.vals[0].alias('bottleneck'), df.vals[1].alias('avg'))

df.show()
```

```
+--------------------+----------+----+
|                col1|bottleneck| avg|
+--------------------+----------+----+
|[10, 11, 19, 20, ...|         1|null|
|[55, 56, 57, 58, ...|         2|null|
+--------------------+----------+----+
```
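A quick plain-Python check (a sketch re-implementing `calculate` outside Spark, with the prints removed) shows where the null comes from: `calculate` returns an `(int, float)` tuple, but the UDF's result is declared as `ArrayType(IntegerType())`, so Spark cannot fit the non-integer `avg` into that type and replaces it with null. Declaring a struct with an integer field and a float field, as in the other answer, avoids this.

```python
def calculate(mylist):
    # same logic as the UDF body above, prints removed for brevity
    tduration = duration = avg = bottleneck = 0
    x = y = 0
    while x < len(mylist) - 4 and y < len(mylist) - 1:
        if mylist[x + 4] == mylist[x] + 4:
            y = x + 4
            while True:
                if y == len(mylist) - 1 or mylist[y + 1] > mylist[y] + 10:
                    bottleneck += 1
                    duration = mylist[y] - mylist[x] + 1
                    tduration += duration
                    avg = tduration / bottleneck  # true division: always a float
                    x = y + 1
                    break
                else:
                    y = y + 1
        else:
            x = x + 1
    return bottleneck, avg

print(calculate([10, 11, 19, 20, 21, 22, 23, 24, 25, 33, 45]))
# (1, 15.0) -- avg is a float, so IntegerType cannot hold it
print(calculate([55, 56, 57, 58, 59, 60, 80, 81, 82, 83, 84, 85, 92, 115]))
# (2, 9.5)
```

The bottleneck counts match the `df.show()` output above; only the float `avg` is lost to the integer array type.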

Upvotes: 1
