Reputation: 489
I am trying to filter one dynamic frame based on the data residing in another dynamic frame. I am working from the join and relationalize example; in this code the persons and memberships dynamic frames are joined by id, but I would like to filter persons based on the ids present in the memberships frame. Below is the code, where I have put static values:
import sys
from awsglue.transforms import Join, Filter
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
glueContext = GlueContext(SparkContext.getOrCreate())
# catalog: database and table names
db_name = "legislators"
tbl_persons = "persons_json"
tbl_membership = "memberships_json"
tbl_organization = "organizations_json"
# Create dynamic frames from the source tables
persons = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_persons)
memberships = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_membership)
orgs = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_organization)
persons = persons.drop_fields(['links', 'identifiers','other_names', 'images','contact_details'])
# Keep only the fields we need.
orgs = orgs.drop_fields(['other_names', 'identifiers','links'])
filteredPersons = Filter.apply(frame = persons,
                               f = lambda x: x["id"] in ["0005af3a-9471-4d1f-9299-737fff4b9b46", "00aca284-9323-4953-bb7a-1bf6f5eefe95"])
print("Filtered record count: ", filteredPersons.count())
Below is the filter logic:
filteredPersons = Filter.apply(frame = persons,
                               f = lambda x: x["id"] in ["0005af3a-9471-4d1f-9299-737fff4b9b46", "00aca284-9323-4953-bb7a-1bf6f5eefe95"])
I would like to pass the person_id column present in the memberships frame into the filter condition, so that it keeps only the persons whose id appears in memberships. Any help would be appreciated.
Upvotes: 0
Views: 10782
Reputation: 5526
You can simply perform an inner join instead of filtering, like:
persons_filtered = persons.alias('persons').join(memberships, persons.id==memberships.person_id).select('persons.*')
This will give you the filtered values only. If your memberships df is small, or acts as a lookup table, you can even broadcast it for faster results:
from pyspark.sql.functions import broadcast
persons_filtered = persons.alias('persons').join(broadcast(memberships), persons.id==memberships.person_id).select('persons.*')
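Note that the snippets above use the Spark DataFrame API, while persons and memberships in the question are Glue DynamicFrames. A minimal sketch of the same idea, assuming the standard toDF()/fromDF() conversions and the person_id join key mentioned in the question (the name "persons_filtered" is just for illustration), could look like this:
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import broadcast
# Convert the Glue DynamicFrames to Spark DataFrames to use the DataFrame join API
persons_df = persons.toDF()
memberships_df = memberships.toDF()
# Inner join keeps only persons whose id appears as person_id in memberships;
# dropDuplicates guards against persons that have several memberships
persons_filtered_df = (persons_df.alias('persons')
    .join(broadcast(memberships_df), persons_df.id == memberships_df.person_id, 'inner')
    .select('persons.*')
    .dropDuplicates(['id']))
# Convert back to a DynamicFrame if the rest of the Glue job expects one
persons_filtered = DynamicFrame.fromDF(persons_filtered_df, glueContext, "persons_filtered")
print("Filtered record count: ", persons_filtered.count())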
Hope it helps.
Upvotes: 1