digitalearth

Reputation: 27

Join multiple PCollections in a global window

I want to build a Beam program to

  1. Read Pub/Sub messages in streaming mode; each message looks like {"user_id":"u1"}
  2. Use the user_id to retrieve data from 7+ BigQuery tables. For performance reasons, I need to run those queries in parallel.
  3. Join the results of the 7+ queries to construct a complete record

Here is my code (it only contains queries on 2 tables):

import apache_beam as beam
import argparse
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from google.cloud import bigquery
import apache_beam.io.gcp.pubsub as pubsub
import re,os
import logging,json

class extractElement(beam.DoFn):
    def process(self, element, *args, **kwargs):
        # The input is a PubsubMessage whose data is b'{"user_id":"u1"}', so it needs to be decoded
        try:
            print("extractElement Start")
            data = element.data.decode('utf-8')
            yield data
        except Exception as err:
            step_name = 'extractElement'
            failure = [step_name, element]
            yield beam.pvalue.TaggedOutput(OUTPUT_TAG_FAILURE, failure)

class enrich_country(beam.DoFn):

    def process(self, element, *args, **kwargs):
        # The input is a JSON string like '{"user_id":"u1"}', so no decoding is needed
        try:
            print("Enrich Country Start")
            uid=json.loads(element).get('user_id')
            query = 'select country from `agolis-allen-first.dataflow_bole.country_dim` where user_id="{}"' \
               .format(uid)
            client=bigquery.Client()
            query_job = client.query(query)
            result=query_job.result()

            status=None
            country=None
            len_result = 0
            for row in result:
                country=row.country
                len_result+=1

            if len_result == 0:
                status=OUTPUT_TAG_NO_REC
            else:
                status = OUTPUT_TAG_COMPLETE

            yield (uid,country,status)
        except Exception as err:
            step_name = 'enrich_country'
            failure = [(uid,country, step_name)]
            yield beam.pvalue.TaggedOutput(OUTPUT_TAG_FAILURE, failure)

class enrich_history(beam.DoFn):

    def process(self, element, *args, **kwargs):
        # The input is a JSON string like '{"user_id":"u1"}', so no decoding is needed
        try:
            print("Enrich history started")
            uid=json.loads(element).get('user_id')
            query = 'select event_date,event_name,device from `agolis-allen-first.dataflow_bole.event_history` where user_id="{}"' \
               .format(uid)
            client=bigquery.Client()
            query_job = client.query(query)
            result=query_job.result()

            status=None
            event_params=[]

            len_result = 0
            for row in result:
                single_event_params={}
                single_event_params['event_date']=row.event_date
                single_event_params['event_name'] = row.event_name
                single_event_params['device'] = row.device
                event_params.append(single_event_params)
                len_result+=1

            if len_result == 0:
                status=OUTPUT_TAG_NO_REC
            else:
                status = OUTPUT_TAG_COMPLETE

            yield (uid,event_params,status)
        except Exception as err:
            step_name = 'enrich_history'
            failure = [(uid,event_params, step_name)]
            yield beam.pvalue.TaggedOutput(OUTPUT_TAG_FAILURE, failure)

def run(argv=None,save_main_session=True):
    parser=argparse.ArgumentParser()
    parser.add_argument('--outputTable',
                       dest='outputTable',
                       required=True)
    parser.add_argument('--stagingLocation',
                       dest='stagingLocation',
                       required=True)
    parser.add_argument('--tempLocation',
                       dest='tempLocation',
                       required=True)
    parser.add_argument('--runner',
                       dest='runner',
                       required=True)

    group=parser.add_mutually_exclusive_group(required=True)
    group.add_argument('--inputTopic',
                       dest='inputTopic')
    group.add_argument('--inputSub',
                       dest='inputSub')

    known_args,pipeline_args=parser.parse_known_args(argv)
    pipeline_options=PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session=save_main_session
    pipeline_options.view_as(StandardOptions).streaming=True


    p=beam.Pipeline(runner=known_args.runner,options=pipeline_options)
    if known_args.inputSub:
       message=(
            p|beam.io.ReadFromPubSub(subscription=known_args.inputSub,with_attributes=True))
    else:
       message=(
           p|beam.io.ReadFromPubSub(topic=known_args.inputTopic,with_attributes=True))

    # with_outputs(OUTPUT_TAG_FAILURE, main='outputs'):
    # any output that is not explicitly tagged is assigned to the main tag 'outputs'.
    mainData,failure_extractElement=(
        message |'split'>>beam.ParDo(extractElement()).with_outputs(OUTPUT_TAG_FAILURE,main='outputs')
    )

    enrichCountry,failure_enrich_country=(
        mainData |'enrich country' >> beam.ParDo(enrich_country()).with_outputs(OUTPUT_TAG_FAILURE,main='outputs')
    )

    enrichHistory,failure_enrich_history=(
        mainData |'enrich history' >> beam.ParDo(enrich_history()).with_outputs(OUTPUT_TAG_FAILURE,main='outputs')
    )

    processedData = (
        (enrichCountry,enrichHistory)
        |'combine record' >> ????
    )


if __name__ == '__main__':
    path_to_credential = '***.json'
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = path_to_credential
    logging.getLogger().setLevel(logging.INFO)

    OUTPUT_TAG_NO_REC = 'Norecord'
    OUTPUT_TAG_COMPLETE = 'complete'
    OUTPUT_TAG_FAILURE = 'failure'

    run()

The output of "enrich country" includes uid, country, and status, and the output of "enrich history" includes uid and an array of event history.

I want to combine the two PCollections to generate a record like uid, country, array of event history.

I tried to apply CoGroupByKey() in the "combine record" step, but I got the error "GroupByKey cannot be applied to an unbounded PCollection with global windowing and a default trigger".

My questions are:

  1. In my case, how do I join multiple PCollections in a global window?
  2. Is it possible to define a trigger for the global window?
  3. I also considered side inputs, but it seems that side inputs do not support multiple PCollections, am I right? If not, is there any sample code I can refer to?

Upvotes: 0

Views: 131

Answers (1)

kiran mathew

Reputation: 2373

@digitalearth, as you mentioned in the comments, the issue above was overcome by using a fixed window.
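
For reference, here is a minimal sketch of that approach applied to the pipeline from the question. The 60-second window size, the keying lambdas, and the final record layout are illustrative assumptions, not part of the original code:

from apache_beam.transforms.window import FixedWindows

# Window both enriched streams, key them by user id, then join with CoGroupByKey.
country_kv = (
    enrichCountry
    | 'window country' >> beam.WindowInto(FixedWindows(60))
    | 'key country' >> beam.Map(lambda rec: (rec[0], rec[1]))   # (uid, country)
)

history_kv = (
    enrichHistory
    | 'window history' >> beam.WindowInto(FixedWindows(60))
    | 'key history' >> beam.Map(lambda rec: (rec[0], rec[1]))   # (uid, [events])
)

processedData = (
    {'country': country_kv, 'history': history_kv}
    | 'combine record' >> beam.CoGroupByKey()
    | 'build record' >> beam.Map(lambda kv: {
        'user_id': kv[0],
        'country': kv[1]['country'],    # list of country values grouped for this uid
        'events': kv[1]['history'],     # list of event-history arrays grouped for this uid
    })
)

Windowing both inputs gives the GroupByKey underneath CoGroupByKey window boundaries on which to emit results, which is exactly what the "unbounded PCollection with global windowing and a default trigger" error is complaining about.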

Posting this answer as community wiki for the benefit of anyone who might encounter this use case in the future.

Feel free to edit this answer for additional information.

Upvotes: 0
