Alexander

Reputation: 374

Does Google Cloud Dataflow support stateful processing? ERROR: Workflow failed

I'm using stateful processing in a streaming Python Beam pipeline to detect when a field of a JSON message changes. With DirectRunner it works fine, but with DataflowRunner I get a "Workflow failed." error before the job even starts.

The function that uses stateful processing:

from typing import Dict, Iterable, List, Tuple

from apache_beam import DoFn
from apache_beam.coders import StrUtf8Coder
from apache_beam.transforms.userstate import BagRuntimeState, BagStateSpec


class DetectChangeFn(DoFn):
    # Bag state holding the last observed value of the tracked field.
    BAG_SPEC: BagStateSpec = BagStateSpec('changes', StrUtf8Coder())

    def __init__(self, field: str, *unused_args, **unused_kwargs):
        super().__init__(*unused_args, **unused_kwargs)
        self.field: str = field

    def process(self, element: Tuple[str, Dict], bag: BagRuntimeState = DoFn.StateParam(BAG_SPEC)) -> Iterable[Dict]:
        prev: List[str] = list(bag.read())
        current: str = str(element[1][self.field])

        if len(prev) == 0:
            # First value seen for this key: remember it, emit nothing.
            bag.add(current)
        elif current != prev[0]:
            # Value changed: replace the stored value and emit the message.
            bag.clear()
            bag.add(current)
            yield element[1]
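
For context, here is roughly how the DoFn is applied; stateful processing needs keyed input, so messages are keyed first. The Pub/Sub topic, the 'id' key and the 'status' field below are illustrative placeholders, not the actual pipeline:

import json
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder wiring: topic, key and field names are illustrative. The
# important part is that the stateful DoFn receives a keyed PCollection
# of (key, dict) elements.
with beam.Pipeline(options=PipelineOptions(streaming=True)) as p:
    (
        p
        | 'Read' >> beam.io.ReadFromPubSub(topic='projects/xxx/topics/xxx')
        | 'Parse' >> beam.Map(json.loads)
        | 'Key' >> beam.Map(lambda msg: (str(msg['id']), msg))
        | 'DetectChanges' >> beam.ParDo(DetectChangeFn('status'))
    )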

The error log message I get is very opaque and does not help all that much:

{
 insertId: "ivmrqmc2os"  
 labels: {
  dataflow.googleapis.com/job_id: "2021-06-16_07_32_29-2286715838537053772"   
  dataflow.googleapis.com/job_name: "xxx"   
  dataflow.googleapis.com/log_type: "system"   
  dataflow.googleapis.com/region: "europe-west1"   
 }
 logName: "projects/xxx/logs/dataflow.googleapis.com%2Fjob-message"  
 receiveTimestamp: "2021-06-16T14:32:44.452175552Z"  
 resource: {
  labels: {
   job_id: "2021-06-16_07_32_29-2286715838537053772"    
   job_name: "xxx"    
   project_id: "xxx"    
   region: "europe-west1"    
   step_id: ""    
  }
  type: "dataflow_step"   
 }
 severity: "ERROR"  
 textPayload: "Workflow failed."  
 timestamp: "2021-06-16T14:32:43.507731791Z"  
}

I'm using Apache Beam 2.30.0 with Python 3.8. Is stateful processing supported in Google Cloud Dataflow or am I missing something?

(EDIT) Added template build and deploy commands:

python -m main \
    --runner DataflowRunner \
    --streaming \
    --save_main_session \
    --setup_file ./setup.py \
    --project $PROJECT \
    --staging_location $STAGING_LOCATION \
    --temp_location $TEMP_LOCATION \
    --template_location "$TEMPLATE"
gcloud dataflow jobs run "my-dataflow-job" \
    --enable-streaming-engine \
    --disable-public-ips \
    --gcs-location "$TEMPLATE" \
    --subnetwork $SUBNET \
    --num-workers $NUM_WORKERS \
    --max-workers $MAX_WORKERS \
    --region $REGION \
    --service-account-email $SERVICE_ACCOUNT

I'm not specifying any experiments explicitly; these are used by default in the Dataflow job: ['use_fastavro', 'runner_harness_container_image=gcr.io/cloud-dataflow/v1beta3/harness:2.30.0', 'use_multiple_sdk_containers']

(EDIT 2) Also, I'm getting the exact same error and behaviour when using the new with_auto_sharding=True parameter of WriteToBigQuery.
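
For reference, that write step looks roughly like this (table and schema are redacted placeholders; with_auto_sharding=True is the relevant part):

import apache_beam as beam

# Placeholder table/schema; with_auto_sharding=True is the parameter in question.
write = beam.io.WriteToBigQuery(
    table='project:dataset.table',
    schema='field:STRING,value:STRING',
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    with_auto_sharding=True,
)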

Upvotes: 1

Views: 594

Answers (1)

Alexander

Reputation: 374

I've solved the issue by preventing the Dataflow service from performing fusion optimizations on the stateful DoFn.

To do so, I've added a Reshuffle() step right before the stateful DoFn.

This is one of the methods to prevent fusion optimizations described in the Dataflow docs: https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#preventing-fusion
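
In pipeline terms the change is just one extra step before the stateful ParDo. In the sketch below, keyed_messages stands for the keyed (key, dict) PCollection from the question, and the step labels are illustrative:

import apache_beam as beam

# Reshuffle acts as a fusion break, so the preceding step is no longer
# fused into the stateful ParDo. keyed_messages is the keyed (key, dict)
# PCollection that feeds the stateful DoFn.
changes = (
    keyed_messages
    | 'PreventFusion' >> beam.Reshuffle()
    | 'DetectChanges' >> beam.ParDo(DetectChangeFn('status'))
)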

I discovered this by looking at the logs emitted right before the "Workflow failed." error. There was a "Fusing consumer DoFnName into Stateful DoFn/KV DoFn" message (names edited for simplicity) that looked like it might be the cause of the error.

Upvotes: 2
