Reputation: 61
I am trying to run a job on Google Dataflow with the following process flow:
Essentially, I take a single data source, filter it on certain values within each dictionary, and create a separate output for each filter criterion.
I've written the following code:
    # List of values to filter by
    x_list = [1, 2, 3]

    with beam.Pipeline(options=PipelineOptions().from_dictionary(pipeline_params)) as p:
        # Read in newline JSON data - each line is a dictionary
        log_data = (
            p
            | "Create " + input_file >> beam.io.textio.ReadFromText(input_file)
            | "Load " + input_file >> beam.FlatMap(lambda x: json.loads(x))
        )

        # For each value in x_list, filter log_data for dictionaries containing the value & write out to separate file
        for i in x_list:
            # Return dictionary if given key = value in filter
            filtered_log = log_data | "Filter_"+i >> beam.Filter(lambda x: x['key'] == i)
            # Do additional processing
            processed_log = process_pcoll(filtered_log, event)
            # Write final file
            output = (
                processed_log
                | 'Dump_json_'+filename >> beam.Map(json.dumps)
                | "Save_"+filename >> beam.io.WriteToText(output_fp+filename,num_shards=0,shard_name_template="")
            )
Currently it only processes the first value in the list. I know that I probably have to use ParDo, but I'm not very sure how to factor that into my process.
Upvotes: 2
Views: 6564
Reputation: 526
You can use TaggedOutput in Beam. Write a DoFn that tags each element in the PCollection with its key value.
    import uuid
    import apache_beam as beam
    from apache_beam.pvalue import TaggedOutput

    class TagData(beam.DoFn):
        def process(self, element):
            # Output tags must be strings, so convert the key value
            key = element.get('key')
            yield TaggedOutput(str(key), element)

    # Declare one tagged output per filter value
    processed_tagged_log = (
        processed_log
        | "tagged-data-by-key" >> beam.ParDo(TagData()).with_outputs(*[str(v) for v in x_list])
    )
Now you can write each tagged output to a separate file or table.
    # Write each tagged output to its own file or table
    for key in x_list:
        # The uuid only keeps the step label unique within the pipeline
        processed_tagged_log[str(key)] | "save file %s" % uuid.uuid4() >> beam.io.WriteToText(output_fp + str(key) + filename, num_shards=0, shard_name_template="")
Source: https://beam.apache.org/documentation/sdks/pydoc/2.0.0/_modules/apache_beam/pvalue.html
Upvotes: 6