Reputation: 1114
I have an n x m DataFrame and a 1 x m DataFrame:
df1=sc.parallelize([('a1',.5,.27),('a2',.15,.40),('a3',.7,.05)]).toDF(['id1','x1', 'x2'])
+---+----+----+
|id1| x1| x2|
+---+----+----+
| a1| 0.5|0.27|
| a2|0.15| 0.4|
| a3| 0.7|0.05|
+---+----+----+
df2=sc.parallelize([(.4,.3)]).toDF(['w1','w2'])
+---+---+
| w1| w2|
+---+---+
|0.4|0.3|
+---+---+
I would like to perform a boolean operation comparing column x1 in df1 to column w1 in df2, and column x2 in df1 to column w2 in df2. I would like the result to be a DataFrame whose first column is 'id1' from df1.
I want my result to look like this:
+---+---+---+
|id1| x1| x2|
+---+---+---+
| a1| 1| 0|
| a2| 0| 1|
| a3| 1| 0|
+---+---+---+
All I have right now is
rd = df1.rdd
rd_list = df2.rdd.collect()

def function_1(x):
    bool_1 = int(x[1] > rd_list[0][0])
    bool_2 = int(x[2] > rd_list[0][1])
    return (x[0], bool_1, bool_2)

rd.map(function_1).toDF(['id1', 'x1', 'x2']).show()
+---+---+---+
|id1| x1| x2|
+---+---+---+
| a1| 1| 0|
| a2| 0| 1|
| a3| 1| 0|
+---+---+---+
This gets me my result, but there must be a better way.
Upvotes: 1
Views: 636
Reputation: 330283
You can either compare with literals (using a single-row data frame doesn't make much sense here):
from pyspark.sql.functions import col
w1, w2 = df2.first()
df1.select(
    "id1",
    (col("x1") > w1).cast("integer"),
    (col("x2") > w2).cast("integer")
).toDF("id1", "x1", "x2")
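For intuition, here is a plain-Python sketch of the same element-wise comparison, using the sample values from the question (no Spark required; the hard-coded rows and thresholds are just the example data):

```python
# Rows of df1 and the single row of df2, taken from the question.
rows = [('a1', 0.5, 0.27), ('a2', 0.15, 0.40), ('a3', 0.7, 0.05)]
w1, w2 = 0.4, 0.3

# int(...) mimics .cast("integer") on the boolean comparison columns.
result = [(id1, int(x1 > w1), int(x2 > w2)) for id1, x1, x2 in rows]
print(result)  # [('a1', 1, 0), ('a2', 0, 1), ('a3', 1, 0)]
```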
or apply a cross product and select:
from pyspark.sql.functions import broadcast, col

df1.crossJoin(broadcast(df2)).select(
    "id1",
    (col("x1") > col("w1")).cast("integer"),
    (col("x2") > col("w2")).cast("integer")
).toDF("id1", "x1", "x2")
If you use Spark 2.0 or earlier, a plain join instead of crossJoin should have the same effect when combined with broadcast.
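The cross-join variant can likewise be sketched in plain Python with itertools.product, pairing every df1 row with the single df2 row and then comparing column-wise (again just the example data from the question, not Spark itself):

```python
from itertools import product

# Sample rows of df1 and the single row of df2 from the question.
df1_rows = [('a1', 0.5, 0.27), ('a2', 0.15, 0.40), ('a3', 0.7, 0.05)]
df2_rows = [(0.4, 0.3)]

# product() is the cartesian product, i.e. the cross join; with one
# row in df2_rows it simply attaches (w1, w2) to every df1 row.
joined = [(id1, int(x1 > w1), int(x2 > w2))
          for (id1, x1, x2), (w1, w2) in product(df1_rows, df2_rows)]
print(joined)  # [('a1', 1, 0), ('a2', 0, 1), ('a3', 1, 0)]
```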
Upvotes: 2