Reputation: 39
I am having trouble filtering a list of tuples built from an RDD.
sample business.json
{"business_id":"gnKjwL_1w79qoiV3IC_xQQ","state":"NC","postal_code":"28210","latitude":35.092564,"longitude":-80.859132,"stars":4.0},
{"business_id":"xvX2CttrVhyG2z1dFg_0xw","state":"AZ","postal_code":"85338","latitude":33.4556129678,"longitude":-112.3955963552,"stars":5.0}
import json
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
stars = "4.0"
input_business_lines = sc.textFile('data/business.json') \
.map(lambda lines: json.loads(lines))
business_ids = input_business_lines \
.map(lambda kv: (kv['business_id'], kv['stars'], kv['state'])) \
.filter(lambda kv: kv[1] >= float(stars)).map(lambda kv: (kv[0], kv[2])).collect()
The code above returns a list of tuples, each of the form (business_id, state):
[('gnKjwL_1w79qoiV3IC_xQQ', 'NC'),('xvX2CttrVhyG2z1dFg_0xw', 'AZ'),...,('HhyxOkGAM07SRYtlQ4wMFQ', 'NC')]
So far so good. Now I need to join this with the reviews data and keep only the reviews whose business id appears in the list above. With a DataFrame this would be much easier, but I am not sure how to do it with tuples in an RDD.
Here is my attempt.
- NOTE: it only works if business_ids is a plain list of business ids, not a list of tuples.
sample review.json
{"review_id": "c-6aA9Bd7JxpmMroRoas9A", "user_id": "bK4Y_GZUoAUTXIrmeEUGYw", "business_id": "gnKjwL_1w79qoiV3IC_xQQ", "stars": 4.0, "text": "Went there Saturday noon they open at 12pm but people were waiting outside before 12pm so you can tell it should be a good place. Nice Katsu & Eel with rice. Many Japanese go there.", "date": "2014-07-13 20:28:18"},
{"review_id": "EhvpZ1-MzemK1EMBUf19gQ", "user_id": "p0IderpL5zE4D021CXVxtA", "business_id": "KWywu2tTEPWmR9JnBc0WyQ", "stars": 5.0, "text": "I came here for my 40th .First of they offer free limo service to and from which was really cool. When we first arrived we were greeted by the manager he walked us up to the top floor where the male review was and told us we had the choice of seats wherever we wanted. We were also given two drink coupons per person so we ordered a drink and got seated. The show started off slowly but what I really liked is the guys came out to sit and talk to you, also they were really cool about taking pictures. By the end of the night it was so crowded but it was amazing had the best time I definitely will be back. I hope soon.", "date": "2015-11-20 05:24:45"}
Code
input_review_lines = sc.textFile('data/review.json').map(lambda lines: json.loads(lines))
rew_ids_bus_ids = input_review_lines \
.map(lambda kv: (kv['user_id'], kv['business_id'])) \
.filter(lambda kv: kv[1] in business_ids).collect()
rew_ids_bus_ids
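The `in` test in the filter compares a business id string against whole (business_id, state) tuples, so it never matches. Below is a minimal plain-Python sketch of that behaviour and of one possible workaround, which is to pull the ids into a set first. It uses the sample records from this post; the name `wanted_ids` is illustrative and does not appear in the original code.

```python
# The collected result from the business RDD: (business_id, state) tuples.
business_ids = [
    ("gnKjwL_1w79qoiV3IC_xQQ", "NC"),
    ("xvX2CttrVhyG2z1dFg_0xw", "AZ"),
]

# (user_id, business_id) pairs, as produced by the review map step.
reviews = [
    ("bK4Y_GZUoAUTXIrmeEUGYw", "gnKjwL_1w79qoiV3IC_xQQ"),
    ("p0IderpL5zE4D021CXVxtA", "KWywu2tTEPWmR9JnBc0WyQ"),
]

# A string is never equal to a tuple, so this keeps nothing.
no_match = [kv for kv in reviews if kv[1] in business_ids]

# Hypothetical name: a set holding only the business ids.
wanted_ids = {business_id for business_id, _state in business_ids}
matched = [kv for kv in reviews if kv[1] in wanted_ids]

print(no_match)
print(matched)
```

In the Spark version the same predicate would read `.filter(lambda kv: kv[1] in wanted_ids)`. That is probably only reasonable when the set of ids is small enough to ship to every executor.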
Upvotes: 0
Views: 579
Reputation: 339
import json
from pyspark import SparkContext
if __name__ == '__main__':
    input_review_json_path = "publicdata/review.json"
    input_business_json_path = "publicdata/business.json"
    output_csv_path = "outputs/user_state.csv"
    stars = "4.0"
    sc = SparkContext.getOrCreate()
    # Parse each line of the business file as one JSON object.
    input_business_lines = sc.textFile(input_business_json_path) \
        .map(lambda lines: json.loads(lines))
    # Keep (business_id, state) for businesses at or above the star threshold.
    business_ids = input_business_lines \
        .map(lambda kv: (kv['business_id'], kv['stars'], kv['state'])) \
        .filter(lambda kv: kv[1] >= float(stars)).map(lambda kv: (kv[0], kv[2]))
    input_review_lines = sc.textFile(input_review_json_path) \
        .map(lambda lines: json.loads(lines))
    # Key the reviews by business_id so both RDDs share the same join key.
    rew_ids_bus_ids = input_review_lines.map(lambda kv: (kv['business_id'], kv['user_id']))
    # join yields (business_id, (state, user_id)); this map keeps (business_id, state).
    finalRdd = business_ids.join(rew_ids_bus_ids).map(lambda kv: (kv[0], kv[1][0]))
    review_rdd = finalRdd.collect()
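Each record coming out of `business_ids.join(rew_ids_bus_ids)` has the shape (business_id, (state, user_id)), so `kv[1][0]` is the state and `kv[1][1]` is the user id. If the goal is user/state pairs (the `output_csv_path` name hints at that, but this is a guess), a small named function can make the reshaping explicit. `to_user_state` is a hypothetical helper and not part of the answer:

```python
def to_user_state(record):
    """Turn one joined record (business_id, (state, user_id)) into (user_id, state)."""
    _business_id, (state, user_id) = record
    return (user_id, state)


# One joined record, built from the sample data in the question.
sample = ("gnKjwL_1w79qoiV3IC_xQQ", ("NC", "bK4Y_GZUoAUTXIrmeEUGYw"))
print(to_user_state(sample))
```

With the RDDs above it could be applied as `business_ids.join(rew_ids_bus_ids).map(to_user_state)`.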
Upvotes: 1
Reputation: 13581
You can join those RDDs.
import json
stars = 4.0
input_business_lines = sc.textFile('test.json') \
.map(lambda lines: json.loads(lines))
business_ids = input_business_lines \
.filter(lambda kv: kv['stars'] >= stars) \
.map(lambda kv: (kv['business_id'], kv['state']))
print(business_ids.collect())
input_review_lines = sc.textFile('test2.json') \
.map(lambda lines: json.loads(lines))
rew_ids_bus_ids = input_review_lines \
.map(lambda kv: (kv['business_id'], kv['user_id']))
joined = business_ids \
.join(rew_ids_bus_ids)
print(joined.collect())
# [('gnKjwL_1w79qoiV3IC_xQQ', 'NC'), ('xvX2CttrVhyG2z1dFg_0xw', 'AZ')]
# [('gnKjwL_1w79qoiV3IC_xQQ', ('NC', 'bK4Y_GZUoAUTXIrmeEUGYw'))]
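The second output holds a single record because `join` is an inner join on the first element of each pair, and only `gnKjwL_1w79qoiV3IC_xQQ` appears on both sides in the sample data. Here is a plain-Python sketch of that pairing logic, for illustration only. `inner_join` is a made-up helper, not a Spark API, and Spark itself does not guarantee any particular output order.

```python
from collections import defaultdict


def inner_join(left, right):
    """Emulate an inner join of two lists of (key, value) pairs."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    # Emit (key, (left_value, right_value)) for every matching combination.
    return [
        (key, (left_value, right_value))
        for key, left_value in left
        for right_value in right_by_key.get(key, [])
    ]


businesses = [("gnKjwL_1w79qoiV3IC_xQQ", "NC"), ("xvX2CttrVhyG2z1dFg_0xw", "AZ")]
reviews = [
    ("gnKjwL_1w79qoiV3IC_xQQ", "bK4Y_GZUoAUTXIrmeEUGYw"),
    ("KWywu2tTEPWmR9JnBc0WyQ", "p0IderpL5zE4D021CXVxtA"),
]

print(inner_join(businesses, reviews))
```

The AZ business drops out because no sample review refers to it, and the second review drops out because its business is not in the filtered list.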
Upvotes: 1