Reputation: 31
I have the below DataFrame:

+-----------+----------------------------+
|passengerId|trainId                     |
+-----------+----------------------------+
|1          |[1, 2, 3, 4, 6]             |
|2          |[2, 3, 4, 6, 7]             |
|3          |[1, 2, 5, 9, 100]           |
|4          |[11, 2, 4, 5, 7, 32, 44, 54]|
|5          |[7, 12, 34, 32, 44, 54]     |
|6          |[5, 21]                     |
+-----------+----------------------------+
I am trying to find the passengers who have been on more than 3 trains together.
So for the example above, passengers 1 and 2 have been on the same train more than three times (trains 2, 3, 4 and 6), and passengers 4 and 5 have as well (trains 7, 32, 44 and 54).
Is there a Scala function that could be written for this? I have tried an intersect-type function, but I cannot seem to apply it across the whole DataFrame.
Thank you for your help.
So for the expected output, I was thinking it would return a DataFrame like the following:

+----------+----------+
|passenger1|passenger2|
+----------+----------+
|1         |2         |
|4         |5         |
+----------+----------+

I have a DataFrame with around 15,000 rows.
Thank you
Upvotes: 2
Views: 952
Reputation: 4133
If you have two arrays corresponding to the travels of two passengers like:
val passenger1Array = Array(1,2,3,4,5,6)
val passenger2Array = Array(2,3,4,6,7)
you can define a schema:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{ArrayType, IntegerType, StructField, StructType}

val schema = StructType(
  List(
    StructField("passengerId", IntegerType),
    StructField("TrainID", ArrayType(IntegerType))))
and build the dataframe:
val df = spark.createDataFrame(
  spark.sparkContext.parallelize(
    Seq(Row(1, passenger1Array), Row(2, passenger2Array))),
  schema)
/*
+-----------+------------------+
|passengerId|TrainID |
+-----------+------------------+
|1 |[1, 2, 3, 4, 5, 6]|
|2 |[2, 3, 4, 6, 7] |
+-----------+------------------+
*/
Now you can pivot your dataframe (feasible as long as there are not thousands of distinct trains):
import org.apache.spark.sql.functions.{array, col, explode}

val explDF = df.select(
  col("passengerId"),
  explode(col("TrainID")).as("TrainID"))

val _pivot = explDF.groupBy("passengerId")
  .pivot("TrainID").count().na.fill(0)

which gives:
/*
+-----------+---+---+---+---+---+---+---+
|passengerId| 1| 2| 3| 4| 5| 6| 7|
+-----------+---+---+---+---+---+---+---+
| 1| 1| 1| 1| 1| 1| 1| 0|
| 2| 0| 1| 1| 1| 0| 1| 1|
+-----------+---+---+---+---+---+---+---+
*/
Transform the train columns into an array:

val trains = (passenger1Array ++ passenger2Array)
  .distinct
  .map(el => col(el.toString))
val resulting_df =
_pivot.withColumn("mask", array(trains: _*)).select("passengerId", "mask")
+-----------+---------------------+
|passengerId|mask |
+-----------+---------------------+
|1 |[1, 1, 1, 1, 1, 1, 0]|
|2 |[0, 1, 1, 1, 0, 1, 1]|
+-----------+---------------------+
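Hardcoding the train list from the two input arrays only works for this toy example; a small variant (my addition, assuming every pivoted column other than passengerId is a train) derives the columns from the pivot output itself:

// Derive the train columns from the pivoted DataFrame instead of
// hardcoding them from the input arrays:
val trainCols = _pivot.columns.filter(_ != "passengerId").map(col)
val resulting_df_alt = _pivot
  .withColumn("mask", array(trainCols: _*))
  .select("passengerId", "mask")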
Collect the arrays and build a dense matrix (it could also be sparse):

import scala.collection.mutable

import breeze.linalg._

val trains1 = resulting_df.rdd
  .map(row => row.getAs[mutable.WrappedArray[Long]](1).toArray)
  .collect()
val matrix = DenseMatrix(trains1.toSeq: _*)
/*
1 1 1 1 1 1 0
0 1 1 1 0 1 1
*/
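On the "it could be sparse" remark: a minimal sketch with Breeze's CSCMatrix (my variant, reusing the trains1 rows), which pays off when there are many trains and each passenger takes only a few:

import breeze.linalg.CSCMatrix

// Store only the non-zero (passenger, train) entries.
val builder = new CSCMatrix.Builder[Long](rows = trains1.length, cols = trains1.head.length)
for {
  (row, i) <- trains1.zipWithIndex
  (v, j) <- row.zipWithIndex
  if v != 0L
} builder.add(i, j, v)
val sparseMatrix = builder.result()
// sparseMatrix * sparseMatrix.t yields the same co-travel counts as the dense product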
Now take the product of the matrix with its transpose: entry (i, j) sums, over all trains, mask_i * mask_j, i.e. it counts the trains on which both passenger i and passenger j travelled:
val resultM = matrix * matrix.t
/*
6 4
4 5
*/
As a result you get a matrix whose off-diagonal elements are the number of trains each pair of passengers has shared (the diagonal is just the total number of trains each passenger took):
(1,1) 6
(1,2) 4
(2,1) 4
(2,2) 5
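This answer stops at the matrix, so to literally answer the question you would still scan the upper triangle for counts above 3. A minimal sketch of that last step (the names collected, ids and pairs are mine):

// Collect the passenger ids in the same pass as the masks so the
// row order is guaranteed to match the matrix rows.
val collected = resulting_df.rdd
  .map(r => (r.getInt(0), r.getAs[mutable.WrappedArray[Long]](1).toArray))
  .collect()
val ids = collected.map(_._1)
val m = DenseMatrix(collected.map(_._2).toSeq: _*)
val counts = m * m.t
val pairs = for {
  i <- 0 until counts.rows
  j <- (i + 1) until counts.cols // upper triangle: skip self-pairs and duplicates
  if counts(i, j) > 3
} yield (ids(i), ids(j), counts(i, j))
// For the toy data above: Vector((1, 2, 4))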
Upvotes: 1
Reputation: 18043
Building on the first answer and providing a little extra perspective: you can also get groupings larger than 2 for the different combinations. I'm not sure what exactly is required, as the question is a little vague.
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  (1, Seq(1, 2, 3, 4, 6)),
  (2, Seq(2, 3, 4, 6, 7)),
  (3, Seq(1, 2, 5, 9, 100)),
  (4, Seq(11, 2, 4, 5, 7, 32, 44, 54)),
  (44, Seq(11, 2, 4, 5, 7, 32, 44, 654)),
  (5, Seq(7, 12, 34, 32, 44, 54)),
  (6, Seq(5, 21))
).toDF("passengerId", "trainId")
val df2 = df.as("d1").join(df.as("d2"), $"d1.passengerId" =!= $"d2.passengerId")
  .selectExpr("d1.passengerId as passengerId1", "d2.passengerId as passengerId2",
              "d1.trainId as trainId1", "d2.trainId as trainId2")
  .where("size(array_intersect(trainId1, trainId2)) > 3")
  .selectExpr("array_sort(array(passengerId1, passengerId2)) as ar", "trainId1")
val df3 = df2.withColumn("res", expr("explode(trainId1)"))
val df4 = df3.groupBy("trainId1").agg(collect_set("ar").as("res"))
val df5 = df4.withColumn("passengerIDs", array_distinct(sort_array(flatten($"res"))))
df5.show()
returns:
+--------------------+-----------------+------------+
| trainId1| res|passengerIDs|
+--------------------+-----------------+------------+
| [1, 2, 3, 4, 6]| [[1, 2]]| [1, 2]|
| [2, 3, 4, 6, 7]| [[1, 2]]| [1, 2]|
|[11, 2, 4, 5, 7, ...|[[4, 44], [4, 5]]| [4, 5, 44]|
|[11, 2, 4, 5, 7, ...| [[4, 44]]| [4, 44]|
|[7, 12, 34, 32, 4...| [[4, 5]]| [4, 5]|
+--------------------+-----------------+------------+
which could be of use. It's actually quite a hard problem.
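If only the distinct groups are wanted, a small follow-up (my addition, not part of the answer above) drops the per-train detail:

// Keep just the de-duplicated passenger groups (row order may vary):
df5.select("passengerIDs").distinct().show(false)
// +------------+
// |passengerIDs|
// +------------+
// |[1, 2]      |
// |[4, 5, 44]  |
// |[4, 44]     |
// |[4, 5]      |
// +------------+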
Upvotes: 1
Reputation: 7316
You can combine a self join with the array_intersect built-in function, among others:
import org.apache.spark.sql.functions.expr
import spark.implicits._
val df = Seq(
(1, Seq(1, 2, 3, 4, 6)),
(2, Seq(2, 3, 4, 6, 7)),
(3, Seq(1, 2, 5, 9, 100)),
(4, Seq(11, 2, 4, 5, 7, 32, 44, 54)),
(5, Seq(7, 12, 34, 32, 44, 54)),
(6, Seq(5, 21))
).toDF("passengerId", "trainId")
df.as("d1").join(df.as("d2"), $"d1.passengerId" =!= $"d2.passengerId")
.selectExpr("d1.passengerId as passengerId1", "d2.passengerId as passengerId2", "d1.trainId as trainId1", "d2.trainId as trainId2")
.where("size(array_intersect(trainId1, trainId2)) > 3")
.selectExpr("array_sort(array(passengerId1, passengerId2)) as ar")
.distinct()
.selectExpr("ar[0] as usr1", "ar[1] as usr2")
.show()
// +----+----+
// |usr1|usr2|
// +----+----+
// |1 |2 |
// |4 |5 |
// +----+----+
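A possible simplification (my variant, not from the answer above): joining on < instead of =!= emits each unordered pair exactly once, so the array_sort and distinct() steps can be dropped:

// "<" keeps one row per unordered pair, making the sort/distinct unnecessary.
df.as("d1").join(df.as("d2"), $"d1.passengerId" < $"d2.passengerId")
  .where("size(array_intersect(d1.trainId, d2.trainId)) > 3")
  .selectExpr("d1.passengerId as usr1", "d2.passengerId as usr2")
  .show()
// Same result: (1, 2) and (4, 5)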
Upvotes: 1