sumedha

Reputation: 489

AWS Glue Dynamic Filtering - Filter one dynamic frame using another dynamic frame

I am trying to filter one dynamic frame based on the data residing in another dynamic frame. I am working from the join-and-relationalize example: in this code the persons and memberships dynamic frames are joined by id, but I would like to filter persons based on the ids present in the memberships frame. Below is my code, where I have put static values:

import sys
from awsglue.transforms import Join, Filter
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())

# catalog: database and table names
db_name = "legislators"
tbl_persons = "persons_json"
tbl_membership = "memberships_json"
tbl_organization = "organizations_json"



# Create dynamic frames from the source tables 
persons = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_persons)
memberships = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_membership)
orgs = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_organization)

persons = persons.drop_fields(['links', 'identifiers','other_names', 'images','contact_details'])


# Keep the fields we need and rename some.
orgs = orgs.drop_fields(['other_names', 'identifiers','links'])


filteredPersons = Filter.apply(frame=persons,
                               f=lambda x: x["id"] in ["0005af3a-9471-4d1f-9299-737fff4b9b46", "00aca284-9323-4953-bb7a-1bf6f5eefe95"])
print("Filtered record count: ", filteredPersons.count())

Below is the filter logic:

filteredPersons = Filter.apply(frame=persons,
                               f=lambda x: x["id"] in ["0005af3a-9471-4d1f-9299-737fff4b9b46", "00aca284-9323-4953-bb7a-1bf6f5eefe95"])

I would like to pass the person_id column present in the memberships frame into the filter condition, i.e. keep only persons whose id appears in memberships. Any help would be appreciated.

Upvotes: 0

Views: 10782

Answers (1)

Shubham Jain

Reputation: 5526

You can simply perform an inner join instead of filtering. Note that the join/select syntax below is the Spark DataFrame API, so convert the DynamicFrames with toDF() first (and back with DynamicFrame.fromDF() afterwards if needed):

persons_df = persons.toDF()
memberships_df = memberships.toDF()
persons_filtered = persons_df.alias('persons').join(memberships_df, persons_df.id == memberships_df.person_id).select('persons.*')

This will give you only the matching persons. If your memberships frame is small, or is effectively a lookup table, you can broadcast it for faster results:

from pyspark.sql.functions import broadcast
persons_filtered = persons_df.alias('persons').join(broadcast(memberships_df), persons_df.id == memberships_df.person_id).select('persons.*')

Hope it helps.

Upvotes: 1
