Funsaized


Luigi task returns unfulfilled dependency at run time when dependency is complete

I am relatively new to creating flows with Luigi and am trying to understand why my small workflow results in an unfulfilled dependency. I am trying to run the task StageProviders(), which has a single dependency, ErrorsLogFile(). The tasks that must run before StageProviders simply create blank files on a shared drive. When I try to run the StageProviders task in the flow below, I get the output shown after the code:

Code:

#!/usr/local/bin/python

import luigi
import os
import shutil
import time
import pandas as pd


class DupsExistingLogFile(luigi.Task):
    filename = luigi.Parameter()

    def requires(self):
        return None

    def output(self):
        timestr = time.strftime("%Y-%m-%d")
        return luigi.LocalTarget(os.path.join('/root/etc/mnt/Import/LogFiles/' + os.path.splitext(self.filename)[0] + '_' + timestr + "_DuplicatesExisting.xlsx"))

    def run(self):
        timestr = time.strftime("%Y-%m-%d")
        src_blank_file_str = os.path.join('/root/etc/mnt/Import/LogFiles/Provider_Blank_DONOTDELETE.xlsx')
        dest_file_str = os.path.join(os.path.join('/root/etc/mnt/Import/LogFiles/' + os.path.splitext(self.filename)[0] + '_' + timestr + "_DuplicatesExisting.xlsx"))
        shutil.copyfile(src_blank_file_str, dest_file_str)


class DupsLogFile(luigi.Task):
    filename = luigi.Parameter()

    def requires(self):
        return DupsExistingLogFile(self.filename)

    def output(self):
        timestr = time.strftime("%Y-%m-%d")
        return luigi.LocalTarget(os.path.join('/root/etc/mnt/Import/LogFiles/' + os.path.splitext(self.filename)[0] + '_' + timestr + "_Duplicates.xlsx"))

    def run(self):
        timestr = time.strftime("%Y-%m-%d")
        src_blank_file_str = os.path.join('/root/etc/mnt/Import/LogFiles/Provider_Blank_DONOTDELETE.xlsx')
        dest_file_str = os.path.join(os.path.join('/root/etc/mnt/Import/LogFiles/' + os.path.splitext(self.filename)[0] + '_' + timestr + "_Duplicates.xlsx"))
        shutil.copyfile(src_blank_file_str, dest_file_str)


class ErrorsLogFile(luigi.Task):
    filename = luigi.Parameter()

    def requires(self):
        return DupsLogFile(self.filename)

    def output(self):
        timestr = time.strftime("%Y-%m-%d")
        return luigi.LocalTarget(os.path.join('/root/etc/mnt/Import/LogFiles/' + os.path.splitext(self.filename)[0] + '_' + timestr + "_Errprs.xlsx"))

    def run(self):
        timestr = time.strftime("%Y-%m-%d")
        src_blank_file_str = os.path.join('/root/etc/mnt/Import/LogFiles/Provider_Blank_DONOTDELETE.xlsx')
        dest_file_str = os.path.join(os.path.join('/root/etc/mnt/Import/LogFiles/' + os.path.splitext(self.filename)[0] + '_' + timestr + "_Errors.xlsx"))
        shutil.copyfile(src_blank_file_str, dest_file_str)


class StageProviders(luigi.Task):
    filename = luigi.Parameter()

    def requires(self):
        return ErrorsLogFile(self.filename)

    def output(self):
        timestr = time.strftime("%Y-%m-%d")
        return luigi.LocalTarget(os.path.join('/root/etc/mnt/Import/LogFiles/_SUCCESS_STG_' + os.path.splitext(self.filename)[0] + '_' + timestr + '.txt'))

    def run(self):
        timestr = time.strftime("%Y-%m-%d")
        filepath_str = '/root/etc/mnt/Import/' + self.filename
        xls_file = pd.ExcelFile(filepath_str)
        df = xls_file.parse('Sheet1')
        src_blank_file_str = os.path.join('/root/etc/mnt/Import/LogFiles/_SUCCESS.txt')
        dest_file_str = os.path.join('/root/etc/mnt/Import/LogFiles/_SUCCESS_STG_' + os.path.splitext(self.filename)[0] + '_' + timestr + '.txt')
        if not df.empty:
            shutil.copyfile(src_blank_file_str, dest_file_str)
            with self.output().open('w') as out_file:
                for name in df['NP']:
                    print(name, end='\n', file=out_file)

Output:

root@ubuntu:~/pythonfiles/luigi_POC/cpi_luigi_poc/src# python3 -m luigi --module provider_import StageProviders --filename CCM_provider_sample.xlsx --local-scheduler
DEBUG: Checking if StageProviders(filename=CCM_provider_sample.xlsx) is complete
DEBUG: Checking if ErrorsLogFile(filename=CCM_provider_sample.xlsx) is complete
INFO: Informed scheduler that task   StageProviders_CCM_provider_sam_ad65b206fd   has status   PENDING
DEBUG: Checking if DupsLogFile(filename=CCM_provider_sample.xlsx) is complete
INFO: Informed scheduler that task   ErrorsLogFile_CCM_provider_sam_ad65b206fd   has status   PENDING
DEBUG: Checking if DupsExistingLogFile(filename=CCM_provider_sample.xlsx) is complete
INFO: Informed scheduler that task   DupsLogFile_CCM_provider_sam_ad65b206fd   has status   PENDING
INFO: Informed scheduler that task   DupsExistingLogFile_CCM_provider_sam_ad65b206fd   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 4
INFO: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) running   DupsExistingLogFile(filename=CCM_provider_sample.xlsx)
INFO: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) done      DupsExistingLogFile(filename=CCM_provider_sample.xlsx)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   DupsExistingLogFile_CCM_provider_sam_ad65b206fd   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 3
INFO: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) running   DupsLogFile(filename=CCM_provider_sample.xlsx)
INFO: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) done      DupsLogFile(filename=CCM_provider_sample.xlsx)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   DupsLogFile_CCM_provider_sam_ad65b206fd   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) running   ErrorsLogFile(filename=CCM_provider_sample.xlsx)
INFO: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) done      ErrorsLogFile(filename=CCM_provider_sample.xlsx)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   ErrorsLogFile_CCM_provider_sam_ad65b206fd   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) running   StageProviders(filename=CCM_provider_sample.xlsx)
ERROR: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) failed    StageProviders(filename=CCM_provider_sample.xlsx)
Traceback (most recent call last):
  File "/usr/local/lib/python3.5/dist-packages/luigi/worker.py", line 175, in run
    raise RuntimeError('Unfulfilled %s at run time: %s' % (deps, ', '.join(missing)))
RuntimeError: Unfulfilled dependency at run time: ErrorsLogFile_CCM_provider_sam_ad65b206fd
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   StageProviders_CCM_provider_sam_ad65b206fd   has status   FAILED
DEBUG: Checking if StageProviders(filename=CCM_provider_sample.xlsx) is complete
DEBUG: Checking if ErrorsLogFile(filename=CCM_provider_sample.xlsx) is complete
INFO: Informed scheduler that task   StageProviders_CCM_provider_sam_ad65b206fd   has status   PENDING
DEBUG: Checking if DupsLogFile(filename=CCM_provider_sample.xlsx) is complete
INFO: Informed scheduler that task   ErrorsLogFile_CCM_provider_sam_ad65b206fd   has status   PENDING
INFO: Informed scheduler that task   DupsLogFile_CCM_provider_sam_ad65b206fd   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) running   ErrorsLogFile(filename=CCM_provider_sample.xlsx)
INFO: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) done      ErrorsLogFile(filename=CCM_provider_sample.xlsx)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   ErrorsLogFile_CCM_provider_sam_ad65b206fd   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) running   StageProviders(filename=CCM_provider_sample.xlsx)
ERROR: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) failed    StageProviders(filename=CCM_provider_sample.xlsx)
Traceback (most recent call last):
  File "/usr/local/lib/python3.5/dist-packages/luigi/worker.py", line 175, in run
    raise RuntimeError('Unfulfilled %s at run time: %s' % (deps, ', '.join(missing)))
RuntimeError: Unfulfilled dependency at run time: ErrorsLogFile_CCM_provider_sam_ad65b206fd
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   StageProviders_CCM_provider_sam_ad65b206fd   has status   FAILED
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: There are 1 pending tasks possibly being run by other workers
DEBUG: There are 1 pending tasks unique to this worker
DEBUG: There are 1 pending tasks last scheduled by this worker
INFO: Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 4 tasks of which:
* 3 ran successfully:
    - 1 DupsExistingLogFile(filename=CCM_provider_sample.xlsx)
    - 1 DupsLogFile(filename=CCM_provider_sample.xlsx)
    - 1 ErrorsLogFile(filename=CCM_provider_sample.xlsx)
* 1 failed:
    - 1 StageProviders(filename=CCM_provider_sample.xlsx)

This progress looks :( because there were failed tasks

The failure appears to come from this error:

RuntimeError: Unfulfilled dependency at run time: ErrorsLogFile_CCM_provider_sam_ad65b206fd

However, reading the output, it seems that ErrorsLogFile_CCM_provider_sam_ad65b206fd finished before StageProviders ran. Why does the scheduler report an unfulfilled dependency? I suspect I'm misunderstanding how to "chain" tasks together. I simply want the StageProviders task to run after the ErrorsLogFile, DupsLogFile, and DupsExistingLogFile tasks complete successfully.


Answers (1)

David


There is a typo in the output() method of your ErrorsLogFile task, and the same spelling is not used in your run() method, so the two paths differ:

[...] + timestr + "_Errprs.xlsx"))

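As a result, ErrorsLogFile runs without error and gets status DONE, but run() writes a file ending in "_Errors.xlsx" while output() declares one ending in "_Errprs.xlsx". When the worker is about to run StageProviders, it checks its requirements by calling the complete() method of ErrorsLogFile, which returns False because the declared "_Errprs.xlsx" file does not exist. That is the source of the "Unfulfilled dependency at run time" error.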
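To see why the task can be reported DONE and still count as incomplete, here is a minimal stdlib-only model (no Luigi involved). It assumes, as Luigi's default Task.complete() does for LocalTarget outputs, that a task is complete exactly when its declared output file exists. LOG_DIR is a temporary stand-in for /root/etc/mnt/Import/LogFiles/, and log_path, buggy_*, fixed_* and "other_sample.xlsx" are hypothetical names for illustration only.

```python
import os
import shutil
import tempfile
import time

# Temporary stand-in for /root/etc/mnt/Import/LogFiles/ (assumption for the demo)
LOG_DIR = tempfile.mkdtemp()
BLANK = os.path.join(LOG_DIR, "Provider_Blank_DONOTDELETE.xlsx")
open(BLANK, "wb").close()  # empty placeholder for the blank template


def log_path(filename, suffix):
    """Hypothetical helper: the one place where a log file path is built."""
    stem = os.path.splitext(filename)[0]
    return os.path.join(LOG_DIR, stem + "_" + time.strftime("%Y-%m-%d") + suffix)


def buggy_output(filename):
    # Mirrors ErrorsLogFile.output(), including the "_Errprs" typo
    return log_path(filename, "_Errprs.xlsx")


def buggy_run(filename):
    # Mirrors ErrorsLogFile.run(), which spells the suffix "_Errors"
    shutil.copyfile(BLANK, log_path(filename, "_Errors.xlsx"))


def fixed_output(filename):
    return log_path(filename, "_Errors.xlsx")


def fixed_run(filename):
    # Write to exactly the path that the output function declares
    shutil.copyfile(BLANK, fixed_output(filename))


def complete(output_path):
    # Models Luigi's default complete(): the declared output exists
    return os.path.exists(output_path)


buggy_run("CCM_provider_sample.xlsx")
print(complete(buggy_output("CCM_provider_sample.xlsx")))  # False
fixed_run("other_sample.xlsx")
print(complete(fixed_output("other_sample.xlsx")))  # True
```

In the real task, the same idea would be to write to the declared target directly, e.g. shutil.copyfile(src_blank_file_str, self.output().path) inside run(), so the file name is spelled only once, in output().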

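In general, this error means that a task's completion status changed during the execution of the workflow: the scheduler recorded the dependency as DONE, but its complete() check returned False when the dependent task was about to run.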
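The RuntimeError comes from a last-moment check: just before calling a task's run(), the worker asks every dependency whether it is complete() and fails the task if any is not (the raise at worker.py line 175 in the traceback). Below is a simplified, stdlib-only model of that behaviour, not Luigi's actual code; FakeTask and run_task are hypothetical names.

```python
class FakeTask:
    """Hypothetical stand-in for a Luigi task."""

    def __init__(self, task_id, deps=(), writes_declared_output=True):
        self.task_id = task_id
        self.deps = list(deps)
        self.writes_declared_output = writes_declared_output
        self.output_exists = False

    def complete(self):
        # Complete iff the declared output exists
        return self.output_exists

    def run(self):
        # A run() that writes to a different path than output() finishes
        # without error but never makes the declared output exist
        if self.writes_declared_output:
            self.output_exists = True


def run_task(task):
    # Re-check every dependency right before running the task
    missing = [dep.task_id for dep in task.deps if not dep.complete()]
    if missing:
        deps = "dependency" if len(missing) == 1 else "dependencies"
        raise RuntimeError("Unfulfilled %s at run time: %s" % (deps, ", ".join(missing)))
    task.run()
    return "DONE"


errors = FakeTask("ErrorsLogFile", writes_declared_output=False)
stage = FakeTask("StageProviders", deps=[errors])
print(run_task(errors))  # DONE, because run() raised nothing
try:
    run_task(stage)
except RuntimeError as exc:
    print(exc)  # Unfulfilled dependency at run time: ErrorsLogFile
```

This is why the requires() chain in the question is already correct: ordering is not the problem, the dependency's own complete() check is.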

