Reputation: 21
I have this weird issue while submitting Apache Beam jobs to GCP Dataflow. Sometimes the code runs fine and the execution is successful. However, occasionally I get the following pickling-related error while trying to submit the same job (with no code changes). The final error is `KeyError: 'ClassType'`.
Any clues as to why this happens occasionally?
DataflowRuntimeException Traceback (most recent call last)
<ipython-input-8-90e709657cb0> in <module>
----> 1 preprocess_reg('DataflowRunner')
<ipython-input-7-3200c06db079> in preprocess_reg(RUNNER)
45 | 'ExtractOnlyWords' >> beam.Map(lambda contents: contents[1])
46 | 'FlattenSentenceList' >> beam.FlatMap(lambda contents: contents)
---> 47 | 'WriteCompleteFiles' >> beam.io.WriteToText(output_prefix_com,append_trailing_newlines=True))
48
/usr/local/lib/python3.5/dist-packages/apache_beam/pipeline.py in __exit__(self, exc_type, exc_val, exc_tb)
425 def __exit__(self, exc_type, exc_val, exc_tb):
426 if not exc_type:
--> 427 self.run().wait_until_finish()
428
429 def visit(self, visitor):
/usr/local/lib/python3.5/dist-packages/apache_beam/runners/dataflow/dataflow_runner.py in wait_until_finish(self, duration)
1345 raise DataflowRuntimeException(
1346 'Dataflow pipeline failed. State: %s, Error:\n%s' %
-> 1347 (self.state, getattr(self._runner, 'last_error_msg', None)), self)
1348 return self.state
1349
DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
File "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py", line 261, in loads
return dill.loads(s)
File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 317, in loads
return load(file, ignore)
File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 305, in load
obj = pik.load()
File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 577, in _load_type
return _reverse_typemap[name]
KeyError: 'ClassType'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.5/site-packages/dataflow_worker/batchworker.py", line 649, in do_work
work_executor.execute()
File "/usr/local/lib/python3.5/site-packages/dataflow_worker/executor.py", line 176, in execute
op.start()
File "apache_beam/runners/worker/operations.py", line 587, in apache_beam.runners.worker.operations.DoOperation.start
File "apache_beam/runners/worker/operations.py", line 588, in apache_beam.runners.worker.operations.DoOperation.start
File "apache_beam/runners/worker/operations.py", line 589, in apache_beam.runners.worker.operations.DoOperation.start
File "apache_beam/runners/worker/operations.py", line 220, in apache_beam.runners.worker.operations.Operation.start
File "apache_beam/runners/worker/operations.py", line 224, in apache_beam.runners.worker.operations.Operation.start
File "apache_beam/runners/worker/operations.py", line 535, in apache_beam.runners.worker.operations.DoOperation.setup
File "apache_beam/runners/worker/operations.py", line 540, in apache_beam.runners.worker.operations.DoOperation.setup
File "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py", line 265, in loads
return dill.loads(s)
File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 317, in loads
return load(file, ignore)
File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 305, in load
obj = pik.load()
File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 577, in _load_type
return _reverse_typemap[name]
**KeyError: 'ClassType'**
Below is a sample of the notebook code:
class WordExtractingDoFn_Final(beam.DoFn):
    """Extracts the substantive discussion text from FOMC-minutes HTML.

    Each input element is a (file_path, html_contents) tuple. The output is a
    single-element list [(file_path, words)] where ``words`` is a list of
    sentence strings, or the empty string when nothing could be extracted.
    """

    def __init__(self, min_chg_year=2009):
        # TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
        # super(WordExtractingDoFn, self).__init__()
        beam.DoFn.__init__(self)
        # Minutes from this year onward may carry explicit <strong> topic
        # headings; earlier files fall back to the start/end-marker search.
        self.min_chg_year = min_chg_year

    def process(self, element):
        """Returns parsed text after parsing by beautiful soup.

        Args:
            element: the (file_name, file_contents) element being processed.

        Returns:
            A one-element list [(file_name, words)].
        """
        # Imports live inside process() so Dataflow workers resolve the
        # modules themselves instead of relying on main-session pickling.
        from bs4 import BeautifulSoup
        import re

        re_headings = re.compile(
            r'.*(staff|participants.|committee)+.*(economic|financial|policy)+.*\s*.*',
            re.IGNORECASE)  # regex for clear topic headings
        re_start_para = re.compile(
            r'By unanimous vot.* [\w\n\s]*[.]',
            re.IGNORECASE)  # indicates start of the actual meat of the minutes
        re_end_para = re.compile(
            r'Vot.* for this \w+',
            re.IGNORECASE)  # indicates where the minutes stop
        re_year = re.compile(r'[\d]{4}')  # year embedded in the file name

        statement = BeautifulSoup(element[1], 'html.parser')
        headings = statement.findAll('strong')
        words = ''
        # File year: topic headings only work for minutes after 2008.
        # NOTE(review): assumes every file name contains a 4-digit year;
        # re.search returning None would raise AttributeError here.
        tgt_year = int(re.search(re_year, element[0]).group(0))

        if tgt_year >= self.min_chg_year and len(headings) >= 2:
            # Case 1: clear topic headings within the minutes (from 2011 on).
            for heading in headings:
                result = re.search(re_headings, heading.text)
                if result is not None:  # was `!= None`; identity test for None
                    start = statement.text.find(heading.text) + len(heading.text)
                    target = heading.findNext('strong')
                    if target is not None:
                        end = statement.text.find(target.text)
                    else:
                        end = len(statement.text)
                    content = statement.text[start:end]
                    words = words + content
        else:
            # Case 2: no topic headings; determine where to start and stop
            # from the start/end marker regexes.
            results_start = re.finditer(re_start_para, statement.text)
            results_end = re.finditer(re_end_para, statement.text)
            try:
                # We need the first occurrence of 'Voted for this...'.
                first, *_ = results_end
                start_pos = 0  # initial starting position
                end_pos = first.start()
                # Use the last start marker that falls before the end marker.
                for x in results_start:
                    if x.end() >= end_pos:
                        break
                    else:
                        start_pos = x.end()
                if start_pos != 0:
                    content = statement.text[start_pos:end_pos]
                    words = content
                else:
                    print ('file had a 0 start {}'.format(element[0]))
            except ValueError:
                # Was a bare `except:`. Only the unpack of an empty iterator
                # (no end marker found) is expected to fail, and a bare except
                # also swallows KeyboardInterrupt/SystemExit.
                print ('file had issues {}'.format(element[0]))

        # Split the extracted paragraphs into sentences.
        if len(words) > 0:
            re_us = re.compile(r'(U\.S\.).')  # 'U.S.' breaks splitting; use USA
            filtered_str = re.sub(re_us, 'USA ', words)
            re_blank = re.compile(r'(\\\\r\\\\n|(\s){2,})')  # stray spaces/tabs, esp. older HTML
            filtered_str = re.sub(re_blank, ' ', filtered_str)
            re_encode_issues = re.compile(r'(\\x.{2})')  # utf encoding artifacts
            filtered_str = re.sub(re_encode_issues, '', filtered_str)
            re_apos_issues = re.compile(r"(\\')")  # escaped-apostrophe artifacts
            filtered_str = re.sub(re_apos_issues, '', filtered_str)
            # Finally, split the cleaned text into sentences.
            words = re.split(r'[\.?:][:"\\\\n\s]+', filtered_str)
        return [(element[0], words)]
def preprocess_reg(RUNNER):
    """Builds the extraction pipeline and submits it to the given runner.

    Args:
        RUNNER: Beam runner name, e.g. 'DirectRunner' or 'DataflowRunner'.

    Side effects: reads module-level BUCKET and PROJECT; launches a Beam job
    when the `with` block exits.
    """
    # Compute the timestamp once: the original called datetime.now() twice
    # (once for the printed name, once for options['job_name']), which can
    # straddle a second boundary and make the two names disagree.
    timestamp = datetime.datetime.now().strftime('%y%m%d-%H%M%S')
    job_name = '********' + '-' + timestamp
    print('Launching Dataflow job {} ... hang on'.format(job_name))

    OUTPUT_DIR = 'gs://{0}/d***/processed'.format(BUCKET)
    INPUT_FILES = 'gs://{0}/raw/minutes/**/*'.format(BUCKET)
    output_prefix_com = 'gs://{0}/******/dataflowprocessing-trial'.format(BUCKET)

    # Dictionary of pipeline options.
    options = {
        'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),
        'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
        'job_name': 'prep******' + '-' + timestamp,
        'project': PROJECT,
        'runner': RUNNER,
        'num_workers': 2,
        'max_num_workers': 3,
    }
    # Instantiate PipelineOptions from the options dictionary.
    opts = beam.pipeline.PipelineOptions(flags=[], **options)

    # Instantiate the Pipeline; leaving the `with` block runs it and waits.
    with beam.Pipeline(options=opts) as p:
        readable_files = (p
                          | fileio.MatchFiles(INPUT_FILES)
                          | fileio.ReadMatches()
                          | beam.Reshuffle())
        files_and_contents = (readable_files
                              | 'ReadFilesOneByOne' >> beam.Map(
                                  lambda x: (x.metadata.path, x.read())))
        extract_process = (files_and_contents
                           | 'ProcesswithBS' >> beam.ParDo(
                               WordExtractingDoFn_Final()))
        # Keep only files where WordExtractingDoFn produced content, then
        # flatten the per-file sentence lists and write them out.
        write_process = (extract_process
                         | 'FilterforFilesWithContent' >> beam.Filter(
                             lambda contents: len(contents[1]) > 0)
                         | 'ExtractOnlyWords' >> beam.Map(
                             lambda contents: contents[1])
                         | 'FlattenSentenceList' >> beam.FlatMap(
                             lambda contents: contents)
                         | 'WriteCompleteFiles' >> beam.io.WriteToText(
                             output_prefix_com, append_trailing_newlines=True))
Upvotes: 2
Views: 1460
Reputation: 2825
I believe this is related to https://issues.apache.org/jira/browse/BEAM-8651, where the pickler.py [1] has issues with dill. You can upgrade your Beam version to 2.17 and see if this resolves the issue.
1 - https://github.com/apache/beam/blob/master/sdks/python/apache_beam/internal/pickler.py
Upvotes: 1