ash

Reputation: 21

Dataflow Pickling Error When Submitting Jobs

I have a weird issue when submitting Apache Beam jobs to GCP Dataflow. Sometimes the code runs fine and the execution is successful. However, occasionally I get the following pickling-related error while submitting the same job (with no code changes). The final error is a KeyError: 'ClassType' message. Any clues as to why this happens intermittently?

DataflowRuntimeException                  Traceback (most recent call last)
<ipython-input-8-90e709657cb0> in <module>
----> 1 preprocess_reg('DataflowRunner')

<ipython-input-7-3200c06db079> in preprocess_reg(RUNNER)
     45                         | 'ExtractOnlyWords' >> beam.Map(lambda contents: contents[1])
     46                         | 'FlattenSentenceList' >> beam.FlatMap(lambda contents: contents)
---> 47                         | 'WriteCompleteFiles' >> beam.io.WriteToText(output_prefix_com,append_trailing_newlines=True))
     48 

/usr/local/lib/python3.5/dist-packages/apache_beam/pipeline.py in __exit__(self, exc_type, exc_val, exc_tb)
    425   def __exit__(self, exc_type, exc_val, exc_tb):
    426     if not exc_type:
--> 427       self.run().wait_until_finish()
    428 
    429   def visit(self, visitor):

/usr/local/lib/python3.5/dist-packages/apache_beam/runners/dataflow/dataflow_runner.py in wait_until_finish(self, duration)
   1345         raise DataflowRuntimeException(
   1346             'Dataflow pipeline failed. State: %s, Error:\n%s' %
-> 1347             (self.state, getattr(self._runner, 'last_error_msg', None)), self)
   1348     return self.state
   1349 

DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
  File "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py", line 261, in loads
    return dill.loads(s)
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 317, in loads
    return load(file, ignore)
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 305, in load
    obj = pik.load()
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 577, in _load_type
    return _reverse_typemap[name]
KeyError: 'ClassType'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.5/site-packages/dataflow_worker/batchworker.py", line 649, in do_work
    work_executor.execute()
  File "/usr/local/lib/python3.5/site-packages/dataflow_worker/executor.py", line 176, in execute
    op.start()
  File "apache_beam/runners/worker/operations.py", line 587, in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/worker/operations.py", line 588, in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/worker/operations.py", line 589, in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/worker/operations.py", line 220, in apache_beam.runners.worker.operations.Operation.start
  File "apache_beam/runners/worker/operations.py", line 224, in apache_beam.runners.worker.operations.Operation.start
  File "apache_beam/runners/worker/operations.py", line 535, in apache_beam.runners.worker.operations.DoOperation.setup
  File "apache_beam/runners/worker/operations.py", line 540, in apache_beam.runners.worker.operations.DoOperation.setup
  File "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py", line 265, in loads
    return dill.loads(s)
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 317, in loads
    return load(file, ignore)
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 305, in load
    obj = pik.load()
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 577, in _load_type
    return _reverse_typemap[name]
KeyError: 'ClassType'
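For context on what the traceback shows: the failure happens inside dill when the worker unpickles the submitted functions. 'ClassType' is a Python 2-era type name that not every dill release keeps in its _reverse_typemap, so this error typically points to a dill/SDK version mismatch between the environment that pickles the pipeline and the workers that unpickle it. As a first check (a minimal diagnostic sketch, run in the submitting notebook), print the versions on your side:

# Quick diagnostic in the submitting notebook: a dill/Beam version
# mismatch between this environment and the Dataflow workers is the
# usual trigger for KeyError: 'ClassType'.
import apache_beam as beam
import dill

print('apache-beam:', beam.__version__)
print('dill:', dill.__version__)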

Below is a sample of the notebook code:

ParDo Function Cell

class WordExtractingDoFn_Final(beam.DoFn):

    def __init__(self, min_chg_year=2009):
        # TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
        # super(WordExtractingDoFn, self).__init__()
        beam.DoFn.__init__(self)
        self.min_chg_year = min_chg_year

    def process(self, element):
        """Returns parsed text after parsing with Beautiful Soup.

        Args:
          element: the (file path, contents) tuple being processed.
        Returns:
          The processed element.
        """
        # Import inside the function so the workers have access.
        from bs4 import BeautifulSoup
        import re

        re_headings = re.compile(r'.*(staff|participants.|committee)+.*(economic|financial|policy)+.*\s*.*', re.IGNORECASE)  # regex for clear topics
        re_start_para = re.compile(r'By unanimous vot.* [\w\n\s]*[.]', re.IGNORECASE)  # marks the start of the actual meat of the minutes
        re_end_para = re.compile(r'Vot.* for this \w+', re.IGNORECASE)  # marks where the minutes stop
        re_year = re.compile(r'[\d]{4}')  # extracts the year from the file name in element[0]
        statement = BeautifulSoup(element[1], 'html.parser')
        headings = statement.findAll('strong')
        words = ''

        # File year: topics only work for minutes after 2008.
        tgt_year = int(re.search(re_year, element[0]).group(0))

        # Case where there are clear topics within the meeting minutes (from 2011 onwards).
        if tgt_year >= self.min_chg_year and len(headings) >= 2:
            for heading in headings:
                result = re.search(re_headings, heading.text)
                if result is not None:
                    start = statement.text.find(heading.text) + len(heading.text)
                    target = heading.findNext('strong')
                    if target is not None:
                        end = statement.text.find(target.text)
                    else:
                        end = len(statement.text)
                    content = statement.text[start:end]
                    words = words + content

        # Case where there are no topics within the minutes: determine where to start and stop.
        else:
            results_start = re.finditer(re_start_para, statement.text)
            results_end = re.finditer(re_end_para, statement.text)

            try:
                first, *_ = results_end  # unpack the iterator; we need the first occurrence of 'Voted for this ...'
                start_pos = 0  # initial starting position
                end_pos = first.start()
                # Find the last start position that is before the end position.
                for x in results_start:
                    if x.end() >= end_pos:
                        break
                    else:
                        start_pos = x.end()
                if start_pos != 0:
                    content = statement.text[start_pos:end_pos]  # use the spans to determine the text to extract
                    words = content
                else:
                    print('file had a 0 start {}'.format(element[0]))
            except ValueError:  # no end marker found, so the unpacking fails
                print('file had issues {}'.format(element[0]))

        # Split the paragraphs into sentences.
        if len(words) > 0:
            re_us = re.compile(r'(U\.S\.).')  # 'U.S.' breaks sentence splitting; replace it with 'USA'
            filtered_str = re.sub(re_us, 'USA ', words)

            re_blank = re.compile(r'(\\\\r\\\\n|(\s){2,})')  # filter out unwanted spaces and tabs, especially from older HTML files
            filtered_str = re.sub(re_blank, ' ', filtered_str)

            re_encode_issues = re.compile(r'(\\x.{2})')  # UTF encoding issues
            filtered_str = re.sub(re_encode_issues, '', filtered_str)

            re_apos_issues = re.compile(r"(\\')")  # escaped apostrophes
            filtered_str = re.sub(re_apos_issues, '', filtered_str)

            # Finally, split the text into sentences.
            words = re.split(r'[\.?:][:"\\\\n\s]+', filtered_str)

        return [(element[0], words)]
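Since the pickling error only surfaces on the workers, it can help to first confirm the DoFn itself is sound by calling it directly, outside Beam. A minimal local smoke test, assuming bs4 is installed in the notebook environment; the file name and HTML snippet below are invented for illustration:

# Hypothetical local smoke test: exercise the DoFn without any runner.
sample = ('minutes_2012.html',
          '<strong>Staff Review of the Economic Situation</strong>'
          'Some minutes text here. <strong>Next Heading</strong>')
fn = WordExtractingDoFn_Final()
print(fn.process(sample))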

Dataflow Execution Cell

import datetime
import os

import apache_beam as beam
from apache_beam.io import fileio

# BUCKET and PROJECT are defined earlier in the notebook.

def preprocess_reg(RUNNER):
    job_name = '********' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')
    print('Launching Dataflow job {} ... hang on'.format(job_name))
    OUTPUT_DIR = 'gs://{0}/d***/processed'.format(BUCKET)
    INPUT_FILES = 'gs://{0}/raw/minutes/**/*'.format(BUCKET)
    output_prefix_com = 'gs://{0}/******/dataflowprocessing-trial'.format(BUCKET)

    # Dictionary of pipeline options.
    options = {
        'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),
        'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
        'job_name': 'prep******' + '-' +
                    datetime.datetime.now().strftime('%y%m%d-%H%M%S'),
        'project': PROJECT,
        'runner': RUNNER,
        'num_workers': 2,
        'max_num_workers': 3
    }
    # Instantiate a PipelineOptions object from the options dictionary.
    opts = beam.pipeline.PipelineOptions(flags=[], **options)

    # Instantiate the Pipeline object using the PipelineOptions.
    with beam.Pipeline(options=opts) as p:
        readable_files = (p
                          | fileio.MatchFiles(INPUT_FILES)
                          | fileio.ReadMatches()
                          | beam.Reshuffle())

        files_and_contents = (readable_files
                              | 'ReadFilesOneByOne' >> beam.Map(lambda x: (x.metadata.path,
                                                                           x.read())))

        extract_process = (files_and_contents
                           | 'ProcesswithBS' >> beam.ParDo(WordExtractingDoFn_Final()))

        # Write only files where parsing in WordExtractingDoFn was successful.
        write_process = (extract_process
                         | 'FilterforFilesWithContent' >> beam.Filter(lambda contents: len(contents[1]) > 0)
                         | 'ExtractOnlyWords' >> beam.Map(lambda contents: contents[1])
                         | 'FlattenSentenceList' >> beam.FlatMap(lambda contents: contents)
                         | 'WriteCompleteFiles' >> beam.io.WriteToText(output_prefix_com, append_trailing_newlines=True))
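Since the same code sometimes submits fine, it is also worth running the pipeline locally first: the DirectRunner pickles and unpickles in the same environment, so it sidesteps any worker-side dill mismatch and isolates the problem to Dataflow. A minimal usage sketch:

# Validate the pipeline logic locally before submitting to Dataflow.
preprocess_reg('DirectRunner')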

Upvotes: 2

Views: 1460

Answers (1)

Jayadeep Jayaraman

Reputation: 2825

I believe this is related to https://issues.apache.org/jira/browse/BEAM-8651, where pickler.py [1] has issues with dill. You can upgrade your Beam version to 2.17 and see if that resolves the issue.

1 - https://github.com/apache/beam/blob/master/sdks/python/apache_beam/internal/pickler.py
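For example, in the notebook (a sketch; the exact 2.17.x pin is illustrative, and you should restart the kernel after upgrading so the new version is picked up):

# Upgrade the Beam SDK in the submitting environment, then restart the kernel.
!pip install --upgrade 'apache-beam[gcp]==2.17.0'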

Upvotes: 1
