hamdog

Reputation: 1181

Google Cloud Dataflow fails in combine function due to worker losing contact

My Dataflow job consistently fails in my combine function, with no errors reported in the logs beyond a single entry of:

 A work item was attempted 4 times without success. Each time the worker eventually lost contact with the service.

I am using the Apache Beam Python SDK 2.4.0. I have tried performing this step with both CombinePerKey and CombineGlobally, and the pipeline failed in the combine function in both cases. The pipeline completes successfully when run on a smaller amount of data.

Am I exhausting worker resources and not being told about it? What can cause a worker to lose contact with the service?

Update:

Using n1-highmem-4 workers gives me the same failure. When I check Stackdriver I see no errors, but three kinds of warnings: No session file found, Refusing to split, and Processing lull. The input collection is reported as 17,000 elements spread across ~60 MB, yet Stackdriver shows ~25 GB in use on a single worker, which is approaching the maximum. For this input, each accumulator created in my CombineFn should take roughly 150 MB of memory. Is my pipeline creating too many accumulators and exhausting its memory? If so, how can I tell it to merge accumulators more often or limit how many are created?

I do have an error log entry confirming my worker was killed due to OOM; it just isn't tagged as a worker error, which is the default filter in the Dataflow monitoring UI.

The pipeline definition looks something like:

table1 = (p | "Read Table1" >> beam.io.Read(beam.io.BigQuerySource(query=query))
     | "Key rows" >> beam.Map(lambda row: (row['key'], row)))
table2 = (p | "Read Table2" >> beam.io.Read(beam.io.BigQuerySource(query=query))
     | "Key rows" >> beam.Map(lambda row: (row['key'], row)))

merged = ({"table1": table1, "table2": table2}
     | "Join" >> beam.CoGroupByKey()
     | "Reshape" >> beam.ParDo(ReshapeData())
     | "Key rows" >> beam.Map(lambda row: (row['key'], row)))
     | "Build matrix" >> beam.CombinePerKey(MatrixCombiner())  # Dies here
     | "Write matrix" >> beam.io.avroio.WriteToAvro())

Upvotes: 0

Views: 943

Answers (1)

Amruth Bahadursha

Reputation: 74

Running with fewer workers leads to fewer accumulators and lets the pipeline complete successfully.
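
One way to cap the worker count is through the standard Dataflow pipeline options; a minimal sketch, with placeholder project and bucket names:

from apache_beam import Pipeline
from apache_beam.options.pipeline_options import PipelineOptions

# --num_workers and --max_num_workers keep Dataflow from scaling out,
# which limits how many accumulators run in parallel.
options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-project',            # placeholder
    '--temp_location=gs://my-bucket/temp',  # placeholder
    '--num_workers=2',
    '--max_num_workers=2',
])

p = Pipeline(options=options)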

Upvotes: 0
