Dan McCabe
Dan McCabe

Reputation: 198

How to generate a new PySpark DataFrame by comparing entries in two others?

I would like to search through a Pyspark DataFrame containing string fields and determine which keyword strings appear in each. Say I have the following DataFrame of keywords:

+-----------+----------+
|       city|     state|
+-----------+----------+
|    Seattle|Washington|
|Los Angeles|California|
+-----------+----------+

which I would like to search for in this DataFrame:

+----------------------------------------+------+
|body                                    |source|
+----------------------------------------+------+
|Seattle is in Washington.               |a     |
|Los Angeles is in California            |b     |
|Banana is a fruit                       |c     |
|Seattle is not in New Hampshire         |d     |
|California is home to Los Angeles       |e     |
|Seattle, California is not a real place.|f     |
+----------------------------------------+------+

I want to create a new DataFrame that identifies which keywords of which type appear in each source. So the desired end result would be:

+-----------+------+-----+
|name       |source|type |
+-----------+------+-----+
|Seattle    |a     |city |
|Washington |a     |state|
|Los Angeles|b     |city |
|California |b     |state|
|Seattle    |d     |city |
|Los Angeles|e     |city |
|California |e     |state|
|Seattle    |f     |city |
|California |f     |state|
+-----------+------+-----+

How can I obtain this result? I could use join to isolate the body strings that contain these keywords, but I'm not sure how to track which specific keyword was a match and use that information to create new columns.

Upvotes: 0

Views: 237

Answers (1)

MaFF
MaFF

Reputation: 10096

First, let's create and modify the dataframes:

import pyspark.sql.functions as psf
keywords_df = sc.parallelize([["Seattle", "Washington"], ["Los Angeles", "California"]])\
    .toDF(["city", "state"])
keywords_df = keywords_df\
    .withColumn("struct", psf.explode(psf.array(
        psf.struct(psf.col("city").alias("word"), psf.lit("city").alias("type")), 
        psf.struct(psf.col("state").alias("word"), psf.lit("state").alias("type"))
    )))\
    .select("struct.*")
keywords_df.show()

    +-----------+-----+
    |       word| type|
    +-----------+-----+
    |    Seattle| city|
    | Washington|state|
    |Los Angeles| city|
    | California|state|
    +-----------+-----+

If your key words didn't contain spaces you could have split your sentences into words, that you'd have exploded to get just one word on each line. Then you'd have been able to join with your keywords dataframe. It's not the case here because of Los Angeles.

text_df = sc.parallelize([["Seattle is in Washington.", "a"],["Los Angeles is in California", "b"],
                          ["Banana is a fruit", "c"],["Seattle is not in New Hampshire", "d"],
                          ["California is home to Los Angeles", "e"],["Seattle, California is not a real place.", "f"]])\
    .toDF(["body", "source"])

Instead we'll use a join with a string contains condition instead:

res = text_df.join(keywords_df, text_df.body.contains(keywords_df.word)).drop("body")
res.show()

    +------+-----------+-----+
    |source|       word| type|
    +------+-----------+-----+
    |     a|    Seattle| city|
    |     a| Washington|state|
    |     b|Los Angeles| city|
    |     b| California|state|
    |     d|    Seattle| city|
    |     f|    Seattle| city|
    |     e|Los Angeles| city|
    |     e| California|state|
    |     f| California|state|
    +------+-----------+-----+

Upvotes: 2

Related Questions