Manuel Valero

Reputation: 455

Apache Beam: add a sequence number to a PCollection

I'm trying to build an ETL to load a dimension table. I'm using Apache Beam with Python, Dataflow, and BigQuery.

I need to assign a sequence number to each element of a PCollection in order to load it into BigQuery, but I can't find any way to do this.

I think I need Dataflow to do the preceding aggregations and joins that produce my final PCollection. At that point, though, I need to stop parallel processing, turn my PCollection into a list (as with .collect() in Spark), and then use a simple loop to assign the sequence number. Is that right?

This is the pipeline I've coded:

p | ReadFromAvro(known_args.input) | beam.Map(adapt) | beam.GroupByKey() | beam.Map(adaptGroupBy) 

I've read that there is no way to get a list out of a PCollection (see "How to get a list of elements out of a PCollection in Google Dataflow and use it in the pipeline to loop Write Transforms?").

How can I achieve this? Any help is appreciated.

Upvotes: 1

Views: 2340

Answers (1)

Pablo

Reputation: 11031

If what you want is a list containing each of the elements in a PCollection, you can use a side input. Keep in mind that this removes all parallelism from this step, because a single worker has to iterate over the whole collection, so your pipeline may become slow.

If you still want to do this, then:

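import apache_beam as beam

# my_collection is the PCollection to number; p must be the pipeline that produced it
side_input_coll = beam.pvalue.AsIterable(my_collection)

numbered = (p
 | beam.Create([0])  # a single dummy element, so the FlatMap below runs exactly once
 | beam.FlatMap(lambda _, my_seq: [(elem, i) for i, elem in enumerate(my_seq)],
               my_seq=side_input_coll))  # emits (element, sequence_number) pairs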

But keep in mind that PCollections are intrinsically unordered, so the numbers assigned this way follow no meaningful order. If you need to preserve parallelism, it may be best to simply generate a random ID for each element instead.

To learn more about side inputs, see the Side Inputs section of the Beam Programming Guide.

Upvotes: 3
