Paul

Reputation: 31

Function to find overlapping data in Spark DataFrame

I have the below DataFrame:

[image: the input DataFrame of passengers and the trains they have travelled on]
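(The image is gone; reconstructed from the sample data used in the answers below, the input is presumably one row per passenger with the list of trains they have been on:)

+-----------+----------------------------+
|passengerId|trainId                     |
+-----------+----------------------------+
|1          |[1, 2, 3, 4, 6]             |
|2          |[2, 3, 4, 6, 7]             |
|4          |[11, 2, 4, 5, 7, 32, 44, 54]|
|5          |[7, 12, 34, 32, 44, 54]     |
+-----------+----------------------------+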

I am trying to find the passengers who have been on more than 3 trains together.

So for the example above, passengers with IDs 1 and 2 have been on the same train more than three times (trains 2, 3, 4 and 6), and passengers with IDs 4 and 5 have been on the same train more than three times (trains 7, 32, 44 and 54).

Is there a Scala function that could be written for this? I have tried an intersect-type function, but I cannot seem to apply it to the whole DataFrame.

Thank you for your help.

As for the expected output, I was thinking it would return a DataFrame like the following:

[image: the expected output DataFrame]
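(Also reconstructed, since the image is gone: presumably one row per pair of passengers who share more than three trains, something like:)

+------------+------------+
|passengerId1|passengerId2|
+------------+------------+
|           1|           2|
|           4|           5|
+------------+------------+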

I have a DataFrame with around 15,000 rows.

Thank you

Upvotes: 2

Views: 952

Answers (3)

Emiliano Martinez

Reputation: 4133

If you have two arrays corresponding to the travels of two passengers like:

val passenger1Array = Array(1, 2, 3, 4, 5, 6)
val passenger2Array = Array(2, 3, 4, 6, 7)

and a schema for the DataFrame:

import org.apache.spark.sql.types._

val schema =
  StructType(
    List(StructField("passengerId", IntegerType),
         StructField("TrainID", ArrayType(IntegerType))))

and build the dataframe:

import org.apache.spark.sql.Row

val df = spark
  .createDataFrame(
    spark.sparkContext.parallelize(
      Seq(Row(1, passenger1Array), Row(2, passenger2Array))), schema)

/*
+-----------+------------------+
|passengerId|TrainID           |
+-----------+------------------+
|1          |[1, 2, 3, 4, 5, 6]|
|2          |[2, 3, 4, 6, 7]   |
+-----------+------------------+ 
*/

Now you can pivot your dataframe (as long as there are not thousands of distinct trains):

import org.apache.spark.sql.functions._

val explDF = df.select(
    col("passengerId"),
    explode(col("TrainID")).as("TrainID")
  )

val _pivot =
  explDF.groupBy("passengerId")
    .pivot("TrainID").count().na.fill(0)

giving:

/*
+-----------+---+---+---+---+---+---+---+
|passengerId|  1|  2|  3|  4|  5|  6|  7|
+-----------+---+---+---+---+---+---+---+
|          1|  1|  1|  1|  1|  1|  1|  0|
|          2|  0|  1|  1|  1|  0|  1|  1|
+-----------+---+---+---+---+---+---+---+
*/

Transform the train columns into an array:

val trains =
  (passenger1Array ++ passenger2Array).distinct
    .map(id => col(id.toString))

val resulting_df = 
   _pivot.withColumn("mask", array(trains: _*)).select("passengerId", "mask")

/*
+-----------+---------------------+
|passengerId|mask                 |
+-----------+---------------------+
|1          |[1, 1, 1, 1, 1, 1, 0]|
|2          |[0, 1, 1, 1, 0, 1, 1]|
+-----------+---------------------+
*/

Collect the arrays and build a dense matrix (it could also be sparse; see the sketch after the matrix below):

import breeze.linalg._

// the pivoted count columns are Long, so each mask is an array of Long
val trains1 =
  resulting_df.rdd.map(row => row.getSeq[Long](1).toArray).collect()

val matrix = DenseMatrix(trains1: _*)

   /*
       1  1  1  1  1  1  0  
       0  1  1  1  0  1  1 
   */
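Since the mask is mostly zeros when there are many trains, a sparse variant is possible too. A minimal sketch, assuming Breeze's CSCMatrix builder (names here are illustrative, not from the original answer):

import breeze.linalg.CSCMatrix

val nPassengers = trains1.length
val nTrains = trains1.head.length

// store only the non-zero entries of the passenger/train mask
val builder = new CSCMatrix.Builder[Long](rows = nPassengers, cols = nTrains)
for {
  (mask, i) <- trains1.zipWithIndex
  (v, j)    <- mask.zipWithIndex
  if v != 0L
} builder.add(i, j, v)

val sparse = builder.result()
// sparse * sparse.t should then give the same co-occurrence counts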

Now multiply the matrix by its transpose:

val resultM = matrix * matrix.t

/*
  6  4  
  4  5
*/

And as a result you will have a matrix whose elements correspond to the number of times that two passengers have been on the same train:

(1,1) 6 
(1,2) 4
(2,1) 4
(2,2) 5
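A possible final step (not part of the original answer): scan the upper triangle of resultM for pairs that shared more than 3 trains. This assumes the passenger ids are collected in the same row order as the masks; to be safe in production, collect the ids together with the mask arrays in a single pass:

val ids = resulting_df.select("passengerId").rdd.map(_.getInt(0)).collect()

for {
  i <- 0 until resultM.rows
  j <- (i + 1) until resultM.cols
  if resultM(i, j) > 3
} println(s"passengers ${ids(i)} and ${ids(j)}: ${resultM(i, j)} shared trains")

// passengers 1 and 2: 4 shared trains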

Upvotes: 1

Ged

Reputation: 18043

Building on the 1st answer and providing a little extra perspective: you can get groupings of more than 2 passengers for the different combinations. Not sure exactly what is req'd, as the question is a little vague.

import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  (1, Seq(1, 2, 3, 4, 6)),
  (2, Seq(2, 3, 4, 6, 7)),
  (3, Seq(1, 2, 5, 9, 100)),
  (4, Seq(11, 2, 4, 5, 7, 32, 44, 54)),
  (44, Seq(11, 2, 4, 5, 7, 32, 44, 654)),
  (5, Seq(7, 12, 34, 32, 44, 54)),
  (6, Seq(5, 21))
).toDF("passengerId", "trainId")

val df2 = df.as("d1").join(df.as("d2"), $"d1.passengerId" =!= $"d2.passengerId")
            .selectExpr("d1.passengerId as passengerId1", "d2.passengerId as passengerId2", "d1.trainId as trainId1", "d2.trainId as trainId2")
            .where("size(array_intersect(trainId1, trainId2)) > 3")
            .selectExpr("array_sort(array(passengerId1, passengerId2)) as ar", "trainId1")

// explode so each train in the list gets its own row
val df3 = df2.withColumn("res", expr("explode(trainId1)"))
// group by the original train list and gather every pair that shares it
val df4 = df3.groupBy("trainId1").agg(collect_set("ar").as("res"))
// flatten the pairs into one sorted, de-duplicated list of passenger ids
val df5 = df4.withColumn("passengerIDs", array_distinct(sort_array(flatten($"res"))))
df5.show()

returns:

+--------------------+-----------------+------------+
|            trainId1|              res|passengerIDs|
+--------------------+-----------------+------------+
|     [1, 2, 3, 4, 6]|         [[1, 2]]|      [1, 2]|
|     [2, 3, 4, 6, 7]|         [[1, 2]]|      [1, 2]|
|[11, 2, 4, 5, 7, ...|[[4, 44], [4, 5]]|  [4, 5, 44]|
|[11, 2, 4, 5, 7, ...|        [[4, 44]]|     [4, 44]|
|[7, 12, 34, 32, 4...|         [[4, 5]]|      [4, 5]|
+--------------------+-----------------+------------+

which could be of use. It's actually quite a hard problem.
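If only the distinct passenger groups are needed, one more step could be (row order may vary):

df5.select("passengerIDs").distinct().show()

// +------------+
// |passengerIDs|
// +------------+
// |      [1, 2]|
// |  [4, 5, 44]|
// |     [4, 44]|
// |      [4, 5]|
// +------------+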

Upvotes: 1

abiratsis

Reputation: 7316

You can combine a self-join with the array_intersect built-in function, among others:

import org.apache.spark.sql.functions.expr
import spark.implicits._

val df = Seq(
  (1, Seq(1, 2, 3, 4, 6)),
  (2, Seq(2, 3, 4, 6, 7)),
  (3, Seq(1, 2, 5, 9, 100)),
  (4, Seq(11, 2, 4, 5, 7, 32, 44, 54)),
  (5, Seq(7, 12, 34, 32, 44, 54)),
  (6, Seq(5, 21)) 
).toDF("passengerId", "trainId")

df.as("d1").join(df.as("d2"), $"d1.passengerId" =!= $"d2.passengerId")
            .selectExpr("d1.passengerId as passengerId1", "d2.passengerId as passengerId2", "d1.trainId as trainId1", "d2.trainId as trainId2")
            .where("size(array_intersect(trainId1, trainId2)) > 3")
            .selectExpr("array_sort(array(passengerId1, passengerId2)) as ar")
            .distinct()
            .selectExpr("ar[0] as usr1", "ar[1] as usr2")
            .show()

// +----+----+
// |usr1|usr2|
// +----+----+
// |1   |2   |
// |4   |5   |
// +----+----+
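A small variation (not in the original answer): joining on < instead of =!= emits each unordered pair only once, so the distinct and array_sort steps become unnecessary:

df.as("d1").join(df.as("d2"), $"d1.passengerId" < $"d2.passengerId")
  .where("size(array_intersect(d1.trainId, d2.trainId)) > 3")
  .selectExpr("d1.passengerId as usr1", "d2.passengerId as usr2")
  .show()

// same result: rows (1, 2) and (4, 5)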

Upvotes: 1
