I'm new to Apache Beam, so I'm struggling a bit with the following scenario:
The ParDo transform that is supposed to fetch the Firestore data does not run at all. If I use a fixed "customerId" value, everything works as expected, and a simple ParDo that does not actually fetch from Firestore also works. Am I doing something I'm not supposed to? My code is below:
class getFirestoreUsers(beam.DoFn):
    def process(self, element, customerId):
        print(f'Getting Users from Firestore, ID: {customerId}')
        # Call function to initialize Database
        db = intializeFirebase()
        """ # get customer information from the database
        doc = db.document(f'Customers/{customerId}').get()
        customer = doc.to_dict() """
        usersList = {}
        # Get Optin Users
        try:
            docs = db.collection(
                f'Customers/{customerId}/DevicesWiFi_v3').where(u'optIn', u'==', True).stream()
            usersList = {user.id: user.to_dict() for user in docs}
        except Exception as err:
            print(f"Error: couldn't retrieve OPTIN users from DevicesWiFi")
            print(err)
        return([usersList])
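One way to make the Firestore query in this DoFn easier to check in isolation is to pull it into a plain function that receives the client as a parameter. This sketch relies only on the call chain already used above (collection(...).where(...).stream(), doc.id, doc.to_dict()). The names fetch_optin_users and FakeFirestore are hypothetical, introduced for illustration; they are not part of the Firestore or Beam APIs.

```python
from typing import Any, Dict


def fetch_optin_users(db: Any, customer_id: str) -> Dict[str, dict]:
    """Return {document_id: document_data} for one customer's opted-in devices.

    `db` is any object exposing the Firestore-style chain used in the question:
    db.collection(path).where(field, op, value).stream().
    """
    docs = (
        db.collection(f"Customers/{customer_id}/DevicesWiFi_v3")
        .where("optIn", "==", True)
        .stream()
    )
    return {doc.id: doc.to_dict() for doc in docs}


# --- Hypothetical in-memory stand-in for the Firestore client (local testing only) ---
class _FakeDoc:
    def __init__(self, doc_id, data):
        self.id = doc_id
        self._data = data

    def to_dict(self):
        return dict(self._data)


class _FakeQuery:
    def __init__(self, docs):
        self._docs = docs

    def where(self, field, op, value):
        # Only the equality operator is modelled in this fake.
        assert op == "=="
        return _FakeQuery([d for d in self._docs if d.to_dict().get(field) == value])

    def stream(self):
        return iter(self._docs)


class FakeFirestore:
    def __init__(self, collections):
        # collections maps a collection path to {doc_id: data}
        self._collections = collections

    def collection(self, path):
        docs = self._collections.get(path, {})
        return _FakeQuery([_FakeDoc(i, d) for i, d in docs.items()])


db = FakeFirestore({
    "Customers/c1/DevicesWiFi_v3": {
        "7c:d9:5c:b8:6f:38": {"optIn": True, "username": "Louis"},
        "48:fd:8e:b0:6f:38": {"optIn": False, "username": "Paul"},
    }
})
print(fetch_optin_users(db, "c1"))
```

Inside the DoFn, process could then simply return [fetch_optin_users(db, customerId)], with the real client passed in.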
Main code:
import argparse
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import (
    PipelineOptions, SetupOptions, StandardOptions)


def run(argv=None):
    """Build and run the pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--topic',
        type=str,
        help='Pub/Sub topic to read from')
    parser.add_argument(
        '--output',
        help=('Output local filename'))
    args, pipeline_args = parser.parse_known_args(argv)
    options = PipelineOptions(pipeline_args)
    options.view_as(SetupOptions).save_main_session = True
    options.view_as(StandardOptions).streaming = True
    p = beam.Pipeline(options=options)

    users = (p | 'Create chars' >> beam.Create([
        {
            "clientMac": "7c:d9:5c:b8:6f:38",
            "username": "Louis"
        },
        {
            "clientMac": "48:fd:8e:b0:6f:38",
            "username": "Paul"
        }
    ]))

    # Get Dictionary from Pub/Sub
    data = (p | 'Read from PubSub' >> beam.io.ReadFromPubSub(topic=args.topic)
            | 'Parse JSON to Dict' >> beam.Map(lambda e: json.loads(e))
            )

    # Get customerId from Pub/Sub information
    PcustomerId = (data | 'get customerId from Firestore' >>
                   beam.ParDo(lambda x: [x.get('customerId')]))
    PcustomerId | 'print customerId' >> beam.Map(print)

    # Get Users from Firestore
    custUsers = (users | 'Read from Firestore' >> beam.ParDo(
        getFirestoreUsers(), customerId=beam.pvalue.AsSingleton(PcustomerId)))
    custUsers | 'print Users from Firestore' >> beam.Map(print)
To avoid errors when running the function, I had to initialise a "users" PCollection of dictionaries, which I then completely ignore. I suspect there are several errors here, so your help is much appreciated.
It's not clear to me how the "users" PCollection is used in the example code, since "element" is never used in the process method. I've rearranged the code a little: I added windowing and used customer_id as the main input instead.
class GetFirestoreUsers(beam.DoFn):
    def setup(self):
        # setup() runs once per DoFn instance, so the client is created once
        # and reused for every element instead of once per element.
        # Call function to initialize Database
        self.db = intializeFirebase()

    def process(self, element):
        print(f'Getting Users from Firestore, ID: {element}')
        """ # get customer information from the database
        doc = self.db.document(f'Customers/{element}').get()
        customer = doc.to_dict() """
        usersList = {}
        # Get Optin Users
        try:
            docs = self.db.collection(
                f'Customers/{element}/DevicesWiFi_v3').where(u'optIn', u'==', True).stream()
            usersList = {user.id: user.to_dict() for user in docs}
        except Exception as err:
            print(f"Error: couldn't retrieve OPTIN users from DevicesWiFi")
            print(err)
        return([usersList])
from apache_beam.transforms import window

# Inside run(), after p = beam.Pipeline(options=options)
data = (p | 'Read from PubSub' >> beam.io.ReadFromPubSub(topic=args.topic)
        | beam.WindowInto(window.FixedWindows(60))
        | 'Parse JSON to Dict' >> beam.Map(lambda e: json.loads(e)))

# Get customerId from Pub/Sub information
customer_id = (data | 'get customerId from Firestore' >>
               beam.Map(lambda x: x.get('customerId')))
customer_id | 'print customerId' >> beam.Map(print)

# Get Users from Firestore (customer_id is now the main input, not a side input)
custUsers = (customer_id | 'Read from Firestore' >> beam.ParDo(
    GetFirestoreUsers()))
custUsers | 'print Users from Firestore' >> beam.Map(print)
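With window.FixedWindows(60), each element is assigned to the non-overlapping 60-second interval that contains its event timestamp (for ReadFromPubSub without a timestamp_attribute, that is the message publish time). A stdlib-only sketch of that assignment rule, assuming timestamps in seconds and the default offset of 0; fixed_window is a hypothetical helper, not a Beam API:

```python
from typing import Tuple


def fixed_window(timestamp: float, size: float = 60.0, offset: float = 0.0) -> Tuple[float, float]:
    """Return the [start, end) bounds of the fixed window containing `timestamp`.

    Fixed windows tile the time axis: start = timestamp - (timestamp - offset) % size.
    """
    start = timestamp - (timestamp - offset) % size
    return start, start + size


# A few example event timestamps (seconds) and the window each one falls into.
for ts in (0, 59.9, 60, 125):
    print(ts, fixed_window(ts))
```

With the default trigger, results for a window are emitted once the watermark passes the window's end. In the single global window of an unbounded Pub/Sub source that point is never reached, which is a plausible reason the original AsSingleton side input never became ready.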
From your comment:
the data needed (customerID first and customers data after) is not ready when running the "main" PCollection with original JSON data from Pub/Sub
Do you mean that the data in Firestore is not ready yet when the Pub/Sub topic is read?
You can always split the logic into 2 pipelines in your main function and run them one after another.