moudi

Reputation: 147

How to flatMap a DataFrame from another DataFrame in Java?

I have a dataFrame like the following:

+-----------------+--------------------+
|               id|            document|
+-----------------+--------------------+
| doc1            |"word1, word2"      |
| doc2            |"word3 word4"       |
+-----------------+--------------------+

I want to create another DataFrame with the following structure:

+-----------------+--------------------+-----------------+
|               id|            document|             word|
+-----------------+--------------------+-----------------+
| doc1            |"word1, word2"      | word1           |
| doc1            |"word1, word2"      | word2           |
| doc2            |"word3 word4"       | word3           |
| doc2            |"word3 word4"       | word4           |
+-----------------+--------------------+-----------------+

I tried the following:

public static Dataset<Row> buildInvertIndex(Dataset<Row> inputRaw, SQLContext sqlContext, String id) {

    JavaRDD<Row> inputInvertedIndex = inputRaw.javaRDD();
    JavaRDD<Tuple3<String, String ,String>> d = inputInvertedIndex.flatMap(x -> {

        List<Tuple3<String, String, String>> k = new ArrayList<>();
        String data2 = x.getString(0).toString();
        String[] field2 = x.getString(1).split(" ", -1);
        for(String s: field2)
            k.add(new Tuple3<String, String, String>(data2, x.getString(1), s));
        return k.iterator();
    }
            );


    JavaPairRDD<String, Tuple2<String, String>>d2 = d.mapToPair(x->{
        return new Tuple2<String, Tuple2<String, String>>(x._3(), new Tuple2<String, String>(x._1(), x._2()));  

    });

    Dataset<Row> d3 = sqlContext.createDataset(JavaPairRDD.toRDD(d2), Encoders.tuple(Encoders.STRING(), Encoders.tuple(Encoders.STRING(),Encoders.STRING()))).toDF();

    return d3;
}

But it gives:

+-----------------+----------------------+
|               _1|            _2        |
+-----------------+----------------------+
| word1           |[doc1,"word1, word2"] |
| word2           |[doc1,"word1 word2"]  |
| word3           |[doc2, "word3, word4"]|
| word4           |[doc2, "word3, word4"]|
+-----------------+----------------------+

I'm new to Spark in Java, so any help would be much appreciated. In addition, suppose that in the second DataFrame above I want to compute a string similarity metric (e.g., Jaccard) on the two columns document and word and add the result as a new column. How can I do that?

Upvotes: 0

Views: 225

Answers (1)

a.l.

Reputation: 1135

You can use explode and split:

import static org.apache.spark.sql.functions.expr;

// Split on runs of commas and/or spaces, then emit one row per word.
Dataset<Row> result = inputRaw.withColumn("word", expr("explode(split(document, '[, ]+'))"));
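To see what the '[, ]+' pattern does to the sample values, here is a minimal plain-Java sketch that needs no Spark. The class ExplodeSketch and its methods splitWords and explodeRow are hypothetical names made up for illustration; they only imitate the split-then-explode idea on a single row and are not part of the Spark API. The empty-token filter is an addition of this sketch, not something the Spark expression does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration class, not part of Spark.
final class ExplodeSketch {

    // Splits a document on runs of commas and/or spaces, the same
    // regex that is passed to split(...) in the Spark expression.
    // Empty tokens (e.g. from a leading delimiter) are dropped here;
    // that filter is an assumption of this sketch.
    static List<String> splitWords(String document) {
        return Arrays.stream(document.split("[, ]+"))
                .filter(w -> !w.isEmpty())
                .collect(Collectors.toList());
    }

    // Imitates explode for one input row: returns one
    // {id, document, word} triple per word in the document.
    static List<String[]> explodeRow(String id, String document) {
        List<String[]> rows = new ArrayList<>();
        for (String word : splitWords(document)) {
            rows.add(new String[] {id, document, word});
        }
        return rows;
    }

    public static void main(String[] args) {
        List<String[]> rows = new ArrayList<>();
        rows.addAll(explodeRow("doc1", "word1, word2"));
        rows.addAll(explodeRow("doc2", "word3 word4"));
        for (String[] row : rows) {
            System.out.println(String.join(" | ", row));
        }
    }
}
```

Because the pattern matches any run of commas and spaces, both "word1, word2" and "word3 word4" split into two clean words, whereas the split(" ", -1) call in the question would leave the comma attached to "word1,".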

Upvotes: 1
