Reputation: 1300
Current situation
The porpouse of this pipeline is to read from pub/sub the payload with geodata, then this data are transformed and analyzed and finally return if a condition is true or false
with beam.Pipeline(options=pipeline_options) as p:
raw_data = (p
| 'Read from PubSub' >> beam.io.ReadFromPubSub(
subscription='projects/XXX/subscriptions/YYY'))
geo_data = (raw_data
| 'Geo data transform' >> beam.Map(lambda s: GeoDataIngestion(s)))
def GeoDataIngestion(string_input):
<...>
return True or False
Desirable situation 1
If the GeoDataIngestion result is true, then the raw_data will be stored in big query
geo_data = (raw_data
| 'Geo data transform' >> beam.Map(lambda s: GeoDataIngestion(s))
| 'Evaluate condition' >> beam.Map(lambda s: Condition(s))
)
def Condition(condition):
if condition:
<...WriteToBigQuery...>
#The class I used before to store raw_data without depending on evaluate condition:
class WriteToBigQuery(beam.PTransform):
def expand(self, pcoll):
return (
pcoll
| 'Format' >> beam.ParDo(FormatBigQueryFn())
| 'Write to BigQuery' >> beam.io.WriteToBigQuery(
'XXX',
schema=TABLE_SCHEMA,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
Desirable situation 2
Instead of store the data in BigQuery, it would be also good to send to pub/sub
def Condition(condition):
if condition:
<...SendToPubSub(Topic1)...>
else:
<...SendToPubSub(Topic2)...>
Here, the problem is to set the Topic depending of the condition result, because i'm not able to pass the topic like parameter in the pipeline
| beam.io.WriteStringsToPubSub(TOPIC)
Neither in a function/class
Question
How can I do that?
How/where should I call WriteToBigQuery to store the PCollection raw_data if the result of Evaluate condition is true?
Upvotes: 3
Views: 4647
Reputation: 1525
I think branching collections based on the evaluation condition result might be helpful for your scenario. Please see the documentation here.
To illustrate the branching, suppose I have a collection below, where you want to do different action based on the content of the string.
'this line is for BigQuery',
'this line for pubsub topic1',
'this line for pubsub topic2'
The code below will create tag the collection and you can get three different PCollections based on the tag. Then you can decide what further actions you want to perform on the individual collections.
import apache_beam as beam
from apache_beam import pvalue
import sys
class Split(beam.DoFn):
# These tags will be used to tag the outputs of this DoFn.
OUTPUT_TAG_BQ = 'BigQuery'
OUTPUT_TAG_PS1 = 'pubsub topic1'
OUTPUT_TAG_PS2 = 'pubsub topic2'
def process(self, element):
"""
tags the input as it processes the orignal PCollection
"""
print element
if "BigQuery" in element:
yield pvalue.TaggedOutput(self.OUTPUT_TAG_BQ, element)
print 'found bq'
elif "pubsub topic1" in element:
yield pvalue.TaggedOutput(self.OUTPUT_TAG_PS1, element)
elif "pubsub topic2" in element:
yield pvalue.TaggedOutput(self.OUTPUT_TAG_PS2, element)
if __name__ == '__main__':
output_prefix = 'C:\\pythonVirtual\\Mycodes\\output'
p = beam.Pipeline(argv=sys.argv)
lines = (p
| beam.Create([
'this line is for BigQuery',
'this line for pubsub topic1',
'this line for pubsub topic2']))
# with_outputs allows accessing the explicitly tagged outputs of a DoFn.
tagged_lines_result = (lines
| beam.ParDo(Split()).with_outputs(
Split.OUTPUT_TAG_BQ,
Split.OUTPUT_TAG_PS1,
Split.OUTPUT_TAG_PS2))
# tagged_lines_result is an object of type DoOutputsTuple. It supports
# accessing result in alternative ways.
bq_records = tagged_lines_result[Split.OUTPUT_TAG_BQ]| "write BQ" >> beam.io.WriteToText(output_prefix + 'bq')
ps1_records = tagged_lines_result[Split.OUTPUT_TAG_PS1] | "write PS1" >> beam.io.WriteToText(output_prefix + 'ps1')
ps2_records = tagged_lines_result[Split.OUTPUT_TAG_PS2] | "write PS2" >> beam.io.WriteToText(output_prefix + 'ps2')
p.run().wait_until_finish()
Please let me know if that helps.
Upvotes: 10