Reputation: 69
I wrote an algorithm that processes a list of integers and prints some output. The input for my algorithm is a list of integers. Here is the sample input as a list:
`mylist = [5,6,14,15,16,17,18,19,20,28,40,41,42,43,44,55]`
and here is my algorithm
```
tduration = 0
duration = 0
avg = 0
bottleneck = 0
y = 0
x = 0
while x < len(mylist) - 4 and y < len(mylist) - 1:
    if mylist[x + 4] == mylist[x] + 4:
        y = x + 4
        print("MY LIST X = ", mylist[x])
        print("X = ", x)
        print("Y = ", y)
        while True:
            if y == len(mylist) - 1 or mylist[y + 1] > mylist[y] + 10:
                bottleneck = bottleneck + 1
                duration = mylist[y] - mylist[x] + 1
                tduration = tduration + duration
                avg = tduration / bottleneck
                x = y + 1
                print("MY LIST Y = ", mylist[y])
                print("Duration = ", duration)
                break
            else:
                y = y + 1
    else:
        x = x + 1
print("BottleneckCount = ", bottleneck, "\nAverageDuration = ", avg)
```
Now I want to turn this algorithm into a user-defined function (UDF) in PySpark and apply that UDF to a DataFrame with one column, where each row holds one list. The sample DataFrame has 1 column and 2 rows: row 1 is the list [10,11,19,20,21,22,23,24,25,33,45]
and row 2 is the list [55,56,57,58,59,60,80,81,82,83,84,85,92,115].
The UDF should be applied to each row of the DataFrame separately and return the results for that row in another column.
Thank you in advance for your time and help. I will upvote your answers
Upvotes: 2
Views: 493
Reputation: 69
YOLO answered this question and it is a complete answer. The only problem is that the last column, "avg", comes back as NULL. The cause is that `calculate` returns a float for the average, which does not fit the declared `ArrayType(IntegerType())`. I realized that I can solve this problem by using this "func" instead of the "func" in YOLO's answer:
```
import pyspark.sql.functions as F
import pyspark.sql.types as T

func = F.udf(lambda x: calculate(x), T.StructType(
    [T.StructField("val1", T.IntegerType(), True),
     T.StructField("val2", T.FloatType(), True)]))
```
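To see where the NULL comes from, it helps to run `calculate` locally in plain Python (no Spark needed): the second element of the result is a `float`, which an `ArrayType(IntegerType())` column silently nulls out. A minimal check, using the same function body as in YOLO's answer with the prints removed for brevity:

```python
def calculate(mylist):
    # same logic as in the question/answer below, prints stripped
    tduration = 0
    duration = 0
    avg = 0
    bottleneck = 0
    y = 0
    x = 0
    while x < len(mylist) - 4 and y < len(mylist) - 1:
        if mylist[x + 4] == mylist[x] + 4:
            y = x + 4
            while True:
                if y == len(mylist) - 1 or mylist[y + 1] > mylist[y] + 10:
                    bottleneck = bottleneck + 1
                    duration = mylist[y] - mylist[x] + 1
                    tduration = tduration + duration
                    avg = tduration / bottleneck
                    x = y + 1
                    break
                else:
                    y = y + 1
        else:
            x = x + 1
    return bottleneck, avg

print(calculate([10, 11, 19, 20, 21, 22, 23, 24, 25, 33, 45]))               # (1, 15.0)
print(calculate([55, 56, 57, 58, 59, 60, 80, 81, 82, 83, 84, 85, 92, 115]))  # (2, 9.5)
```

The first element is an `int` (the bottleneck count) and the second is a `float` (the average duration), which is exactly why the struct return type with `IntegerType`/`FloatType` fields works where the integer array type does not.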
Upvotes: 0
Reputation: 21739
Here's a way you can do it:
```
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, ArrayType

def calculate(mylist):
    tduration = 0
    duration = 0
    avg = 0
    bottleneck = 0
    y = 0
    x = 0
    while x < len(mylist) - 4 and y < len(mylist) - 1:
        if mylist[x + 4] == mylist[x] + 4:
            y = x + 4
            print("MY LIST X = ", mylist[x])
            print("X = ", x)
            print("Y = ", y)
            while True:
                if y == len(mylist) - 1 or mylist[y + 1] > mylist[y] + 10:
                    bottleneck = bottleneck + 1
                    duration = mylist[y] - mylist[x] + 1
                    tduration = tduration + duration
                    avg = tduration / bottleneck
                    x = y + 1
                    print("MY LIST Y = ", mylist[y])
                    print("Duration = ", duration)
                    break
                else:
                    y = y + 1
        else:
            x = x + 1
    return bottleneck, avg

# sample data frame to use
df = spark.createDataFrame(
    [
        [[10, 11, 19, 20, 21, 22, 23, 24, 25, 33, 45]],
        [[55, 56, 57, 58, 59, 60, 80, 81, 82, 83, 84, 85, 92, 115]],
    ],
    ['col1'],
)

df.show()
```

```
+--------------------+
|                col1|
+--------------------+
|[10, 11, 19, 20, ...|
|[55, 56, 57, 58, ...|
+--------------------+
```

```
# convert values to int --- edit
# (declare the return type; without it F.udf defaults to StringType)
f_to_int = F.udf(lambda x: list(map(int, x)), ArrayType(IntegerType()))
df = df.withColumn('col1', f_to_int('col1'))

# create udf
func = F.udf(lambda x: calculate(x), ArrayType(IntegerType()))

# apply udf
df = df.withColumn('vals', func('col1'))

# create new cols
df = df.select("col1", df.vals[0].alias('bottleneck'), df.vals[1].alias('avg'))
df.show()
```

```
+--------------------+----------+----+
|                col1|bottleneck| avg|
+--------------------+----------+----+
|[10, 11, 19, 20, ...|         1|null|
|[55, 56, 57, 58, ...|         2|null|
+--------------------+----------+----+
```
Upvotes: 1