Reputation: 134
I have the following RDDs:
JavaPairRDD<List<String>, String> firstRDD = ...
firstRDD.foreach(row -> System.out.println(row._1() + ", " + row._2()));
// [Man, Parent], Father
JavaPairRDD<List<String>, String> secondRDD = ...
secondRDD.foreach(row -> System.out.println(row._1() + ", " + row._2()));
// [Man, Parent, Father], Person
I want to perform an inner join in which a row from the first RDD matches a row from the second RDD if the left key is contained in (i.e., is a sublist of) the right key. In the example above, [Man, Parent] is in [Man, Parent, Father].
Any suggestions?
Thanks!
Upvotes: 1
Views: 485
Reputation: 14845
For RDDs (including JavaPairRDDs), the join operations can only match keys that are exactly equal.
Therefore we have to transform the RDDs into DataFrames:
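import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.*;        // Dataset, Row, RowFactory, SparkSession
import org.apache.spark.sql.types.*;  // DataTypes, MetadataBuilder, StructField, StructType
import scala.collection.JavaConverters;
import scala.collection.Seq;

public static Dataset<Row> toDataframe(SparkSession spark, JavaPairRDD<List<String>, String> rdd) {
    // Convert each (List<String>, String) pair into a Row with a Scala Seq as the array column
    JavaRDD<Row> rowRDD1 = rdd.map(tuple -> {
        Seq<String> key = JavaConverters.asScalaIteratorConverter(tuple._1().iterator()).asScala().toSeq();
        return RowFactory.create(key, tuple._2());
    });
    // Schema: key is array<string>, value is string
    StructType st = new StructType()
        .add(new StructField("key", DataTypes.createArrayType(DataTypes.StringType), true, new MetadataBuilder().build()))
        .add(new StructField("value", DataTypes.StringType, true, new MetadataBuilder().build()));
    return spark.createDataFrame(rowRDD1, st);
}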
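For the join criterion, we need a UDF that checks whether one array is part of the other. The UDF below uses containsSlice, so the left key must appear in the right key as a contiguous run in the same order. If the order of the elements is not important, array_intersect could be used instead.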
UserDefinedFunction contains = functions.udf((Seq<String> a, Seq<String> b) -> b.containsSlice(a), DataTypes.BooleanType);
Putting these two elements together, we get
Dataset<Row> df1 = toDataframe(spark, firstRDD);
Dataset<Row> df2 = toDataframe(spark, secondRDD);
Dataset<Row> result = df1.join(df2, contains.apply(df1.col("key"), df2.col("key")));
With the input data
firstRDD             secondRDD
+------+-----+       +------------+-----+
|   key|value|       |         key|value|
+------+-----+       +------------+-----+
|[a, b]|    A|       |   [a, b, c]|    C|
|[b, a]|    B|       |[a, b, c, d]|    D|
+------+-----+       +------------+-----+
we get
+------+-----+------------+-----+
|   key|value|         key|value|
+------+-----+------------+-----+
|[a, b]|    A|   [a, b, c]|    C|
|[a, b]|    A|[a, b, c, d]|    D|
+------+-----+------------+-----+
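To see why [b, a] finds no partner here, it may help to replay the matching logic outside Spark. The following is only a plain-Java sketch, not part of the Spark solution: the class name SublistJoinSketch and its methods are made up for illustration, Collections.indexOfSubList stands in for Scala's containsSlice, and List.containsAll is a rough stand-in for an order-insensitive check (it ignores duplicates, so it is not an exact equivalent of an array_intersect-based condition). It uses the sample keys from the tables above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SublistJoinSketch {

    // Ordered, contiguous match: plain-Java analogue of right.containsSlice(left).
    static boolean isContiguousSublist(List<String> left, List<String> right) {
        return Collections.indexOfSubList(right, left) >= 0;
    }

    // Order-insensitive match: every element of left occurs somewhere in right.
    static boolean isSubset(List<String> left, List<String> right) {
        return right.containsAll(left);
    }

    // Nested-loop inner join over two small in-memory key lists (hypothetical helper).
    static List<String> join(List<List<String>> leftKeys,
                             List<List<String>> rightKeys,
                             boolean ordered) {
        List<String> matches = new ArrayList<>();
        for (List<String> l : leftKeys) {
            for (List<String> r : rightKeys) {
                boolean hit = ordered ? isContiguousSublist(l, r) : isSubset(l, r);
                if (hit) {
                    matches.add(l + " -> " + r);
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<List<String>> left = Arrays.asList(
                Arrays.asList("a", "b"),
                Arrays.asList("b", "a"));
        List<List<String>> right = Arrays.asList(
                Arrays.asList("a", "b", "c"),
                Arrays.asList("a", "b", "c", "d"));

        // Ordered check: only [a, b] matches, giving 2 pairs.
        System.out.println(join(left, right, true));
        // Order-insensitive check: [b, a] matches as well, giving 4 pairs.
        System.out.println(join(left, right, false));
    }
}
```

With the ordered check, only [a, b] pairs with the two right-hand keys, which mirrors the result table. Switching to the order-insensitive check would also pair [b, a] with both keys, so the choice between the two depends on whether element order matters for your data.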
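Please note that using a UDF as the join criterion might not be the fastest option: Spark cannot treat it as an equality condition on the keys, so it generally has to evaluate the UDF for every pair of rows.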
Upvotes: 1