Reputation: 374
I'm using stateful processing in a streaming Python Beam pipeline to detect when a field of a JSON message changes. It works fine with DirectRunner, but with DataflowRunner I get a "Workflow failed." error before the job even starts.
The DoFn that uses stateful processing:
from typing import Dict, Iterable, List, Tuple

from apache_beam import DoFn
from apache_beam.coders import StrUtf8Coder
from apache_beam.transforms.userstate import BagRuntimeState, BagStateSpec


class DetectChangeFn(DoFn):
    # Bag state holding the previously seen value of the watched field, per key.
    BAG_SPEC: BagStateSpec = BagStateSpec('changes', StrUtf8Coder())

    def __init__(self, field: str, *unused_args, **unused_kwargs):
        super().__init__(*unused_args, **unused_kwargs)
        self.field: str = field

    def process(self, element: Tuple[str, Dict],
                bag: BagRuntimeState = DoFn.StateParam(BAG_SPEC)) -> Iterable[Dict]:
        prev: List[str] = list(bag.read())
        current: str = str(element[1][self.field])
        if len(prev) == 0:
            bag.add(current)
        elif current != prev[0]:
            bag.clear()
            bag.add(current)
            yield element[1]
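For context, this is roughly how the DoFn is wired into the pipeline. It's only a minimal sketch: the key, the 'status' field name, and the sample messages are illustrative placeholders, not the real pipeline. The important point is that stateful processing runs per key, so the elements are (key, dict) tuples:

import apache_beam as beam

# Minimal sketch with made-up data; the real pipeline reads streaming JSON messages.
with beam.Pipeline() as p:
    _ = (
        p
        | 'Create' >> beam.Create([
            ('device-1', {'status': 'ok'}),
            ('device-1', {'status': 'error'}),
        ])
        | 'DetectChanges' >> beam.ParDo(DetectChangeFn('status'))
        | 'Print' >> beam.Map(print)
    )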
The error log message I get is very opaque and does not help all that much:
{
  insertId: "ivmrqmc2os"
  labels: {
    dataflow.googleapis.com/job_id: "2021-06-16_07_32_29-2286715838537053772"
    dataflow.googleapis.com/job_name: "xxx"
    dataflow.googleapis.com/log_type: "system"
    dataflow.googleapis.com/region: "europe-west1"
  }
  logName: "projects/xxx/logs/dataflow.googleapis.com%2Fjob-message"
  receiveTimestamp: "2021-06-16T14:32:44.452175552Z"
  resource: {
    labels: {
      job_id: "2021-06-16_07_32_29-2286715838537053772"
      job_name: "xxx"
      project_id: "xxx"
      region: "europe-west1"
      step_id: ""
    }
    type: "dataflow_step"
  }
  severity: "ERROR"
  textPayload: "Workflow failed."
  timestamp: "2021-06-16T14:32:43.507731791Z"
}
I'm using Apache Beam 2.30.0 with Python 3.8. Is stateful processing supported in Google Cloud Dataflow or am I missing something?
(EDIT) Added template build and deploy commands:
python -m main \
--runner DataflowRunner \
--streaming \
--save_main_session \
--setup_file ./setup.py \
--project $PROJECT \
--staging_location $STAGING_LOCATION \
--temp_location $TEMP_LOCATION \
--template_location "$TEMPLATE"
gcloud dataflow jobs run "my-dataflow-job" \
--enable-streaming-engine \
--disable-public-ips \
--gcs-location "$TEMPLATE" \
--subnetwork $SUBNET \
--num-workers $NUM_WORKERS \
--max-workers $MAX_WORKERS \
--region $REGION \
--service-account-email $SERVICE_ACCOUNT
I'm not specifying any experiments explicitly; these are applied by default in the Dataflow job: ['use_fastavro', 'runner_harness_container_image=gcr.io/cloud-dataflow/v1beta3/harness:2.30.0', 'use_multiple_sdk_containers']
(EDIT 2) I also get the exact same error and behaviour when using the new with_auto_sharding=True parameter of WriteToBigQuery.
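For reference, that write step looks roughly like this; the table, schema, and input data are placeholders, not the real pipeline:

import apache_beam as beam

# Sketch only; table, schema and the input rows are placeholders.
with beam.Pipeline() as p:
    rows = p | 'Create' >> beam.Create([{'field': 'value'}])
    _ = rows | 'WriteToBQ' >> beam.io.WriteToBigQuery(
        table='my-project:my_dataset.my_table',   # placeholder
        schema='field:STRING',                    # placeholder
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        with_auto_sharding=True,                  # the parameter that triggers the same error
    )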
Upvotes: 1
Views: 594
Reputation: 374
I've solved the issue by preventing the Dataflow service from performing fusion optimizations on the stateful DoFn.
To do so, I've added a Reshuffle() step right before the stateful DoFn.
This is one of the methods to prevent fusion optimizations described in the Dataflow docs: https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#preventing-fusion
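In code, the change is just inserting a Reshuffle between the upstream steps and the stateful ParDo. A minimal sketch (the data, key, field name and step names are placeholders):

import apache_beam as beam

# Sketch of the fix; only the 'PreventFusion' step is the actual change.
with beam.Pipeline() as p:
    _ = (
        p
        | 'Create' >> beam.Create([('device-1', {'status': 'ok'}),
                                   ('device-1', {'status': 'error'})])
        | 'PreventFusion' >> beam.Reshuffle()   # prevents fusion into the stateful step
        | 'DetectChanges' >> beam.ParDo(DetectChangeFn('status'))
    )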
I discovered this by looking at the logs right before the "Workflow failed." error occurred. There was a message along the lines of "Fusing consumer DoFnName into Stateful DoFn/KV DoFn" (edited for simplicity) that looked like it might be the cause of the error.
Upvotes: 2