Ian_Lane

Reputation: 65

Pyspark nested loop in the same DataFrame. How to iterate?

EDIT. TL;DR: I'm trying to achieve a nested loop over a PySpark DataFrame. As you can see, I want the nested loop to start from the NEXT row (with respect to the outer loop) on every iteration, so as to reduce unnecessary iterations. In plain Python (pandas) I can do that with [row.Index + 1:], like this:

import numpy as np

# df here is a pandas DataFrame with columns: name, x, y, class
r = 1000
joins = []
for row in df.itertuples():
    # the inner loop starts from the row after the current one
    for k in df[row.Index + 1:].itertuples():
        a = np.array((row.x, row.y))
        b = np.array((k.x, k.y))
        # 'class' is a Python keyword, so the namedtuple field is read by position (column 4)
        if row[4] != k[4]:
            if row.x < k.x + r:
                if np.linalg.norm(a - b) < r:
                    joins.append((row.name, k.name))
print(joins)

How can I achieve the same result in Spark?

Long problem analysis: I have a list

people=[('john', 35, 54, 'A'), ('george', 94, 84, 'B'), ('nicolas', 7, 9, 'B'), ('tom', 86, 93, 'A'), ('jason', 62, 73, 'B'), ('bill', 15, 58, 'A'), ('william', 9, 3, 'A'), ('brad', 73, 37, 'B'), ('cosmo', 52, 67, 'B'), ('jerry', 73, 30, 'A')]

and I convert it to a Spark DataFrame, which I sort by 'y', descending:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField('name', StringType(), True),
    StructField('x', IntegerType(), True),
    StructField('y', IntegerType(), True),
    StructField('class', StringType(), True),
])
rdd = spark.sparkContext.parallelize(people)
df = spark.createDataFrame(rdd, schema)
df.orderBy('y', ascending=False).show()
+-------+---+---+-----+
|   name|  x|  y|class|
+-------+---+---+-----+
|    tom| 86| 93|    A|
| george| 94| 84|    B|
|  jason| 62| 73|    B|
|  cosmo| 52| 67|    B|
|   bill| 15| 58|    A|
|   john| 35| 54|    A|
|   brad| 73| 37|    B|
|  jerry| 73| 30|    A|
|nicolas|  7|  9|    B|
|william|  9|  3|    A|
+-------+---+---+-----+

What I'm looking for is to join 'class A' names with 'class B' names when a condition is fulfilled, for example when the Euclidean distance between an A item and a B item is < 10. I believe that a cross join is not the best idea, because it could be time consuming with large datasets.

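To be concrete, this is roughly what the brute-force cross join would look like (a rough sketch; a_name, a_x, a_y, a_class and the matching b_* names are just aliases I pick so the columns stay apart after the join):

from pyspark.sql import functions as F

# brute force: every row against every row, then filter
a = df.select(F.col('name').alias('a_name'), F.col('x').alias('a_x'),
              F.col('y').alias('a_y'), F.col('class').alias('a_class'))
b = df.select(F.col('name').alias('b_name'), F.col('x').alias('b_x'),
              F.col('y').alias('b_y'), F.col('class').alias('b_class'))
dist = F.sqrt(F.pow(F.col('a_x') - F.col('b_x'), 2) + F.pow(F.col('a_y') - F.col('b_y'), 2))
pairs = a.crossJoin(b).where((F.col('a_class') != F.col('b_class')) & (dist < 10))
pairs.select('a_name', 'b_name').show()
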
I think it would be nice if I could iterate. Pseudocode:

start with row1
if row1['class'] != row2['class']:
    if row1['y'] - row2['y'] < 10:
        if euclidean_distance(row1, row2) < 10:
            join row1['name'], row2['name']
    else:
        break

it keeps iterating until row1['y'] - row2['y'] >= 10

then a new iteration starts from row2
if row2['class'] != row3['class']:
    etc etc

That way, there are fewer checks between rows, as opposed to a cross join, where everything is checked against everything and execution times grow accordingly. But how can this be done in Spark? Any ideas?

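To make the pruning idea concrete, this is roughly how I would express it as a join condition instead of a full cross join: the Euclidean distance can only be below 10 when the 'y' gap is below 10 (the distance is at least the 'y' difference), so the cheap band check can go into the join clause and the exact distance only matters for the pairs that pass it. A rough sketch (a_name, a_x, a_y and the b_* names are aliases I pick); whether Spark actually avoids comparing everything with everything here is part of what I'm asking:

from pyspark.sql import functions as F

A = df.where(F.col('class') == 'A').select(F.col('name').alias('a_name'),
                                           F.col('x').alias('a_x'), F.col('y').alias('a_y'))
B = df.where(F.col('class') == 'B').select(F.col('name').alias('b_name'),
                                           F.col('x').alias('b_x'), F.col('y').alias('b_y'))

# cheap check first: prune pairs whose 'y' values are already too far apart
band = F.abs(A.a_y - B.b_y) < 10
dist = F.sqrt(F.pow(A.a_x - B.b_x, 2) + F.pow(A.a_y - B.b_y, 2))
A.join(B, band & (dist < 10)).select('a_name', 'b_name').show()
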
edit. Required output (in any form):

brad, jerry

(The only pair whose items belong to different classes and whose Euclidean distance, 7, is lower than 10.)

Upvotes: 0

Views: 5271

Answers (1)

danielcahall

Reputation: 2752

If I'm understanding the problem correctly, you could split the DataFrame into two DataFrames based on the class column, then join them on the specified join clause (using an outer join):

from pyspark.sql.functions import col, collect_list, struct

# split by class and rename 'name' so the two name columns stay distinct after the join
A_df = df.where(col('class') == 'A').withColumnRenamed('name', 'A_name')
B_df = df.where(col('class') == 'B').withColumnRenamed('name', 'B_name')

join_clause = A_df.y - B_df.y <= 10
result = A_df.join(B_df, join_clause, 'outer')

And with the result DataFrame, collect the two name columns into one list column:

result = result.agg(collect_list(struct(col('A_name'), col('B_name'))).alias('pairs'))
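
If you then want the pairs back on the driver as a plain Python list, something like this should work (a small sketch reusing the 'pairs' column and the A_name/B_name columns from above):

pairs = result.collect()[0]['pairs']
print([(p['A_name'], p['B_name']) for p in pairs])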

Update

Here's an implementation using mapPartitions, with no joins and no conversion to a DataFrame:

import math

from pyspark.sql import SparkSession


def process_data(rows):
    r = 1000  # distance threshold
    joins = []
    for row1 in rows:
        for row2 in rows:
            # keep pairs whose rows belong to different classes and lie within distance r
            if row1['class'] != row2['class']:
                if row1['x'] < row2['x'] + r:
                    if math.sqrt((row1['x'] - row2['x']) ** 2 + (row1['y'] - row2['y']) ** 2) <= r:
                        joins.append((row1['name'], row2['name']))
    return joins


spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()

people = [('john', 35, 54, 'A'),
          ('george', 94, 84, 'B'),
          ('nicolas', 7, 9, 'B'),
          ('tom', 86, 93, 'A'),
          ('jason', 62, 73, 'B'),
          ('bill', 15, 58, 'A'),
          ('william', 9, 3, 'A'),
          ('brad', 73, 37, 'B'),
          ('cosmo', 52, 67, 'B'),
          ('jerry', 73, 30, 'A')]

fields = ('name', 'x', 'y', 'class')

data = [dict(zip(fields, person)) for person in people]

rdd = spark.sparkContext.parallelize(data)

result = rdd.mapPartitions(process_data).collect()
print(result)

Output:

[('tom', 'jason'), ('cosmo', 'jerry')]

Update 2

Added an initial sort step on the 'y' field, a repartition to ensure all the data is on one partition (so all records can be compared), materialized the partition iterator into a list, and changed the nested loop:

import math

from pyspark.sql import SparkSession


def process_data(rows):
    r = 1000  # distance threshold
    joins = []
    rows = list(rows)  # materialize the partition iterator so it can be traversed repeatedly
    for i, row1 in enumerate(rows):
        # only compare the current row with the rows from its position onward
        for row2 in rows[i:]:
            if row1['class'] != row2['class']:
                if row1['x'] < row2['x'] + r:
                    if math.sqrt((row1['x'] - row2['x']) ** 2 + (row1['y'] - row2['y']) ** 2) < r:
                        joins.append((row1['name'], row2['name']))
    return joins


spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()

people = [('john', 35, 54, 'A'),
          ('george', 94, 84, 'B'),
          ('nicolas', 7, 9, 'B'),
          ('tom', 86, 93, 'A'),
          ('jason', 62, 73, 'B'),
          ('bill', 15, 58, 'A'),
          ('william', 9, 3, 'A'),
          ('brad', 73, 37, 'B'),
          ('cosmo', 52, 67, 'B'),
          ('jerry', 73, 30, 'A')]

fields = ('name', 'x', 'y', 'class')

data = [dict(zip(fields, person)) for person in people]

rdd = spark.sparkContext.parallelize(data)

result = rdd.sortBy(lambda x: x['y'], ascending=False).repartition(1).mapPartitions(process_data).collect()
print(result)

Output:

[('william', 'nicolas'), ('william', 'brad'), ('william', 'cosmo'), ('william', 'jason'), ('william', 'george'), ('nicolas', 'jerry'), ('nicolas', 'john'), ('nicolas', 'bill'), ('nicolas', 'tom'), ('jerry', 'brad'), ('jerry', 'cosmo'), ('jerry', 'jason'), ('jerry', 'george'), ('brad', 'john'), ('brad', 'bill'), ('brad', 'tom'), ('john', 'cosmo'), ('john', 'jason'), ('john', 'george'), ('bill', 'cosmo'), ('bill', 'jason'), ('bill', 'george'), ('cosmo', 'tom'), ('jason', 'tom'), ('george', 'tom')]

Upvotes: 2
