benhid

Reputation: 134

Spark RDDs join operation with lists

I have the following RDDs:

JavaPairRDD<List<String>, String> firstRDD = ...
firstRDD.foreach(row -> System.out.println(row._1() + ", " + row._2()));
// [Man, Parent], Father

JavaPairRDD<List<String>, String> secondRDD = ...
secondRDD.foreach(row -> System.out.println(row._1() + ", " + row._2()));
// [Man, Parent, Father], Person

I want to perform an inner join in which a row from the first RDD matches a row from the second RDD if the left key is IN (i.e., is a sublist of) the right key (in the example above, [Man, Parent] is in [Man, Parent, Father]).

Any suggestions?

Thanks!

Upvotes: 1

Views: 485

Answers (1)

werner

Reputation: 14845

For RDDs (and also for JavaPairRDDs) the join operation(s) can only check for exactly matching keys.

Therefore, we convert the RDDs into DataFrames, whose joins accept an arbitrary condition:

public static Dataset<Row> toDataframe(SparkSession spark, JavaPairRDD<List<String>, String> rdd) {
    JavaRDD<Row> rowRDD1 = rdd.map(tuple -> {
        Seq<String> key = JavaConverters.asScalaIteratorConverter(tuple._1().iterator()).asScala().toSeq();
        return RowFactory.create(key, tuple._2());
    });
    StructType st = new StructType()
            .add(new StructField("key", DataTypes.createArrayType(DataTypes.StringType), true, new MetadataBuilder().build()))
            .add(new StructField("value", DataTypes.StringType, true, new MetadataBuilder().build()));
    return spark.createDataFrame(rowRDD1, st);
}

For the join criterion, we need a UDF that checks whether one array is contained in the other as a contiguous slice. If the order of the elements is not important, the built-in array_intersect function could be used instead.

UserDefinedFunction contains = functions.udf((Seq<String> a, Seq<String> b) -> b.containsSlice(a), DataTypes.BooleanType);

Putting these two elements together, we get

Dataset<Row> df1 = toDataframe(spark, firstRDD);
Dataset<Row> df2 = toDataframe(spark, secondRDD);
Dataset<Row> result = df1.join(df2, contains.apply(df1.col("key"), df2.col("key")));

With the input data

firstRDD        secondRDD
+------+-----+  +------------+-----+
|   key|value|  |         key|value|
+------+-----+  +------------+-----+
|[a, b]|    A|  |   [a, b, c]|    C|
|[b, a]|    B|  |[a, b, c, d]|    D|
+------+-----+  +------------+-----+

we get

+------+-----+------------+-----+
|   key|value|         key|value|
+------+-----+------------+-----+
|[a, b]|    A|   [a, b, c]|    C|
|[a, b]|    A|[a, b, c, d]|    D|
+------+-----+------------+-----+
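The [b, a] row is missing from the result because containsSlice only matches when the elements appear in the same order and next to each other. As a rough illustration of that difference, here is a minimal plain-Java sketch without Spark. The class and method names are hypothetical and not part of the answer's code; Collections.indexOfSubList is used as a stand-in that should behave comparably to Scala's containsSlice.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;

final class SublistCheck {

    // Ordered, contiguous match: a must occur inside b as one unbroken run.
    static boolean isContiguousSublist(List<String> a, List<String> b) {
        return Collections.indexOfSubList(b, a) >= 0;
    }

    // Order-insensitive match: every element of a occurs somewhere in b.
    static boolean isSubset(List<String> a, List<String> b) {
        return new HashSet<>(b).containsAll(a);
    }

    public static void main(String[] args) {
        List<String> right = List.of("a", "b", "c");

        System.out.println(isContiguousSublist(List.of("a", "b"), right)); // true
        System.out.println(isContiguousSublist(List.of("b", "a"), right)); // false
        System.out.println(isSubset(List.of("b", "a"), right));            // true
    }
}
```

If [b, a] should also match, the join condition would need the order-insensitive variant.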

Please note that using a UDF as the join criterion might not be the fastest option.
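The reason is that the condition is an arbitrary predicate rather than an equality on keys, so there is no key to hash or sort on, and in general every left row has to be tested against every right row. The following plain-Java sketch shows what that amounts to on the sample data above. It is only an illustration under that assumption, not Spark's actual implementation, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

final class NestedLoopJoinSketch {

    // Compares every left row with every right row: |left| * |right| predicate calls.
    static List<String> join(List<Map.Entry<List<String>, String>> left,
                             List<Map.Entry<List<String>, String>> right) {
        List<String> matches = new ArrayList<>();
        for (Map.Entry<List<String>, String> l : left) {
            for (Map.Entry<List<String>, String> r : right) {
                // Same kind of test as the UDF: is the left key a contiguous slice of the right key?
                if (Collections.indexOfSubList(r.getKey(), l.getKey()) >= 0) {
                    matches.add(l.getValue() + "-" + r.getValue());
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<Map.Entry<List<String>, String>> left = List.of(
                Map.entry(List.of("a", "b"), "A"),
                Map.entry(List.of("b", "a"), "B"));
        List<Map.Entry<List<String>, String>> right = List.of(
                Map.entry(List.of("a", "b", "c"), "C"),
                Map.entry(List.of("a", "b", "c", "d"), "D"));

        System.out.println(join(left, right)); // [A-C, A-D]
    }
}
```

With two rows on each side this is four comparisons, but the count grows with the product of the two input sizes, so it is worth checking on realistic data volumes.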

Upvotes: 1
