horatio1701d

Reputation: 9169

Filter Spark RDD based on the result of filtering a different RDD

conf = SparkConf().setAppName("my_app")
with SparkContext(conf=conf) as sc:
    sqlContext = SQLContext(sc)
    df = sqlContext.read.parquet(*s3keys)

    # this gives me distinct values as list
    rdd = df.filter(
        (1442170800000 <= df.timestamp) &
        (df.timestamp <= 1442185200000) &
        (df.lat > 40.7480) & (df.lat < 40.7513) &
        (df.lon > -73.8492) & (df.lon < -73.8438)
    ).map(lambda p: p.userid).distinct()

    # how do I apply the above list to filter another rdd? 
    df2 = sqlContext.read.parquet(*s3keys_part2)
    # example:
    rdd = df2.filter(df2.col1 in (rdd values from above))

Upvotes: 0

Views: 362

Answers (2)

Matthew Gray

Reputation: 1676

This is Scala code, instead of Python, but hopefully it can still serve as an example.

val x = 1 to 9
val df2 = sc.parallelize(x.map(a => (a,a*a))).toDF()
val df3 = sc.parallelize(x.map(a => (a,a*a*a))).toDF()

This gives us two dataframes, each with columns named _1 and _2: the first nine natural numbers in _1, and their squares (df2) or cubes (df3) in _2.

val fil = df2.filter("_1 < 5") // Nine is too many, let's go to four.
val filJoin = fil.join(df3, fil("_1") === df3("_1"))
filJoin.collect

This gets us:

Array[org.apache.spark.sql.Row] = Array([1,1,1,1], [2,4,2,8], [3,9,3,27], [4,16,4,64])

To apply this to your problem, I would start with something like the following:

rdd2 = rdd.join(df2, rdd.userid == df2.userid, 'inner')

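But notice that we need to tell it which columns to join on, and for df2 that might be something other than userid (the question uses col1). I'd also recommend using .select('userid').distinct() instead of map(lambda p: p.userid), so that the result is still a DataFrame; a join on a column expression like the one above needs DataFrames on both sides, not an RDD.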

You can find out more about join here.

Upvotes: 0

zero323

Reputation: 330413

As mentioned by Matthew Gray, what you need here is a join. It means more or less something like this:

pred = ((1442170800000 <= df.timestamp) & 
        (df.timestamp <= 1442185200000) &
        (df.lat > 40.7480) &
        (df.lat < 40.7513) &
        (df.lon > -73.8492) &
        (df.lon < -73.8438))

users = df.filter(pred).select("userid").distinct()

users.join(df2, users.userid == df2.col1)

Upvotes: 1
