dre-hh

Reputation: 8044

How to scale a sequential source in Dataflow / Beam?

The question is about a sequentially emitting source in Dataflow/Beam. For example, the Go implementation of bigqueryio is very basic: it reads the query results in a loop and emits the records sequentially. As a result, Dataflow does not scale up the number of workers and reports the step as a straggler.

Pseudocode of such a custom DoFn in Java:


@ProcessElement
public void processElement(@Element byte[] input, OutputReceiver<T> output) throws Exception {
    // Run the whole query on a single worker...
    TableResult result = bigquery.query(queryConfig);
    // ...and emit every row from the same bundle, one after another.
    for (FieldValueList row : result.iterateAll()) {
        output.output(mapRowToType(row));
    }
}

Subsequent steps that consume the source in a ParDo are never scaled either. The docs hint that this can happen when a custom source does not report progress (which this one does not).
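If I understand the docs correctly, "reporting progress" would mean writing the read as a splittable DoFn, so the runner gets a restriction it can split and track. A rough sketch of what I imagine that looks like (T is a placeholder as in my pseudocode above, and countRows/readRow plus the one-offset-per-row model are just assumptions for illustration, not the actual bigqueryio code):

import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

class ReadRowsFn extends DoFn<String, T> {

    @GetInitialRestriction
    public OffsetRange getInitialRestriction(@Element String query) {
        // Hypothetical: total number of result rows, so the range can be split.
        return new OffsetRange(0, countRows(query));
    }

    @SplitRestriction
    public void splitRestriction(@Restriction OffsetRange range, OutputReceiver<OffsetRange> out) {
        // Hand the runner smaller ranges it can distribute across workers.
        for (OffsetRange part : range.split(10_000, 1_000)) {
            out.output(part);
        }
    }

    @ProcessElement
    public void processElement(@Element String query,
            RestrictionTracker<OffsetRange, Long> tracker, OutputReceiver<T> out) {
        // Claiming offsets one by one is what reports progress to the runner.
        for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); i++) {
            out.output(readRow(query, i)); // hypothetical per-offset read
        }
    }
}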

Example transform consuming the BigQuery read, in Java:

var bigqueryRows = pipeline.apply("ReadFromBigQuery", BigQueryIO.read(/*...*/));
var mutations = bigqueryRows.apply("ProcessRows",
    ParDo.of(new DoFn<ItemRow, KV<ByteString, Iterable<Mutation>>>() {
        @ProcessElement
        public void processElement(@Element ItemRow row,
                OutputReceiver<KV<ByteString, Iterable<Mutation>>> out) {
            // BigtableIO.write() expects KV<row key, mutations> (com.google.bigtable.v2.Mutation).
            ByteString rowKey = ByteString.copyFromUtf8(row.getId());
            Mutation mutation = Mutation.newBuilder()
                // .setSetCell(...) etc.
                .build();
            Iterable<Mutation> cellMutations = List.of(mutation);
            out.output(KV.of(rowKey, cellMutations));
        }
    }));

How does one deal with such a sequential source if there is no other way to consume the data? Is there a strategy one can add later in the pipeline so that it starts parallelizing the processing across multiple workers?
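For example, would inserting something like a Reshuffle right after the read be enough to break fusion and redistribute the rows, or does the sequential read itself remain the bottleneck? A sketch of what I mean (assuming Reshuffle.viaRandomKey() is even applicable here):

var bigqueryRows = pipeline.apply("ReadFromBigQuery", BigQueryIO.read(/*...*/));

// Redistribute the sequentially emitted rows before the heavy processing,
// so downstream ParDos are not fused onto the single-worker read.
var redistributed = bigqueryRows.apply("BreakFusion", Reshuffle.<ItemRow>viaRandomKey());

var mutations = redistributed.apply("ProcessRows", ParDo.of(/* as above */));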

*NOTE: Even though my particular problem is with the Go SDK and bigqueryio, the question is about such a scenario in general. Feel free to provide a generic answer in any Beam language SDK you are familiar with.

Upvotes: 0

Views: 39

Answers (0)
