Reputation: 65
EDIT. TL;DR: I'm trying to achieve a nested loop over a PySpark DataFrame. As you can see, I want the nested loop to start from the NEXT row (with respect to the outer loop) in every iteration, so as to reduce unnecessary iterations. In plain Python (pandas) I can do that by slicing with [row.Index + 1:], as in the following code:
import numpy as np

r = 1000
joins = []
for row in df.itertuples():
    # the inner loop starts from the next row to avoid redundant comparisons
    for k in df[row.Index + 1:].itertuples():
        a = np.array((row.x, row.y))
        b = np.array((k.x, k.y))
        # 'class' is a Python keyword, so itertuples can't expose it as row.class;
        # access it by position instead (fields are Index, name, x, y, class)
        if row[4] != k[4]:
            if row.x < k.x + r:
                if np.linalg.norm(a - b) < r:
                    joins.append((row.name, k.name))
print(joins)
How can I achieve the same result in Spark?
Long problem analysis: I have a list
people=[('john', 35, 54, 'A'), ('george', 94, 84, 'B'), ('nicolas', 7, 9, 'B'), ('tom', 86, 93, 'A'), ('jason', 62, 73, 'B'), ('bill', 15, 58, 'A'), ('william', 9, 3, 'A'), ('brad', 73, 37, 'B'), ('cosmo', 52, 67, 'B'), ('jerry', 73, 30, 'A')]
and I convert it to a Spark DataFrame, which I sort by 'y' in descending order:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField('name', StringType(), True),
    StructField('x', IntegerType(), True),
    StructField('y', IntegerType(), True),
    StructField('class', StringType(), True),
])

rdd = spark.sparkContext.parallelize(people)
df = spark.createDataFrame(rdd, schema)
df.orderBy('y', ascending=False).show()
+-------+---+---+-----+
| name| x| y|class|
+-------+---+---+-----+
| tom| 86| 93| A|
| george| 94| 84| B|
| jason| 62| 73| B|
| cosmo| 52| 67| B|
| bill| 15| 58| A|
| john| 35| 54| A|
| brad| 73| 37| B|
| jerry| 73| 30| A|
|nicolas| 7| 9| B|
|william| 9| 3| A|
+-------+---+---+-----+
What I'm looking for is to join 'class A' names with 'class B' names when a condition is fulfilled, say, when the Euclidean distance between the A item and the B item is < 10. I believe a cross join is not the best idea, because it could be time-consuming with large datasets.
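For reference, the straightforward version I'd like to avoid is a conditional join (effectively a filtered cross join), something like the sketch below; the aliases and output column names are just illustrative:

from pyspark.sql import functions as F

# conditional join: every 'A' row is checked against every 'B' row,
# which is exactly the all-pairs behaviour I'm worried about
a = df.where(F.col('class') == 'A').alias('a')
b = df.where(F.col('class') == 'B').alias('b')
dist = F.sqrt((F.col('a.x') - F.col('b.x')) ** 2 + (F.col('a.y') - F.col('b.y')) ** 2)
a.join(b, dist < 10).select(F.col('a.name').alias('A_name'),
                            F.col('b.name').alias('B_name')).show()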
Instead, I think it would be nice if I could iterate. Pseudocode:

start with row1
    if row1[class] != row2[class]:
        if row1['y'] - row2['y'] < 10:
            if euclidean_distance(row1.item, row2.item) < 10:
                join row1.name, row2.name
            end
        else: break
    # it keeps iterating until row1['y'] - row['y'] >= 10
then a new iteration starts from row2
    if row2[class] != row3[class]:
        etc etc
That way there will be fewer checks between rows, as opposed to a cross join, where everything is checked against everything and execution times grow accordingly. But how can this be done in Spark? Any ideas?
Edit. Required output (in any form):
brad, jerry
(The only pair whose items belong to different classes and whose Euclidean distance, 7, is lower than 10.)
Upvotes: 0
Views: 5271
Reputation: 2752
If I'm understanding the problem correctly, you could split the dataframe into two dataframes based on the class column, then join them on the specified join clause (using an outer join):
from pyspark.sql.functions import col, collect_list, struct
A_df = df.where(col('class') == 'A').withColumnRenamed('name', 'A.name')
B_df = df.where(col('class') == 'B').withColumnRenamed('name', 'B.name')
join_clause = A_df.y - B_df.y <= 10
result = A_df.join(B_df, join_clause, 'outer')
And with the result dataframe, collect the two name columns into one list column:
# collect_list is an aggregate, so wrap it in agg(); the backticks are needed
# because the renamed columns contain a dot
result = result.agg(collect_list(struct(col('`A.name`'), col('`B.name`'))))
Update
Here's an implementation using mapPartitions, with no joins and no conversion to a DataFrame:
import math
from pyspark.sql import SparkSession

def process_data(rows):
    r = 1000
    joins = []
    for row1 in rows:
        for row2 in rows:
            if row1['class'] != row2['class']:
                if row1['x'] < row2['x'] + r:
                    if math.sqrt((row1['x'] - row2['x']) ** 2 + (row1['y'] - row2['y']) ** 2) <= r:
                        joins.append((row1['name'], row2['name']))
    return joins

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()

people = [('john', 35, 54, 'A'),
          ('george', 94, 84, 'B'),
          ('nicolas', 7, 9, 'B'),
          ('tom', 86, 93, 'A'),
          ('jason', 62, 73, 'B'),
          ('bill', 15, 58, 'A'),
          ('william', 9, 3, 'A'),
          ('brad', 73, 37, 'B'),
          ('cosmo', 52, 67, 'B'),
          ('jerry', 73, 30, 'A')]

fields = ('name', 'x', 'y', 'class')
data = [dict(zip(fields, person)) for person in people]

rdd = spark.sparkContext.parallelize(data)
result = rdd.mapPartitions(process_data).collect()
print(result)
Output:
[('tom', 'jason'), ('cosmo', 'jerry')]
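Note that the rows argument of process_data is an iterator over a single partition, so only records that land in the same partition are compared (and the nested loop above exhausts the iterator after the first outer row). You can check how the records were distributed with something like the sketch below; Update 2 addresses both points.

# a quick way to see how many records ended up in each partition;
# records in different partitions are never compared by process_data
print(rdd.glom().map(len).collect())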
Update 2
Added an initial sort on the 'y' field, a repartition to ensure all the data is on one partition (so all records can be compared), and changed the nested loop:
import math
from pyspark.sql import SparkSession

def process_data(rows):
    r = 1000
    joins = []
    # materialize the iterator so the nested loop can traverse it repeatedly
    rows = list(rows)
    for i, row1 in enumerate(rows):
        # only compare against rows from the current position onwards
        for row2 in rows[i:]:
            if row1['class'] != row2['class']:
                if row1['x'] < row2['x'] + r:
                    if math.sqrt((row1['x'] - row2['x']) ** 2 + (row1['y'] - row2['y']) ** 2) < r:
                        joins.append((row1['name'], row2['name']))
    return joins

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()

people = [('john', 35, 54, 'A'),
          ('george', 94, 84, 'B'),
          ('nicolas', 7, 9, 'B'),
          ('tom', 86, 93, 'A'),
          ('jason', 62, 73, 'B'),
          ('bill', 15, 58, 'A'),
          ('william', 9, 3, 'A'),
          ('brad', 73, 37, 'B'),
          ('cosmo', 52, 67, 'B'),
          ('jerry', 73, 30, 'A')]

fields = ('name', 'x', 'y', 'class')
data = [dict(zip(fields, person)) for person in people]

rdd = spark.sparkContext.parallelize(data)
result = rdd.sortBy(lambda x: x['y'], ascending=False).repartition(1).mapPartitions(process_data).collect()
print(result)
Output:
[('william', 'nicolas'), ('william', 'brad'), ('william', 'cosmo'), ('william', 'jason'), ('william', 'george'), ('nicolas', 'jerry'), ('nicolas', 'john'), ('nicolas', 'bill'), ('nicolas', 'tom'), ('jerry', 'brad'), ('jerry', 'cosmo'), ('jerry', 'jason'), ('jerry', 'george'), ('brad', 'john'), ('brad', 'bill'), ('brad', 'tom'), ('john', 'cosmo'), ('john', 'jason'), ('john', 'george'), ('bill', 'cosmo'), ('bill', 'jason'), ('bill', 'george'), ('cosmo', 'tom'), ('jason', 'tom'), ('george', 'tom')]
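If you also want the early exit from the question's pseudocode (stop comparing once the sorted y values are more than the threshold apart), the partition function could look roughly like the sketch below. It uses the question's threshold of 10 instead of 1000, sorts inside the function (repartition does not guarantee that the earlier sort order is preserved), and reuses the rdd from the snippet above; process_data_sweep and threshold are just illustrative names.

import math

def process_data_sweep(rows, threshold=10):
    # sort by 'y' descending inside the partition so the early break is safe
    rows = sorted(rows, key=lambda row: row['y'], reverse=True)
    joins = []
    for i, row1 in enumerate(rows):
        for row2 in rows[i + 1:]:
            # rows are sorted by 'y' descending, so once the y gap reaches the
            # threshold no later row2 can be within range of row1
            if row1['y'] - row2['y'] >= threshold:
                break
            if row1['class'] != row2['class']:
                dist = math.sqrt((row1['x'] - row2['x']) ** 2 +
                                 (row1['y'] - row2['y']) ** 2)
                if dist < threshold:
                    joins.append((row1['name'], row2['name']))
    return joins

result = rdd.repartition(1).mapPartitions(process_data_sweep).collect()
print(result)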
Upvotes: 2