Reputation: 180
PySpark beginner here. I have a Spark dataframe where each row is a URL on S3, and each URL points to a GZIP file containing a JSON array. I can parse each row (link) in the dataframe into a Python list, but I don't know how to create multiple rows from this list of JSON objects.
This is the function I use; it returns a list of JSON objects:
import json
import boto3
from gzip import GzipFile
from io import BytesIO

def distributed_read_file(url):
    s3_client = boto3.client('s3')
    result = s3_client.get_object(Bucket=raw_data_bucket_name, Key=url)
    bytestream = BytesIO(result['Body'].read())
    # decompress the gzip stream and parse the JSON array
    string_json = GzipFile(None, 'rb', fileobj=bytestream).read().decode('utf-8')
    return json.loads(string_json)
If, for example, these are the JSON objects in the list:
[{"a": 99, "b": 102}, {"a": 43, "b": 87}]
I want to run a function over the URLs dataframe, for example:
result_df = urls_rdd.map(distributed_read_file)
and get back a dataframe with the columns a and b (the JSON keys). When I tried this, each JSON object came back as a MapType column, which is hard for me to work with.
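(For context, urls_rdd is the URLs dataframe converted to an RDD of plain strings; a minimal sketch, assuming the dataframe is called urls_df and its column is named url:)

# hypothetical names: urls_df and its column "url"
urls_rdd = urls_df.rdd.map(lambda row: row.url)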
Thank you very much, I hope that was clear!
Upvotes: 1
Views: 950
Reputation: 180
In case it helps someone, I found a really simple solution:
import json
import boto3
from gzip import GzipFile
from io import BytesIO
from pyspark.sql import Row

def distributed_read_gzip(url):
    s3_client = boto3.client('s3')
    result = s3_client.get_object(Bucket=raw_data_bucket_name, Key=url)
    bytestream = BytesIO(result['Body'].read())
    string_json = GzipFile(None, 'rb', fileobj=bytestream).read().decode('utf-8')
    # yield one Row per JSON object so flatMap expands each URL into many rows
    for json_obj in json.loads(string_json):
        yield Row(**json_obj)
The function is called with flatMap, because several rows are returned for each URL:
new_rdd = urls_rdd.flatMap(distributed_read_gzip)
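From there, getting a dataframe with columns a and b is just a conversion; a minimal sketch, assuming an active SparkSession so the schema can be inferred from the Row fields:

# column names (a, b) are inferred from the Row objects
new_df = new_rdd.toDF()

(spark.createDataFrame(new_rdd) does the same thing.)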
Upvotes: 2