Fasty
Fasty

Reputation: 804

How to compare values in a pyspark dataframe column with another dataframe in pyspark

I have a pyspark dataframe(df1) whose first first row is as below:

[Row(_c0='{"type":"Fi","values":[0.20100994408130646,1.172734797000885,0.06788740307092667,0.2314232587814331,0.2012220323085785]}', _c1='0')]

I want to compare the "values" list with the first column of below dataframe(df2) values as shown below:

0    0.57581    1.25461    0.68694    0.974580    1.54789    0.23646
1    0.98745    0.23655    2.58970    4.587580    0.89756    1.25678
2    0.45780    5.78940    0.65986    2.125400    0.98745    1.23658
3    2.56834    0.25698    4.26587    0.569872    0.36987    0.68975
4    0.25678    1.23654    5.68320    0.986230    0.87563    2.58975

Similarly I have many rows in df1, I have to see which values in df1 "values" list is greater than the corresponding column in df2.I need to find those indices which satisfy the above condition and store it as list in another column to df1.

For instance 1.172737 > 0.98745 so its index is 1.Hence I will have another column in df1 named(indices) which contains value1 and it has to append the same if another value comes up.

The comparison is between respective column and rows.The above shown df1 row is 1st row,so it has to compared with first column in df2.

If I have underemphasised sth please let me know in the comments.

Upvotes: 3

Views: 7576

Answers (1)

Pierre Gourseaud
Pierre Gourseaud

Reputation: 2477

This code works with Python 2.7 and Spark 2.3.2 :

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, IntegerType

# Create test dataframes
df1 = spark.createDataFrame([
        ['{"type":"Fi","values":[0.20100994408130646,1.172734797000885,0.06788740307092667,0.2314232587814331,0.2012220323085785]}', '0'],
        ['{"type":"Fi","values":[0.6, 0.8, 0.5, 2.1, 0.4]}', '0']
    ],['_c0','_c1'])
df2 = spark.createDataFrame([
        [0, 0.57581, 1.25461, 0.68694, 0.974580, 1.54789, 0.23646],
        [1, 0.98745, 0.23655, 2.58970, 4.587580, 0.89756, 1.25678],
        [2, 0.45780, 5.78940, 0.65986, 2.125400, 0.98745, 1.23658],
        [3, 2.56834, 0.25698, 4.26587, 0.569872, 0.36987, 0.68975],
        [4, 0.25678, 1.23654, 5.68320, 0.986230, 0.87563, 2.58975]
    ],['id','v1', 'v2', 'v3', 'v4', 'v5', 'v6'])

# Get schema and load json correctly
json_schema = spark.read.json(df1.rdd.map(lambda row: row._c0)).schema
df1 = df1.withColumn('json', F.from_json('_c0', json_schema))

# Get column 1 values to compare
values = [row['v1'] for row in df2.select('v1').collect()]

# Define udf to compare values
def cmp_values(lst):
    list_cmp = map(lambda t: t[0] > t[1], zip(lst, values))  # Boolean list
    return [idx for idx, cond in enumerate(list_cmp) if cond]  # Indices of satisfying elements

udf_cmp_values = F.udf(cmp_values, ArrayType(IntegerType()))

# Apply udf on array
df1 = df1.withColumn('indices', udf_cmp_values(df1.json['values']))
df1.show()

+--------------------+---+--------------------+---------+
|                 _c0|_c1|                json|  indices|
+--------------------+---+--------------------+---------+
|{"type":"Fi","val...|  0|[Fi, [0.201009944...|      [1]|
|{"type":"Fi","val...|  0|[Fi, [0.6, 0.8, 0...|[0, 2, 4]|
+--------------------+---+--------------------+---------+

Upvotes: 2

Related Questions