Reputation: 27
I want to build a Beam streaming program that reads Pub/Sub messages like {"user_id":"u1"} and enriches them from BigQuery.
Here is my code (it only contains queries on 2 tables):
import apache_beam as beam
import argparse
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from google.cloud import bigquery
import apache_beam.io.gcp.pubsub as pubsub
import re, os
import logging, json

class extractElement(beam.DoFn):
    def process(self, element, *args, **kwargs):
        # The input is a PubsubMessage whose data is b'{"user_id":"u1"}', so it must be decoded.
        try:
            print("extractElement Start")
            data = element.data.decode('utf-8')
            yield data
        except Exception as err:
            step_name = 'extractElement'
            failure = [step_name, element]
            yield beam.pvalue.TaggedOutput(OUTPUT_TAG_FAILURE, failure)

class enrich_country(beam.DoFn):
    def process(self, element, *args, **kwargs):
        # The input is a JSON string like '{"user_id":"u1"}'; no decoding needed.
        uid = None
        country = None
        try:
            print("Enrich Country Start")
            uid = json.loads(element).get('user_id')
            query = 'select country from `agolis-allen-first.dataflow_bole.country_dim` where user_id="{}"' \
                .format(uid)
            client = bigquery.Client()
            query_job = client.query(query)
            result = query_job.result()
            status = None
            len_result = 0
            for row in result:
                country = row.country
                len_result += 1
            if len_result == 0:
                status = OUTPUT_TAG_NO_REC
            else:
                status = OUTPUT_TAG_COMPLETE
            yield (uid, country, status)
        except Exception as err:
            step_name = 'enrich_country'
            failure = [(uid, country, step_name)]
            yield beam.pvalue.TaggedOutput(OUTPUT_TAG_FAILURE, failure)

class enrich_history(beam.DoFn):
    def process(self, element, *args, **kwargs):
        # The input is a JSON string like '{"user_id":"u1"}'; no decoding needed.
        uid = None
        event_params = []
        try:
            print("Enrich history started")
            uid = json.loads(element).get('user_id')
            query = 'select event_date,event_name,device from `agolis-allen-first.dataflow_bole.event_history` where user_id="{}"' \
                .format(uid)
            client = bigquery.Client()
            query_job = client.query(query)
            result = query_job.result()
            status = None
            len_result = 0
            for row in result:
                single_event_params = {}
                single_event_params['event_date'] = row.event_date
                single_event_params['event_name'] = row.event_name
                single_event_params['device'] = row.device
                event_params.append(single_event_params)
                len_result += 1
            if len_result == 0:
                status = OUTPUT_TAG_NO_REC
            else:
                status = OUTPUT_TAG_COMPLETE
            yield (uid, event_params, status)
        except Exception as err:
            step_name = 'enrich_history'
            failure = [(uid, event_params, step_name)]
            yield beam.pvalue.TaggedOutput(OUTPUT_TAG_FAILURE, failure)

def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()
    parser.add_argument('--outputTable',
                        dest='outputTable',
                        required=True)
    parser.add_argument('--stagingLocation',
                        dest='stagingLocation',
                        required=True)
    parser.add_argument('--tempLocation',
                        dest='tempLocation',
                        required=True)
    parser.add_argument('--runner',
                        dest='runner',
                        required=True)
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument('--inputTopic',
                       dest='inputTopic')
    group.add_argument('--inputSub',
                       dest='inputSub')
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    pipeline_options.view_as(StandardOptions).streaming = True
    p = beam.Pipeline(runner=known_args.runner, options=pipeline_options)
    if known_args.inputSub:
        message = (
            p | beam.io.ReadFromPubSub(subscription=known_args.inputSub, with_attributes=True))
    else:
        message = (
            p | beam.io.ReadFromPubSub(topic=known_args.inputTopic, with_attributes=True))
    # with_outputs(OUTPUT_TAG_FAILURE, main='outputs'):
    # any element yielded without a tag goes to the main output 'outputs'.
    mainData, failure_extractElement = (
        message | 'split' >> beam.ParDo(extractElement()).with_outputs(OUTPUT_TAG_FAILURE, main='outputs')
    )
    enrichCountry, failure_enrich_country = (
        mainData | 'enrich country' >> beam.ParDo(enrich_country()).with_outputs(OUTPUT_TAG_FAILURE, main='outputs')
    )
    enrichHistory, failure_enrich_history = (
        mainData | 'enrich history' >> beam.ParDo(enrich_history()).with_outputs(OUTPUT_TAG_FAILURE, main='outputs')
    )
    processedData = (
        (enrichCountry, enrichHistory)
        | 'combine record' >> ????
    )
    p.run()

if __name__ == '__main__':
    path_to_credential = '***.json'
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = path_to_credential
    logging.getLogger().setLevel(logging.INFO)
    OUTPUT_TAG_NO_REC = 'Norecord'
    OUTPUT_TAG_COMPLETE = 'complete'
    OUTPUT_TAG_FAILURE = 'failure'
    run()
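For reference, a sample invocation of the script, assuming it is saved as pipeline.py (the file name and every resource name below are placeholders, not values from my setup):

python pipeline.py \
    --runner DataflowRunner \
    --project <your-project> \
    --outputTable <project:dataset.table> \
    --stagingLocation gs://<bucket>/staging \
    --tempLocation gs://<bucket>/temp \
    --inputSub projects/<your-project>/subscriptions/<subscription>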
The output of "enrich country" includes uid, country, status
and
the output of "enrich history" includes uid, array of event history
.
I want to combine the two pCollection to generate record like uid, country, array of event history
.
I tried to apply CoGroupbyKey() in the "combine record" step, but I was told "GroupByKey cannot be applied to an unbounded PCollection with global windowing and a default trigger".
My questions are:
Upvotes: 0
Views: 131
Reputation: 2373
@digitalearth, as you mentioned in the comment, the issue is resolved by applying a fixed window to the PCollections before the CoGroupByKey.
Posting this as a community wiki answer for the benefit of anyone who encounters this use case in the future.
Feel free to edit this answer with additional information.
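A minimal sketch of what the fixed-window fix can look like in the "combine record" step, assuming the enrichCountry and enrichHistory collections from the question and an arbitrary 60-second window size (tune it to your latency requirements):

import apache_beam as beam
from apache_beam import window

# Re-key each enrichment output by uid and apply a fixed window so that
# CoGroupByKey can fire on the unbounded PCollections.
keyed_country = (
    enrichCountry
    | 'key country by uid' >> beam.Map(lambda rec: (rec[0], rec[1]))  # (uid, country); status dropped
    | 'window country' >> beam.WindowInto(window.FixedWindows(60))    # 60s is an assumed size
)
keyed_history = (
    enrichHistory
    | 'key history by uid' >> beam.Map(lambda rec: (rec[0], rec[1]))  # (uid, event_params)
    | 'window history' >> beam.WindowInto(window.FixedWindows(60))
)

def merge_record(kv):
    # kv is (uid, {'country': values, 'history': values}) as produced by CoGroupByKey.
    uid, grouped = kv
    countries = list(grouped['country'])
    histories = list(grouped['history'])
    return {
        'user_id': uid,
        'country': countries[0] if countries else None,
        'event_history': histories[0] if histories else [],
    }

processedData = (
    {'country': keyed_country, 'history': keyed_history}
    | 'combine record' >> beam.CoGroupByKey()
    | 'merge' >> beam.Map(merge_record)
)

Both enrichment outputs for a given uid carry the timestamp of the originating Pub/Sub message, so they land in the same fixed window and CoGroupByKey can join them within that window.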
Upvotes: 0