Dr. Fabien Tarrade

Reputation: 1706

Dataflow job failed after more than 6 hours with "The worker lost contact with the service"?

I am using Dataflow to read data from BigQuery and then do NLP preprocessing in Python. I am using Python 3 and SDK 2.16.0, with 100 workers (private IP, private access and Cloud NAT), workers in europe-west6 and the regional endpoint in europe-west1. The BigQuery tables are in the US. Test jobs ran without any issue, but when trying to process the full table (32 GB) the job failed after 6 h 40 min, and it is hard to fully understand what the underlying error is.
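
For context, the job is launched roughly like this; this is a simplified sketch with placeholder project, bucket and table names (the actual preprocessing DoFn is omitted), and the option names are those of the Python SDK:

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions


    class PreprocessFn(beam.DoFn):
        """Placeholder for the NLP preprocessing step."""
        def process(self, row):
            yield row


    options = PipelineOptions(
        runner="DataflowRunner",
        project="my-project",                      # placeholder
        region="europe-west1",                     # regional endpoint
        zone="europe-west6-a",                     # workers in europe-west6
        temp_location="gs://my-bucket/beam/temp",  # where the temporary .avro exports land
        num_workers=100,
        use_public_ips=False,                      # private IPs + Cloud NAT
    )

    with beam.Pipeline(options=options) as p:
        _ = (
            p
            | "Read Posts from BigQuery" >> beam.io.Read(beam.io.BigQuerySource(
                query="SELECT * FROM `my-project.my_dataset.posts`",
                use_standard_sql=True))
            | "NLP preprocessing" >> beam.ParDo(PreprocessFn())
        )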

First, here is what Dataflow reports (screenshot of the job error messages). It is a bit confusing: in one case a work item failed, two other workers lost contact with the service, and one worker was reported dead!

Now let's look at the logs of the step reading the BigQuery data (screenshot of the worker logs). The first suspicious thing is the message "Refreshing due to a 401 (attempt 1/2)", which appears every 3 seconds during the whole Dataflow job. I don't think it is related to the crash, but it is strange. The timestamps of the BigQuery issue (16:28:07 and 16:28:15) come after the issue reported for the workers (16:27:44).

An exception was raised when trying to execute the workitem 7962803802081012962 : Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/dataflow_worker/batchworker.py", line 649, in do_work
    work_executor.execute()
  File "/usr/local/lib/python3.6/site-packages/dataflow_worker/executor.py", line 176, in execute
    op.start()
  File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start
  File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start
  File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start
  File "dataflow_worker/native_operations.py", line 48, in dataflow_worker.native_operations.NativeReadOperation.start
  File "/usr/local/lib/python3.6/site-packages/dataflow_worker/nativefileio.py", line 204, in __iter__
    for record in self.read_next_block():
  File "/usr/local/lib/python3.6/site-packages/dataflow_worker/nativeavroio.py", line 198, in read_next_block
    fastavro_block = next(self._block_iterator)
  File "fastavro/_read.pyx", line 738, in fastavro._read.file_reader.next
  File "fastavro/_read.pyx", line 662, in _iter_avro_blocks
  File "fastavro/_read.pyx", line 595, in fastavro._read.null_read_block
  File "fastavro/_read.pyx", line 597, in fastavro._read.null_read_block
  File "fastavro/_read.pyx", line 304, in fastavro._read.read_bytes
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystemio.py", line 113, in readinto
    data = self._downloader.get_range(start, end)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/gcsio.py", line 522, in get_range
    self._downloader.GetRange(start, end - 1)
  File "/usr/local/lib/python3.6/site-packages/apitools/base/py/transfer.py", line 486, in GetRange
    response = self.__ProcessResponse(response)
  File "/usr/local/lib/python3.6/site-packages/apitools/base/py/transfer.py", line 424, in __ProcessResponse
    raise exceptions.HttpError.FromResponse(response)
apitools.base.py.exceptions.HttpNotFoundError: HttpError accessing <https://www.googleapis.com/storage/v1/b/xxx/o/beam%2Ftemp%2Fstackoverflow-raphael-191119-084402.1574153042.687677%2F11710707918635668555%2F000000000009.avro?alt=media&generation=1574154204169350>: response: <{'x-guploader-uploadid': 'AEnB2UpgIuanY0AawrT7fRC_VW3aRfWSdrrTwT_TqQx1fPAAAUohVoL-8Z8Zw_aYUQcSMNqKIh5R2TulvgHHsoxLWo2gl6wUEA', 'content-type': 'text/html; charset=UTF-8', 'date': 'Tue, 19 Nov 2019 15:28:07 GMT', 'vary': 'Origin, X-Origin', 'expires': 'Tue, 19 Nov 2019 15:28:07 GMT', 'cache-control': 'private, max-age=0', 'content-length': '142', 'server': 'UploadServer', 'status': '404'}>, content <No such object: nlp-text-classification/beam/temp/stackoverflow-xxxx-191119-084402.1574153042.687677/11710707918635668555/000000000009.avro>

timestamp: 2019-11-19T15:28:07.770312309Z
logger: root:batchworker.py:do_work
severity: ERROR
worker: stackoverflow-xxxx-191-11190044-7wyy-harness-2k89
step: Read Posts from BigQuery
thread: 73:140029564072960

It seems the workers cannot find some Avro files on Cloud Storage. This could be related to the message "The worker lost contact with the service".

If I filter on "ERROR" I see a lot of them, so it seems the workers themselves were having issues (screenshot of the error log entries).

Looking at the stack traces doesn't give any more hints.

My questions are the following:

  1. How can we be sure that the issue is related to the workers?
  2. What can be the reason? Memory? Disk? Or a transient issue?
  3. Is there an option to recover when workers are dead? Why does the full job stop if 3 out of 98 workers are dead or get lost? Is there a parameter for that?

Our setup:

(screenshot of our setup)

We were monitoring some quantities with Stackdriver, but to me nothing looks wrong (see the two monitoring screenshots).

Upvotes: 0

Views: 2247

Answers (2)

Dr. Fabien Tarrade

Reputation: 1706

After some tests and a few monitoring plots, it was clear that even though the length of the text stayed the same, the processing time started to increase rapidly (bottom-right plot of the monitoring screenshot).
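
For reference, the per-document timings behind these plots can be collected with something like the sketch below (not the exact code we used; the model name is a placeholder):

    import logging
    import time

    import apache_beam as beam


    class TimedPreprocessFn(beam.DoFn):
        """Log how long each document takes so the timings can be plotted from the worker logs."""

        def setup(self):
            # Load the SpaCy model once per DoFn instance, not once per element.
            import spacy
            self._nlp = spacy.load("en_core_web_sm")  # placeholder model name

        def process(self, text):
            start = time.time()
            doc = self._nlp(text)
            logging.info("processed %d chars in %.3f s", len(text), time.time() - start)
            yield [token.lemma_ for token in doc]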

Then it became clear that the issue was with SpaCy 2.1.8 (memory leak).

Using SpaCy 2.2.3 fixed the issue. Now the 32 GB of data are processed in 4 h 30 min without any issue (see the monitoring screenshot).
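
If someone hits the same problem, one standard way to get the fixed release onto the Dataflow workers is to pin it in the requirements file passed with --requirements_file (the file name below is just an example):

    # requirements.txt, passed at launch with --requirements_file=requirements.txt
    spacy==2.2.3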

Upvotes: 1

datancoffee

Reputation: 211

The default disk size for batch jobs not using Dataflow Shuffle is 250 GB, so your setting of 50 GB leaves very little space for any shuffle data that needs to be stored on the workers.

It would be good to see the shape of your pipeline (what steps are involved), but based on the log screenshots you have 4 steps (read from BQ, preprocess, write to BQ, and also write to GCS). I also see some GroupBy operations. The GroupBy operations require shuffling, and your 50 GB disks might be limiting the storage available for it.

You should try a couple of things:

  - Don't limit the workers to 50 GB (remove the diskGB setting so that Dataflow can use its defaults).
  - Try Dataflow Shuffle (--experiments=shuffle_mode=service); see https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-shuffle

When you use Dataflow Shuffle, the disk size parameter has a 30 GB default, so you can use small disks then (I would still recommend not setting the disk size yourself).
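
For the Python SDK, these settings would look roughly like the sketch below (the option names are the Python equivalents of what is described above; project and bucket are placeholders):

    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions(
        runner="DataflowRunner",
        project="my-project",                  # placeholder
        region="europe-west1",
        temp_location="gs://my-bucket/tmp",    # placeholder
        experiments=["shuffle_mode=service"],  # enable the Dataflow Shuffle service
        # no disk_size_gb here, so Dataflow picks the default disk size
    )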

Upvotes: 1
