petereg157

Reputation: 39

How to filter based on first element in rdd containing array of tuples in pyspark?

I am having trouble filtering a list of tuples from an RDD.

sample business.json

{"business_id":"gnKjwL_1w79qoiV3IC_xQQ","state":"NC","postal_code":"28210","latitude":35.092564,"longitude":-80.859132,"stars":4.0},
{"business_id":"xvX2CttrVhyG2z1dFg_0xw","state":"AZ","postal_code":"85338","latitude":33.4556129678,"longitude":-112.3955963552,"stars":5.0}

import json
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

stars = "4.0"
input_business_lines = sc.textFile('data/business.json') \
    .map(lambda lines: json.loads(lines))

business_ids = input_business_lines \
    .map(lambda kv: (kv['business_id'], kv['stars'], kv['state'])) \
    .filter(lambda kv: kv[1] >= float(stars)).map(lambda kv: (kv[0], kv[2])).collect()

The code above returns a list of tuples, each of the form (business_id, state):

[('gnKjwL_1w79qoiV3IC_xQQ', 'NC'),('xvX2CttrVhyG2z1dFg_0xw', 'AZ'),...,('HhyxOkGAM07SRYtlQ4wMFQ', 'NC')]

So far everything works. Now I need to join this with the reviews data and keep only the reviews whose business_id matches one of these business ids. With a DataFrame this would be much easier, but I am not sure how to do it with an RDD of tuples.

Here is my attempt

- NOTE: it only works if business_ids is a list of plain business ids, not a list of tuples

sample review.json

{"review_id": "c-6aA9Bd7JxpmMroRoas9A", "user_id": "bK4Y_GZUoAUTXIrmeEUGYw", "business_id": "gnKjwL_1w79qoiV3IC_xQQ", "stars": 4.0, "text": "Went there Saturday noon they open at 12pm but people were waiting outside before 12pm so you can tell it should be a good place. Nice Katsu & Eel with rice. Many Japanese go there.", "date": "2014-07-13 20:28:18"},
{"review_id": "EhvpZ1-MzemK1EMBUf19gQ", "user_id": "p0IderpL5zE4D021CXVxtA", "business_id": "KWywu2tTEPWmR9JnBc0WyQ", "stars": 5.0, "text": "I came here for my 40th .First of they offer free limo service to and from which was really cool.  When we first arrived we were greeted by the manager he walked us up to the top floor where the male review was and told us we had the choice of seats wherever we wanted.  We were also given two drink coupons per person so we ordered a drink and got seated. The show started off slowly but what I really liked is the guys came out to sit and talk to you, also they were really cool about taking pictures. By the end of the night it was so crowded but it was amazing had the best time I definitely will be back. I hope soon.", "date": "2015-11-20 05:24:45"}

Code

input_review_lines = sc.textFile('data/review.json').map(lambda lines: json.loads(lines))
rew_ids_bus_ids = input_review_lines \
                    .map(lambda kv: (kv['user_id'], kv['business_id'])) \
                    .filter(lambda kv: kv[1] in business_ids).collect()
rew_ids_bus_ids
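
The filter in this attempt never matches because `kv[1]` is a string while `business_ids` holds `(business_id, state)` tuples, so `in` ends up comparing a string against tuples. Below is a minimal plain-Python sketch of one way around it, assuming the collected list is small enough to keep on the driver: pull the first element of each tuple into a set and test against that. The sample rows come from the question; the names `reviews`, `naive`, `wanted_ids` and `matched` are made up for illustration.

```python
# (business_id, state) tuples, shaped like the collected business_ids list
business_ids = [
    ("gnKjwL_1w79qoiV3IC_xQQ", "NC"),
    ("xvX2CttrVhyG2z1dFg_0xw", "AZ"),
]

# (user_id, business_id) pairs, shaped like the output of the map over reviews
reviews = [
    ("bK4Y_GZUoAUTXIrmeEUGYw", "gnKjwL_1w79qoiV3IC_xQQ"),
    ("p0IderpL5zE4D021CXVxtA", "KWywu2tTEPWmR9JnBc0WyQ"),
]

# A string is never equal to a tuple, so this membership test is always False
naive = [kv for kv in reviews if kv[1] in business_ids]

# Keep only the first element of each tuple; a set gives fast lookups
wanted_ids = {business_id for business_id, _state in business_ids}
matched = [kv for kv in reviews if kv[1] in wanted_ids]

print(naive)
print(matched)
```

In the RDD version the same predicate would go into `.filter(lambda kv: kv[1] in wanted_ids)`. For a large id list, a join as in the answers below is likely the better fit.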

Upvotes: 0

Views: 579

Answers (2)

Ali Hassan

Reputation: 339

import json
from pyspark import SparkContext

if __name__ == '__main__':
    input_review_json_path = "publicdata/review.json"
    input_business_json_path = "publicdata/business.json"
    output_csv_path = "outputs/user_state.csv"
    stars = "4.0"

    sc = SparkContext.getOrCreate()

    input_business_lines = sc.textFile(input_business_json_path) \
                             .map(lambda lines: json.loads(lines))

    business_ids = input_business_lines \
                        .map(lambda kv: (kv['business_id'], kv['stars'], kv['state'])) \
                        .filter(lambda kv: kv[1] >= float(stars)).map(lambda kv: (kv[0], kv[2]))

    input_review_lines = sc.textFile(input_review_json_path) \
                            .map(lambda lines: json.loads(lines))

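    # Key the reviews by business_id so both RDDs share the same join key
    rew_ids_bus_ids = input_review_lines.map(lambda kv: (kv['business_id'], kv['user_id']))
    # join yields (business_id, (state, user_id)); this map keeps (business_id, state)
    finalRdd = business_ids.join(rew_ids_bus_ids).map(lambda kv: (kv[0], kv[1][0]))

    review_rdd = finalRdd.collect()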

Upvotes: 1

Lamanus

Reputation: 13581

You can join the two RDDs: key both of them by business_id, then call join.

import json

stars = 4.0
input_business_lines = sc.textFile('test.json') \
    .map(lambda lines: json.loads(lines))

business_ids = input_business_lines \
    .filter(lambda kv: kv['stars'] >= stars) \
    .map(lambda kv: (kv['business_id'], kv['state']))

print(business_ids.collect())

input_review_lines = sc.textFile('test2.json') \
    .map(lambda lines: json.loads(lines))

rew_ids_bus_ids = input_review_lines \
    .map(lambda kv: (kv['business_id'], kv['user_id']))

joined = business_ids \
    .join(rew_ids_bus_ids)

print(joined.collect())


# [('gnKjwL_1w79qoiV3IC_xQQ', 'NC'), ('xvX2CttrVhyG2z1dFg_0xw', 'AZ')]
# [('gnKjwL_1w79qoiV3IC_xQQ', ('NC', 'bK4Y_GZUoAUTXIrmeEUGYw'))]
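
For reference, `join` on pair RDDs returns `(key, (left_value, right_value))`, which is why the output above nests the state and the user id. Here is a rough plain-Python emulation of that shape, followed by a reshape to `(user_id, state)`. Note that `inner_join` is a hypothetical helper written for this sketch, not a PySpark function, and the `(user_id, state)` layout is only an assumption about what the final result should look like.

```python
from collections import defaultdict


def inner_join(left, right):
    """Emulate the output shape of a pair-RDD join: (key, (left_value, right_value))."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    # Emit one row per matching (left, right) combination; unmatched keys are dropped
    return [(key, (lv, rv)) for key, lv in left for rv in right_by_key.get(key, [])]


business_ids = [("gnKjwL_1w79qoiV3IC_xQQ", "NC"), ("xvX2CttrVhyG2z1dFg_0xw", "AZ")]
rew_ids_bus_ids = [
    ("gnKjwL_1w79qoiV3IC_xQQ", "bK4Y_GZUoAUTXIrmeEUGYw"),
    ("KWywu2tTEPWmR9JnBc0WyQ", "p0IderpL5zE4D021CXVxtA"),
]

joined = inner_join(business_ids, rew_ids_bus_ids)

# Unpack the nested value to get flat (user_id, state) pairs
user_state = [(user_id, state) for _business_id, (state, user_id) in joined]

print(joined)
print(user_state)
```

On the real RDD the equivalent reshape would be `joined.map(lambda kv: (kv[1][1], kv[1][0]))`.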

Upvotes: 1
