Reputation: 3091
I created a function to generate random values in a given range for DF columns.
test_df = self.spark.createDataFrame([(1, 'metric1', 10.5), (2, 'metric2', 20.7), (3, 'metric3', 30.1)], ['id', 'metric', 'score'])
def generate_rand_value(col: Column) -> Column:
    lower = col - (col * RANGE)
    upper = col + (col * RANGE)
    return random.uniform(lower, upper)
Then I decided to modify it to generate a fixed number for each column value in a range, using a seed:
def generate_fixed_rand_value(column: Column):
    random.seed(5)
    return random.randint(column, 10)
This results in an error: TypeError: int() argument must be a string, a bytes-like object or a number, not 'Column'. What is the right way to call the function to generate a fixed float number for each column value? Or is there a more suitable approach for that?
I call the function like this:
def parse_cols(df, cols: list):
    for col_name in cols:
        df = df.withColumn(col_name, generate_fixed_rand_value(F.col(col_name)))
    return df
Upvotes: 1
Views: 2016
Reputation: 1712
I think you got lucky with random.uniform: its implementation in Python happens to use only arithmetic that is also valid on pyspark Columns, so the operands are combined inline into a Column expression. Refer to the source code - https://github.com/python/cpython/blob/3.8/Lib/random.py
def uniform(self, a, b):
    "Get a random number in the range [a, b) or [a, b] depending on rounding."
    return a + (b-a) * self.random()
So when you use random.uniform inside withColumn, for example test = df_in.withColumn('tst', random.uniform(col('x'), col('y'))), it translates to df_in.withColumn('tst', col('x') + (col('y') - col('x')) * <some_random_number>), which is allowed syntax in pyspark.
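To make that concrete, here is a minimal sketch of my own (not from the original answer) showing that random.uniform with Column arguments just returns a Column expression, with the random factor drawn once on the driver:

import random
import pyspark.sql.functions as F

# uniform() evaluates a + (b - a) * self.random() eagerly on the driver,
# so with Column arguments it simply composes a Column expression:
expr = random.uniform(F.col('x'), F.col('y'))
print(type(expr))  # <class 'pyspark.sql.column.Column'>
print(expr)        # prints something like Column<(x + ((y - x) * 0.62...))>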
But randint() simply calls randrange(start, stop + 1), which executes this:
def randrange(self, start, stop=None, step=1, _int=int):
    """Choose a random item from range(start, stop[, step]).

    This fixes the problem with randint() which includes the
    endpoint; in Python this is usually not what you want.

    """
    # This code is a bit messy to make it fast for the
    # common case while still doing adequate error checking.
    istart = _int(start)
    if istart != start:
        raise ValueError("non-integer arg 1 for randrange()")
As you can see, _int(start) attempts to convert the first argument with int(), and int() cannot convert a pyspark Column, hence the TypeError. However, when you wrap your function in a udf, it is executed row by row, so only plain integers are passed in and the function works.
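As an aside, here is a rough sketch of that udf route (my own illustration, not part of the original answer; it assumes the column values stay <= 10, matching the randint(column, 10) call from the question, since randint raises when the lower bound exceeds the upper):

import random
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType

@F.udf(returnType=IntegerType())
def fixed_rand_int(value):
    # Executed row by row on the workers, so 'value' arrives as a plain int.
    random.seed(5)  # reseed per row so the same value always maps to the same number
    return random.randint(value, 10)

# usage: tst.withColumn('random_number', fixed_rand_int(F.col('val1')))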
So in your function generate_fixed_rand_value you can replace randint() with uniform() and it will work. Just as a hint: you can also use uniform() directly in withColumn(), so a separate function is not needed:
import pyspark.sql.functions as F
import random
random.seed(5)
tst= sqlContext.createDataFrame([(10,7.5,14),(5,1.2,4),(9,7.5,10),(2,1.2,90),(10,2.0,30),(3,5.5,11)],schema=['val1','x','y'])
tst_res1 = tst.withColumn("random_number",random.uniform(F.col('x'),10))
results:
+----+---+---+-----------------+
|val1| x| y| random_number|
+----+---+---+-----------------+
| 10|7.5| 14|9.057254237224255|
| 5|1.2| 4|6.681534915029378|
| 9|7.5| 10|9.057254237224255|
| 2|1.2| 90|6.681534915029378|
| 10|2.0| 30|6.983213559117615|
| 3|5.5| 11|8.303057627003659|
+----+---+---+-----------------+
There are fixed random numbers per value in the column x. If you need this for other columns, you can use a for loop or a reduce function, as sketched below.
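A quick sketch of the reduce variant (names are mine; RANGE stands in for a fraction such as 0.25, like the constant in the question):

from functools import reduce
import random
import pyspark.sql.functions as F

RANGE = 0.25  # assumed fraction of the column value, as in the question

def with_fixed_rand(df, col_name):
    # uniform() draws one factor per call, so each column gets its own fixed factor
    lower = F.col(col_name) - F.col(col_name) * RANGE
    upper = F.col(col_name) + F.col(col_name) * RANGE
    return df.withColumn(col_name, random.uniform(lower, upper))

tst_multi = reduce(with_fixed_rand, ['x', 'y'], tst)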
In case you need a random number within a percentage range of a particular column, replace the last line with this:
tst_res1 = tst.withColumn("random_number",random.uniform((F.col('x')-(0.25*F.col('x'))),(F.col('x')+(0.25*F.col('x')))))
The results:
+----+---+---+------------------+
|val1| x| y| random_number|
+----+---+---+------------------+
| 10|7.5| 14| 8.66794067791819|
| 5|1.2| 4|1.3868705084669106|
| 9|7.5| 10| 8.66794067791819|
| 2|1.2| 90|1.3868705084669106|
| 10|2.0| 30| 2.311450847444851|
| 3|5.5| 11| 6.35648983047334|
+----+---+---+------------------+
Upvotes: 2