Reputation: 8044
My question is about a sequentially emitting source in Dataflow. For example, the Go bigqueryio implementation is very basic: it reads the query results in a loop and emits the records sequentially. As a result, Dataflow does not scale up the number of workers and detects a straggler.
Pseudocode of such a custom DoFn in Java:
@ProcessElement
public void processElement(@Element byte[] input, OutputReceiver<T> output) throws Exception {
  // ...
  // The whole result set is read and emitted from this single invocation,
  // so every record is produced sequentially in one bundle.
  TableResult result = bigquery.query(queryConfig);
  for (FieldValueList row : result.iterateAll()) {
    output.output(mapRowToType(row));
  }
}
Subsequent steps that consume this source in a ParDo are never scaled out. The docs hint that this can happen when a custom source does not report progress (which this one does not).
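If I understand correctly, reporting progress would mean writing the read as a splittable DoFn, so the runner can split the restriction and rebalance the remaining work. A minimal sketch of that shape in Java (totalRowCount and readRow are hypothetical stand-ins for a row count and a random-access read):

import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

@DoFn.BoundedPerElement
class SplittableRead extends DoFn<String, String> {

  // The runner may split this range and hand parts of it to other workers.
  @GetInitialRestriction
  public OffsetRange getInitialRestriction(@Element String query) {
    return new OffsetRange(0, totalRowCount(query)); // hypothetical helper
  }

  @ProcessElement
  public void processElement(@Element String query,
      RestrictionTracker<OffsetRange, Long> tracker, OutputReceiver<String> out) {
    // tryClaim both reports progress and stops at a split point if the
    // runner has decided to split the restriction.
    for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); i++) {
      out.output(readRow(query, i)); // hypothetical random-access read
    }
  }
}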
Example transform consuming the sequential source from the first snippet in Java:
var bigqueryRows = pipeline.apply("ReadFromBigQuery", BigQueryIO.read(/* ... */));
var mutations = bigqueryRows.apply("ProcessRows",
    ParDo.of(new DoFn<ItemRow, KV<ByteString, Iterable<Mutation>>>() {
      @ProcessElement
      public void processElement(@Element ItemRow row,
          OutputReceiver<KV<ByteString, Iterable<Mutation>>> out) {
        ByteString rowKey = ByteString.copyFromUtf8(row.getId());
        Mutation mutation = Mutation.newBuilder()
            // ...
            .build();
        out.output(KV.of(rowKey, List.of(mutation)));
      }
    }));
How does one deal with such a sequential source if there is no other way to consume the data? Is there a strategy one can add to the pipeline later so that it starts parallelizing the processing across multiple workers?
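For instance, would inserting a fusion break such as Reshuffle between the source and the downstream ParDo be enough? A sketch of what I have in mind (Java SDK):

// Candidate fusion break: Reshuffle materializes and redistributes the
// elements, so "ProcessRows" is no longer fused to the sequential read.
var reshuffled = bigqueryRows.apply("BreakFusion", Reshuffle.viaRandomKey());
var mutations = reshuffled.apply("ProcessRows", ParDo.of(/* as above */));

My understanding is that the read itself would still run on a single worker, but the downstream processing could then scale out; is that correct?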
*NOTE: Even though my particular problem is with the Go SDK and bigqueryio, my question is about such a scenario in general. Feel free to provide a generic answer in any Beam language SDK you are familiar with.
Upvotes: 0
Views: 39