Rui Bras Fernandes

Reputation: 91

Apache Beam / Dataflow pub/sub side input with python

I'm new to Apache Beam, so I'm struggling a bit with the following scenario:

The ParDo transform that tries to fetch Firestore data does not run at all. If I use a fixed "customerId" value, everything works as expected; it is not a proper fetch from Firestore (just a simple ParDo), but it works. Am I doing something I'm not supposed to? My code is included below:

class getFirestoreUsers(beam.DoFn):
    def process(self, element, customerId):

        print(f'Getting Users from Firestore, ID: {customerId}')

        # Call function to initialize Database
        db = intializeFirebase()

        """ # get customer information from the database
        doc = db.document(f'Customers/{customerId}').get()
        customer = doc.to_dict() """
        usersList = {}

        # Get Optin Users
        try:
            docs = db.collection(
                f'Customers/{customerId}/DevicesWiFi_v3').where(u'optIn', u'==', True).stream()
            usersList = {user.id: user.to_dict() for user in docs}
        except Exception as err:
            print(f"Error: couldn't retrieve OPTIN users from DevicesWiFi")
            print(err)

        return([usersList])

Main code

def run(argv=None):
    """Build and run the pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--topic',
        type=str,
        help='Pub/Sub topic to read from')
    parser.add_argument(
        '--output',
        help=('Output local filename'))

    args, pipeline_args = parser.parse_known_args(argv)

    options = PipelineOptions(pipeline_args)
    options.view_as(SetupOptions).save_main_session = True
    options.view_as(StandardOptions).streaming = True

    p = beam.Pipeline(options=options)

    users = (p | 'Create chars' >> beam.Create([
        {
             "clientMac": "7c:d9:5c:b8:6f:38",
             "username": "Louis"
             },
        {
            "clientMac": "48:fd:8e:b0:6f:38",
            "username": "Paul"
        }
    ]))


    # Get Dictionary from Pub/Sub
    data = (p | 'Read from PubSub' >> beam.io.ReadFromPubSub(topic=args.topic)
            | 'Parse JSON to Dict' >> beam.Map(lambda e: json.loads(e))
            )

    # Get customerId from Pub/Sub information
    PcustomerId = (data | 'get customerId from Firestore' >>
                   beam.ParDo(lambda x: [x.get('customerId')]))
    PcustomerId | 'print customerId' >> beam.Map(print)

    # Get Users from Firestore
    custUsers = (users | 'Read from Firestore' >> beam.ParDo(
        getFirestoreUsers(), customerId=beam.pvalue.AsSingleton(PcustomerId)))
    custUsers | 'print Users from Firestore' >> beam.Map(print)

To avoid errors when running the function, I had to initialise the "users" dictionary, which I completely ignore afterwards. I suppose I have several errors here, so your help is much appreciated.

Upvotes: 2

Views: 1223

Answers (1)

ningk

Reputation: 1383

It's not clear to me how the users PCollection is used in the example code, since element is never processed in the process definition. I've rearranged the code a little, adding windowing and using customer_id as the main input.

class GetFirestoreUsers(beam.DoFn):
  def setup(self):
    # Call function to initialize Database
    self.db = intializeFirebase()

  def process(self, element):
    print(f'Getting Users from Firestore, ID: {element}')

    """ # get customer information from the database
    doc = self.db.document(f'Customers/{element}').get()
    customer = doc.to_dict() """
    usersList = {}

    # Get Optin Users
    try:
        docs = self.db.collection(
            f'Customers/{element}/DevicesWiFi_v3').where(u'optIn', u'==', True).stream()
        usersList = {user.id: user.to_dict() for user in docs}
    except Exception as err:
        print(f"Error: couldn't retrieve OPTIN users from DevicesWiFi")
        print(err)

    return([usersList])



# Requires: from apache_beam import window
data = (p | 'Read from PubSub' >> beam.io.ReadFromPubSub(topic=args.topic)
          | beam.WindowInto(window.FixedWindows(60))  # 60-second fixed windows
          | 'Parse JSON to Dict' >> beam.Map(lambda e: json.loads(e)))

# Get customerId from Pub/Sub information
customer_id = (data | 'get customerId from Firestore' >>
               beam.Map(lambda x: x.get('customerId')))
customer_id | 'print customerId' >> beam.Map(print)

# Get Users from Firestore
custUsers = (customer_id | 'Read from Firestore' >> beam.ParDo(
    GetFirestoreUsers()))
custUsers | 'print Users from Firestore' >> beam.Map(print)
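
In case the windowing step is unfamiliar: fixed windows of 60 seconds bucket each element by its event timestamp into a half-open interval [start, start + 60). The snippet below is only a plain-Python sketch of that assignment rule, assuming timestamps in seconds and a zero offset; it is not Beam's implementation, and the helper names fixed_window_bounds and group_by_window are made up for illustration.

```python
def fixed_window_bounds(timestamp, size=60):
    """Return (start, end) of the fixed window containing `timestamp`.

    Timestamps and size are in seconds; the window is [start, end).
    """
    start = timestamp - (timestamp % size)
    return start, start + size


def group_by_window(events, size=60):
    """Group (timestamp, value) pairs by the fixed window they fall into."""
    windows = {}
    for timestamp, value in events:
        key = fixed_window_bounds(timestamp, size)
        windows.setdefault(key, []).append(value)
    return windows


if __name__ == "__main__":
    # Hypothetical messages: (event time in seconds, customerId)
    messages = [(5, "cust-a"), (59, "cust-b"), (60, "cust-c"), (125, "cust-d")]
    for bounds, values in sorted(group_by_window(messages).items()):
        print(bounds, values)
```

Each window is then processed on its own, which is what lets downstream steps emit results for an unbounded Pub/Sub source instead of waiting for input that never ends.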

From your comment:

the data needed (customerID first and customers data after) is not ready when running the "main" PCollection with original JSON data from Pub/Sub

Did you mean that the data in Firestore is not ready when the Pub/Sub topic is read?

You can always split the logic into 2 pipelines in your main function and run them one after another.

Upvotes: 2
