martina.physics

Reputation: 9804

Joining PySpark DataFrames on nested field

I want to perform a join between these two PySpark DataFrames:

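from pyspark import SparkContext
from pyspark.sql import Row, SQLContext
from pyspark.sql.functions import col

sc = SparkContext()
sqlContext = SQLContext(sc)  # creating an SQLContext attaches toDF to RDDs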

df1 = sc.parallelize([
    ['owner1', 'obj1', 0.5],
    ['owner1', 'obj1', 0.2],
    ['owner2', 'obj2', 0.1]
]).toDF(('owner', 'object', 'score'))

df2 = sc.parallelize(
          [Row(owner=u'owner1',
           objects=[Row(name=u'obj1', value=Row(fav=True, ratio=0.3))])]).toDF()

The join has to be performed on the name of the object, namely the field name inside objects for df2 and object for df1.

I am able to perform a SELECT on the nested field, as in

df2.where(df2.owner == 'owner1').select(col("objects.value.ratio")).show()

but I am not able to run this join:

df2.alias('u').join(df1.alias('s'), col('u.objects.name') == col('s.object'))

which returns the error

pyspark.sql.utils.AnalysisException: u"cannot resolve '(objects.name = cast(object as double))' due to data type mismatch: differing types in '(objects.name = cast(object as double))' (array and double).;"

Any ideas how to solve this?

Upvotes: 6

Views: 2900

Answers (1)

zero323

Reputation: 330183

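Since you want to match on and extract a specific element, the simplest approach is to explode the array, so that each element of objects becomes its own row and object.name is a scalar that can be compared with s.object: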

from pyspark.sql.functions import col, explode

matches = df2.withColumn("object", explode(col("objects"))).alias("u").join(
  df1.alias("s"),
  col("s.object") == col("u.object.name")
)

matches.show()
## +-------------------+------+-----------------+------+------+-----+
## |            objects| owner|           object| owner|object|score|
## +-------------------+------+-----------------+------+------+-----+
## |[[obj1,[true,0.3]]]|owner1|[obj1,[true,0.3]]|owner1|  obj1|  0.5|
## |[[obj1,[true,0.3]]]|owner1|[obj1,[true,0.3]]|owner1|  obj1|  0.2|
## +-------------------+------+-----------------+------+------+-----+
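To make the mechanics concrete, here is a minimal plain-Python model of what explode followed by an equality join does. It is not Spark code and only mirrors the sample data from the question; the helper explode_rows and the dictionary-based row layout are assumptions made for illustration.

```python
# Plain-Python model of explode + equi-join; this is not Spark code.
# The rows mirror the sample data from the question.
df1_rows = [
    {"owner": "owner1", "object": "obj1", "score": 0.5},
    {"owner": "owner1", "object": "obj1", "score": 0.2},
    {"owner": "owner2", "object": "obj2", "score": 0.1},
]
df2_rows = [
    {"owner": "owner1",
     "objects": [{"name": "obj1", "value": {"fav": True, "ratio": 0.3}}]},
]


def explode_rows(rows, array_field, new_field):
    """Hypothetical helper: emit one output row per array element."""
    for row in rows:
        for element in row[array_field]:
            yield {**row, new_field: element}


# After exploding, each row carries a single struct, so its name is a scalar.
exploded = list(explode_rows(df2_rows, "objects", "object"))

# Equi-join on the scalar key: index one side by key, then look up.
by_name = {}
for s in df1_rows:
    by_name.setdefault(s["object"], []).append(s)

matches = [
    (u, s)
    for u in exploded
    for s in by_name.get(u["object"]["name"], [])
]

for u, s in matches:
    print(u["object"]["name"], s["owner"], s["score"])
```

Because the join key is a plain scalar after the explode step, the join can be expressed as an equality on keys rather than a test against a whole array.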

An alternative, but very inefficient, approach is to use array_contains:

from pyspark.sql.functions import expr

matches_contains = df1.alias("s").join(
  df2.alias("u"), expr("array_contains(objects.name, object)"))

It is inefficient because it will be expanded to a Cartesian product:

matches_contains.explain()
## == Physical Plan ==
## Filter array_contains(objects#6.name,object#4)
## +- CartesianProduct
##    :- Scan ExistingRDD[owner#3,object#4,score#5] 
##    +- Scan ExistingRDD[objects#6,owner#7]
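As a rough illustration of why this plan is costly, the following plain-Python sketch models a Cartesian product followed by a filter. It is not Spark code; the tuple layout and the comparisons counter are assumptions made for illustration, and the data mirrors the question's sample.

```python
# Plain-Python model of CartesianProduct + Filter; this is not Spark code.
from itertools import product

df1_rows = [
    ("owner1", "obj1", 0.5),
    ("owner1", "obj1", 0.2),
    ("owner2", "obj2", 0.1),
]
df2_rows = [
    ("owner1", [{"name": "obj1"}]),
]

comparisons = 0
matches_contains = []
for s, u in product(df1_rows, df2_rows):
    comparisons += 1                    # every pair of rows is visited
    names = [o["name"] for o in u[1]]   # models objects.name
    if s[1] in names:                   # models array_contains(objects.name, object)
        matches_contains.append((s, u))

print(comparisons, len(matches_contains))
```

The predicate is evaluated once per pair of rows, so the work grows with the product of the two row counts, regardless of how few pairs actually match.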

If the array is relatively small, it is possible to generate an optimized version of array_contains, as I've shown here: Filter by whether column value equals a list in spark

Upvotes: 9
